Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
StreamBuffer.cs
Go to the documentation of this file.
1using System.Net;
6
7namespace System.IO;
8
9internal sealed class StreamBuffer : IDisposable
10{
12 {
14
16
17 private int _hasWaiter;
18
// NOTE(review): the signature line for this member (embedded line 19) is absent from this
// extract; judging by the body it is the status query of the value-task source — confirm.
20 {
// Delegates the status lookup for `token` to _waitSource.
21 return _waitSource.GetStatus(token);
22 }
23
// Explicit IValueTaskSource.OnCompleted implementation: hands the continuation, its state,
// the token and the flags to _waitSource.OnCompleted unchanged.
24 void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
25 {
26 _waitSource.OnCompleted(continuation, state, token, flags);
27 }
28
35
// Completes the pending wait, if one is flagged.
// _hasWaiter is swapped to 0 with Interlocked.Exchange; only when the previous value was 1
// is _waitSource completed (with result true). Any other previous value makes this a no-op.
36 public void SignalWaiter()
37 {
38 if (Interlocked.Exchange(ref _hasWaiter, 0) == 1)
39 {
40 _waitSource.SetResult(result: true);
41 }
42 }
43
51
// Rejects a reset while a waiter is still flagged.
// Throws InvalidOperationException when _hasWaiter is non-zero.
52 public void Reset()
53 {
54 if (_hasWaiter != 0)
55 {
56 throw new InvalidOperationException("Concurrent use is not supported");
57 }
// NOTE(review): embedded lines 58-59 are absent from this extract, so the work done after
// the guard is not visible here — presumably it re-arms the wait state; confirm against the full file.
60 }
61
// Synchronous wait on this source.
// Turns off RunContinuationsAsynchronously on _waitSource, then wraps this object in a
// ValueTask at the current _waitSource.Version, converts it to a Task and blocks the
// calling thread in GetAwaiter().GetResult() until it completes.
62 public void Wait()
63 {
64 _waitSource.RunContinuationsAsynchronously = false;
65 new ValueTask(this, _waitSource.Version).AsTask().GetAwaiter().GetResult();
66 }
67
// NOTE(review): the signature line (embedded line 68) is absent from this extract; the body
// reads a `cancellationToken` and returns a ValueTask.
69 {
// Asynchronous waiters get their continuations run asynchronously.
70 _waitSource.RunContinuationsAsynchronously = true;
// Register a cancellation callback and keep the registration in _waitSourceCancellation.
// `this` is passed as the callback state, so the delegate casts it back and calls
// CancelWaiter with the token it was invoked with.
71 _waitSourceCancellation = cancellationToken.UnsafeRegister(delegate(object s, CancellationToken token)
72 {
73 ((ResettableValueTaskSource)s).CancelWaiter(token);
74 }, this);
// The returned ValueTask is backed by this object at the current _waitSource.Version.
75 return new ValueTask(this, _waitSource.Version);
76 }
77 }
78
80
81 private readonly int _maxBufferSize;
82
83 private bool _writeEnded;
84
85 private bool _readAborted;
86
88
90
91 private object SyncObject => _readTaskSource;
92
// True when writing has ended (_writeEnded) and _buffer reports IsEmpty.
// Both values are read while holding the lock on SyncObject.
93 public bool IsComplete
94 {
95 get
96 {
97 lock (SyncObject)
98 {
99 return _writeEnded && _buffer.IsEmpty;
100 }
101 }
102 }
103
// Creates the buffer.
// initialBufferSize (default 4096) is passed to the MultiArrayBuffer constructor;
// maxBufferSize (default 32768) is stored in _maxBufferSize.
104 public StreamBuffer(int initialBufferSize = 4096, int maxBufferSize = 32768)
105 {
106 _buffer = new MultiArrayBuffer(initialBufferSize);
107 _maxBufferSize = maxBufferSize;
// NOTE(review): embedded lines 108-109 are absent from this extract — presumably they
// initialise _readTaskSource and _writeTaskSource; confirm against the full file.
110 }
111
// Single write attempt, performed under the SyncObject lock.
// Returns (wait, bytesWritten): bytesWritten is the number of bytes consumed from `buffer`;
// wait is true when some of `buffer` could not be written in this attempt.
// Throws InvalidOperationException if writing has already ended.
112 private (bool wait, int bytesWritten) TryWriteToBuffer(ReadOnlySpan<byte> buffer)
113 {
114 lock (SyncObject)
115 {
116 if (_writeEnded)
117 {
118 throw new InvalidOperationException();
119 }
// After the read side was aborted, report the whole input as written without copying it.
120 if (_readAborted)
121 {
122 return (wait: false, bytesWritten: buffer.Length);
123 }
// NOTE(review): embedded line 124 is absent from this extract — presumably it grows the
// buffer up to _maxBufferSize before the space check below; confirm.
// Copy as much as currently fits in the buffer's available memory.
125 int num = Math.Min(buffer.Length, _buffer.AvailableMemory.Length);
126 if (num > 0)
127 {
128 _buffer.AvailableMemory.CopyFrom(buffer.Slice(0, num));
129 _buffer.Commit(num);
// NOTE(review): embedded line 130 is absent from this extract — presumably it signals a
// waiting reader; confirm.
131 }
132 buffer = buffer.Slice(num);
// Everything was written: the caller does not need to wait.
133 if (buffer.Length == 0)
134 {
135 return (wait: false, bytesWritten: num);
136 }
// NOTE(review): embedded line 137 is absent from this extract — presumably it prepares the
// write-side wait source before the caller is told to wait; confirm.
// Part of the input is left over: the caller must wait and retry with the remainder.
138 return (wait: true, bytesWritten: num);
139 }
140 }
141
// NOTE(review): the signature line (embedded line 142) is absent from this extract; the body
// operates on a sliceable `buffer` and passes it to TryWriteToBuffer.
143 {
// Nothing to write.
144 if (buffer.Length == 0)
145 {
146 return;
147 }
// Keep attempting until TryWriteToBuffer reports that no further wait is needed.
148 while (true)
149 {
// flag = "must wait"; start = bytes consumed by this attempt.
150 var (flag, start) = TryWriteToBuffer(buffer);
151 if (flag)
152 {
// Drop the part that was written and retry with the remainder.
153 buffer = buffer.Slice(start);
// NOTE(review): embedded line 154 is absent from this extract — presumably it blocks until
// the write side is signalled; confirm.
155 continue;
156 }
157 break;
158 }
159 }
160
// NOTE(review): the signature line (embedded line 161) is absent from this extract; the body
// uses a `buffer` exposing .Span/.Slice and a `cancellationToken`.
162 {
// Fail fast if cancellation was already requested.
163 cancellationToken.ThrowIfCancellationRequested();
// Nothing to write.
164 if (buffer.Length == 0)
165 {
166 return;
167 }
// Keep attempting until TryWriteToBuffer reports that no further wait is needed.
168 while (true)
169 {
// flag = "must wait"; start = bytes consumed by this attempt.
170 var (flag, start) = TryWriteToBuffer(buffer.Span);
171 if (flag)
172 {
// Drop the part that was written, await the write-side signal, then retry the remainder.
173 buffer = buffer.Slice(start);
174 await _writeTaskSource.WaitAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
175 continue;
176 }
177 break;
178 }
179 }
180
// Marks the write side as finished by setting _writeEnded under the SyncObject lock.
// Calling it again after the flag is set does nothing.
181 public void EndWrite()
182 {
183 lock (SyncObject)
184 {
185 if (!_writeEnded)
186 {
187 _writeEnded = true;
// NOTE(review): embedded line 188 is absent from this extract — presumably it signals a
// waiting reader so it can observe the end of data; confirm.
189 }
190 }
191 }
192
// Single read attempt, performed under the SyncObject lock.
// Returns (wait, bytesRead): bytesRead is the number of bytes copied into `buffer`;
// wait is true only when the buffer is empty and writing has not ended.
193 private (bool wait, int bytesRead) TryReadFromBuffer(Span<byte> buffer)
194 {
195 lock (SyncObject)
196 {
// An aborted read side yields zero bytes and never waits.
197 if (_readAborted)
198 {
199 return (wait: false, bytesRead: 0);
200 }
// Data is buffered: copy as much as fits in the destination and discard it from the buffer.
201 if (!_buffer.IsEmpty)
202 {
203 int num = Math.Min(buffer.Length, _buffer.ActiveMemory.Length);
204 _buffer.ActiveMemory.Slice(0, num).CopyTo(buffer);
205 _buffer.Discard(num);
// NOTE(review): embedded line 206 is absent from this extract — presumably it signals a
// waiting writer that space was freed; confirm.
207 return (wait: false, bytesRead: num);
208 }
// Empty buffer and no more writes coming: report zero bytes without waiting.
209 if (_writeEnded)
210 {
211 return (wait: false, bytesRead: 0);
212 }
// NOTE(review): embedded line 213 is absent from this extract — presumably it prepares the
// read-side wait source before the caller is told to wait; confirm.
214 return (wait: true, bytesRead: 0);
215 }
216 }
217
// NOTE(review): the signature line (embedded line 218) is absent from this extract; the body
// takes a `buffer` with a Length and returns an int byte count.
219 {
// A zero-length destination reads nothing.
220 if (buffer.Length == 0)
221 {
222 return 0;
223 }
224 int result;
225 bool flag;
// flag = "must wait"; result = bytes read by the attempt.
226 (flag, result) = TryReadFromBuffer(buffer);
227 if (flag)
228 {
// NOTE(review): embedded line 229 is absent from this extract — presumably it blocks until
// the read side is signalled; confirm.
// Exactly one retry; its byte count is returned whatever its wait flag says.
230 (flag, result) = TryReadFromBuffer(buffer);
231 }
232 return result;
233 }
234
// NOTE(review): the signature line (embedded line 235) is absent from this extract; the body
// uses a `buffer` exposing .Span and a `cancellationToken`, and returns an int byte count.
236 {
// Fail fast if cancellation was already requested.
237 cancellationToken.ThrowIfCancellationRequested();
// A zero-length destination reads nothing.
238 if (buffer.Length == 0)
239 {
240 return 0;
241 }
// flag = "must wait"; result = bytes read by the attempt.
242 var (flag, result) = TryReadFromBuffer(buffer.Span);
243 if (flag)
244 {
// Await the read-side signal, then retry exactly once. The retry's wait flag is
// discarded and its byte count is returned as-is.
245 await _readTaskSource.WaitAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
246 (bool wait, int bytesRead) tuple2 = TryReadFromBuffer(buffer.Span);
247 _ = tuple2.wait;
248 result = tuple2.bytesRead;
249 }
250 return result;
251 }
252
// Marks the read side as aborted by setting _readAborted under the SyncObject lock.
// Calling it again after the flag is set does nothing.
253 public void AbortRead()
254 {
255 lock (SyncObject)
256 {
257 if (!_readAborted)
258 {
259 _readAborted = true;
// NOTE(review): embedded lines 260-262 are absent from this extract — presumably they
// release buffered data and/or signal waiters; confirm against the full file.
263 }
264 }
265 }
266
// Shuts the buffer down: aborts the read side, ends the write side, then takes the
// SyncObject lock for a final step.
267 public void Dispose()
268 {
269 AbortRead();
270 EndWrite();
271 lock (SyncObject)
272 {
// NOTE(review): embedded line 273 is absent from this extract — presumably it disposes
// _buffer; confirm against the full file.
274 }
275 }
276}
ManualResetValueTaskSourceCore< bool > _waitSource
void CancelWaiter(CancellationToken cancellationToken)
ValueTask WaitAsync(CancellationToken cancellationToken)
CancellationTokenRegistration _waitSourceCancellation
MultiArrayBuffer _buffer
async ValueTask< int > ReadAsync(Memory< byte > buffer, CancellationToken cancellationToken=default(CancellationToken))
int Read(Span< byte > buffer)
(bool wait, int bytesRead) TryReadFromBuffer(Span< byte > buffer)
void Write(ReadOnlySpan< byte > buffer)
async ValueTask WriteAsync(ReadOnlyMemory< byte > buffer, CancellationToken cancellationToken=default(CancellationToken))
readonly ResettableValueTaskSource _writeTaskSource
StreamBuffer(int initialBufferSize=4096, int maxBufferSize=32768)
readonly int _maxBufferSize
(bool wait, int bytesWritten) TryWriteToBuffer(ReadOnlySpan< byte > buffer)
readonly ResettableValueTaskSource _readTaskSource
static byte Min(byte val1, byte val2)
Definition Math.cs:912
static int Exchange(ref int location1, int value)
new TaskAwaiter< TResult > GetAwaiter()
Definition Task.cs:221
static void Write(ref bool location, bool value)
Definition Volatile.cs:74
void OnCompleted(Action< object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
ValueTaskSourceStatus GetStatus(short token)
void EnsureAvailableSpaceUpToLimit(int byteCount, int limit)
System.Net.MultiMemory ActiveMemory
System.Net.MultiMemory AvailableMemory
void Discard(int byteCount)
void OnCompleted(Action< object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
ConfiguredValueTaskAwaitable ConfigureAwait(bool continueOnCapturedContext)
Definition ValueTask.cs:312