Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
BoundedChannel.cs
Go to the documentation of this file.
5
7
8[DebuggerDisplay("Items={ItemsCountForDebugger}, Capacity={_bufferedCapacity}, Mode={_mode}, Closed={ChannelIsClosedForDebugger}")]
10internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T>
11{
12 [DebuggerDisplay("Items={ItemsCountForDebugger}")]
15 {
16 internal readonly BoundedChannel<T> _parent;
17
19
21
// Completion task of the reader: the Task of the parent channel's _completion source.
22 public override Task Completion => _parent._completion.Task;
23
// This reader always reports that counting is supported (see Count).
24 public override bool CanCount => true;
25
// This reader always reports that peeking is supported (see TryPeek).
26 public override bool CanPeek => true;
27
// Number of items currently held in the parent channel's _items deque.
// The value is read while holding the lock on the parent's SyncObj.
// NOTE(review): the local `parent` is not declared in the visible lines (listing line 32 is
// absent from this extract) - presumably a local copy of _parent; confirm against the original file.
28 public override int Count
29 {
30 get
31 {
33 lock (parent.SyncObj)
34 {
35 return parent._items.Count;
36 }
37 }
38 }
39
// Item count read directly from the parent's _items, without taking the SyncObj lock.
// A member with this name is referenced by the "Items={ItemsCountForDebugger}" DebuggerDisplay strings in this file.
40 private int ItemsCountForDebugger => _parent._items.Count;
41
// NOTE(review): the declaration line for this body (listing line 42) is missing from this extract.
// The assignments below suggest it is a constructor that receives `parent` - confirm against the original file.
43 {
// Remember the owning channel.
44 _parent = parent;
// Create the two reusable operation objects (pooled: true), passing the parent's
// continuation setting and a default CancellationToken.
45 _readerSingleton = new AsyncOperation<T>(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true);
46 _waiterSingleton = new AsyncOperation<bool>(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true);
47 }
48
// Synchronous, non-waiting read attempt.
// Returns true when the parent's _items deque is non-empty (checked under the SyncObj lock);
// otherwise sets item to default(T) and returns false.
// NOTE(review): listing lines 51 and 56 are absent from this extract. Line 51 presumably declares
// the local `parent`, and line 56 presumably assigns `item` before the `return true` - confirm
// against the original file.
49 public override bool TryRead([MaybeNullWhen(false)] out T item)
50 {
52 lock (parent.SyncObj)
53 {
54 if (!parent._items.IsEmpty)
55 {
57 return true;
58 }
59 }
// Nothing buffered: report failure with a default item.
60 item = default(T);
61 return false;
62 }
63
// Synchronous peek: if the parent's _items deque is non-empty (checked under the SyncObj lock),
// stores the result of _items.PeekHead() in item and returns true; otherwise sets item to
// default(T) and returns false. No dequeue call is made here.
// NOTE(review): the local `parent` is not declared in the visible lines (listing line 66 is
// absent from this extract) - presumably a local copy of _parent; confirm against the original file.
64 public override bool TryPeek([MaybeNullWhen(false)] out T item)
65 {
67 lock (parent.SyncObj)
68 {
69 if (!parent._items.IsEmpty)
70 {
71 item = parent._items.PeekHead();
72 return true;
73 }
74 }
75 item = default(T);
76 return false;
77 }
78
// NOTE(review): the declaration line for this body (listing line 79) is missing from this extract.
// The body returns AsyncOperation<T>.ValueTaskOfT and uses `cancellationToken`, which matches the
// "override ValueTask< T > ReadAsync(CancellationToken cancellationToken)" entry in the symbol index
// at the end of this page - confirm against the original file.
80 {
// Early exit when cancellation was already requested.
// NOTE(review): the statement inside this branch (listing line 83) is missing from this extract.
81 if (cancellationToken.IsCancellationRequested)
82 {
84 }
// NOTE(review): listing line 85, which presumably declares the local `parent`, is missing.
86 lock (parent.SyncObj)
87 {
// Items are buffered.
// NOTE(review): the statement for this branch (listing line 90) is missing; presumably it
// returns a dequeued item - confirm.
88 if (!parent._items.IsEmpty)
89 {
91 }
// Buffer is empty and writing has been marked done: return the invalid-completion ValueTask
// built from _doneWriting.
92 if (parent._doneWriting != null)
93 {
94 return ChannelUtilities.GetInvalidCompletionValueTask<T>(parent._doneWriting);
95 }
// With a token that cannot be canceled, try to reuse the singleton operation instead of allocating.
// NOTE(review): listing line 98, which presumably declares `readerSingleton`, is missing.
96 if (!cancellationToken.CanBeCanceled)
97 {
99 if (readerSingleton.TryOwnAndReset())
100 {
101 parent._blockedReaders.EnqueueTail(readerSingleton);
102 return readerSingleton.ValueTaskOfT;
103 }
104 }
// Otherwise allocate a new operation, queue it as a blocked reader and hand its ValueTask to the caller.
// The first constructor argument is true when either the parent's setting or the token's
// CanBeCanceled is true.
105 AsyncOperation<T> asyncOperation = new AsyncOperation<T>(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken);
106 parent._blockedReaders.EnqueueTail(asyncOperation);
107 return asyncOperation.ValueTaskOfT;
108 }
109 }
110
// NOTE(review): the declaration line for this body (listing line 111) is missing from this extract.
// The body returns ValueTask<bool> values and queues onto _waitingReadersTail, which matches the
// "override ValueTask< bool > WaitToReadAsync(CancellationToken cancellationToken)" entry in the
// symbol index at the end of this page - confirm against the original file.
112 {
// Early exit when cancellation was already requested.
// NOTE(review): the statement inside this branch (listing line 115) is missing from this extract.
113 if (cancellationToken.IsCancellationRequested)
114 {
116 }
117 BoundedChannel<T> parent = _parent;
118 lock (parent.SyncObj)
119 {
// Items are already buffered: complete immediately with true.
120 if (!parent._items.IsEmpty)
121 {
122 return new ValueTask<bool>(result: true);
123 }
// Buffer is empty and writing has been marked done: return a faulted task when _doneWriting is
// a real exception, or default(ValueTask<bool>) when it is the sentinel.
124 if (parent._doneWriting != null)
125 {
126 return (parent._doneWriting != ChannelUtilities.s_doneWritingSentinel) ? new ValueTask<bool>(Task.FromException<bool>(parent._doneWriting)) : default(ValueTask<bool>);
127 }
// With a token that cannot be canceled, try to reuse the singleton waiter instead of allocating.
// NOTE(review): listing line 130, which presumably declares `waiterSingleton`, is missing.
128 if (!cancellationToken.CanBeCanceled)
129 {
131 if (waiterSingleton.TryOwnAndReset())
132 {
133 ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, waiterSingleton);
134 return waiterSingleton.ValueTaskOfT;
135 }
136 }
// Otherwise allocate a new waiter operation and return its ValueTask.
// NOTE(review): listing line 138 is missing; presumably it queues `asyncOperation` onto
// _waitingReadersTail as the singleton path above does - confirm.
137 AsyncOperation<bool> asyncOperation = new AsyncOperation<bool>(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken);
139 return asyncOperation.ValueTaskOfT;
140 }
141 }
142
// NOTE(review): the declaration line for this body (listing line 143) is missing from this extract.
// The body removes and returns the head of _items and takes no lock itself, so callers presumably
// hold the SyncObj lock already - confirm against the original file.
144 {
145 BoundedChannel<T> parent = _parent;
// Remove the item that will be returned to the caller.
146 T result = parent._items.DequeueHead();
147 if (parent._doneWriting != null)
148 {
// Writing was marked done and this dequeue emptied the buffer: complete the channel's
// completion source, passing _doneWriting.
149 if (parent._items.IsEmpty)
150 {
151 ChannelUtilities.Complete(parent._completion, parent._doneWriting);
152 }
153 }
154 else
155 {
// Pull blocked writers off the queue until one accepts TrySetResult; move that writer's Item
// into the buffer and return. Writers whose TrySetResult fails are skipped.
156 while (!parent._blockedWriters.IsEmpty)
157 {
158 VoidAsyncOperationWithData<T> voidAsyncOperationWithData = parent._blockedWriters.DequeueHead();
159 if (voidAsyncOperationWithData.TrySetResult(default(VoidResult)))
160 {
161 parent._items.EnqueueTail(voidAsyncOperationWithData.Item);
162 return result;
163 }
164 }
// No blocked writer took the slot: wake the operations queued on _waitingWritersTail with result true.
165 ChannelUtilities.WakeUpWaiters(ref parent._waitingWritersTail, result: true);
166 }
167 return result;
168 }
169
174 }
175
176 [DebuggerDisplay("Items={ItemsCountForDebugger}, Capacity={CapacityForDebugger}")]
179 {
180 internal readonly BoundedChannel<T> _parent;
181
183
185
// Item count read directly from the parent's _items, without taking the SyncObj lock.
// Referenced by the "Items={ItemsCountForDebugger}" part of the DebuggerDisplay string above.
186 private int ItemsCountForDebugger => _parent._items.Count;
187
// The parent's _bufferedCapacity; referenced by "Capacity={CapacityForDebugger}" in the DebuggerDisplay string above.
188 private int CapacityForDebugger => _parent._bufferedCapacity;
189
196
// Marks the channel as done writing.
// error: optional exception; when null, ChannelUtilities.s_doneWritingSentinel is stored instead.
// Returns false if _doneWriting was already set, true otherwise.
197 public override bool TryComplete(Exception error)
198 {
199 BoundedChannel<T> parent = _parent;
200 bool isEmpty;
201 lock (parent.SyncObj)
202 {
// Already completed once: report failure without changing anything.
203 if (parent._doneWriting != null)
204 {
205 return false;
206 }
207 parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel;
// Capture emptiness under the lock; it is acted on after the lock is released.
208 isEmpty = parent._items.IsEmpty;
209 }
// Nothing left buffered: complete the completion source now, passing the original `error` (which may be null).
210 if (isEmpty)
211 {
212 ChannelUtilities.Complete(parent._completion, error);
213 }
// NOTE(review): listing lines 214-215 are missing from this extract; only the two waiter wake-ups
// below are visible at this point.
// Wake both waiter lists with result false and the supplied error.
216 ChannelUtilities.WakeUpWaiters(ref parent._waitingReadersTail, result: false, error);
217 ChannelUtilities.WakeUpWaiters(ref parent._waitingWritersTail, result: false, error);
218 return true;
219 }
220
// Synchronous, non-waiting write attempt.
// Returns false when writing has been marked done, or when the buffer is full in Wait mode.
// Returns true in every other visible path, including the DropWrite path where the incoming item
// is passed to _itemDropped rather than enqueued.
// NOTE(review): listing lines 223-224 are missing from this extract; presumably they declare the
// locals `asyncOperation` and `listTail` used below - confirm against the original file.
221 public override bool TryWrite(T item)
222 {
225 BoundedChannel<T> parent = _parent;
// Monitor.Enter/Exit is used directly (not `lock`) so the lock can be released early before
// invoking _itemDropped; lockTaken tells the finally block whether it still needs to exit.
226 bool lockTaken = false;
227 try
228 {
229 Monitor.Enter(parent.SyncObj, ref lockTaken);
// Writing already marked done: reject the item.
230 if (parent._doneWriting != null)
231 {
232 return false;
233 }
234 int count = parent._items.Count;
// Buffer already holds items.
235 if (count != 0)
236 {
// Room left: append and succeed.
237 if (count < parent._bufferedCapacity)
238 {
239 parent._items.EnqueueTail(item);
240 return true;
241 }
// Full, Wait mode: a synchronous write cannot proceed.
242 if (parent._mode == BoundedChannelFullMode.Wait)
243 {
244 return false;
245 }
// Full, DropWrite mode: the incoming item is not enqueued; the lock is released first and
// the item is then handed to the optional _itemDropped callback.
246 if (parent._mode == BoundedChannelFullMode.DropWrite)
247 {
248 Monitor.Exit(parent.SyncObj);
249 lockTaken = false;
250 parent._itemDropped?.Invoke(item);
251 return true;
252 }
// Remaining modes: DropNewest removes the tail item, any other mode removes the head item;
// the new item is appended, the lock is released, then the removed item goes to _itemDropped.
253 T obj = ((parent._mode == BoundedChannelFullMode.DropNewest) ? parent._items.DequeueTail() : parent._items.DequeueHead());
254 parent._items.EnqueueTail(item);
255 Monitor.Exit(parent.SyncObj);
256 lockTaken = false;
257 parent._itemDropped?.Invoke(obj);
258 return true;
259 }
// Buffer is empty: dequeue blocked readers until one reports UnregisterCancellation() == true.
// NOTE(review): listing line 265 is missing; presumably it stores that reader in
// `asyncOperation` before the break - confirm.
260 while (!parent._blockedReaders.IsEmpty)
261 {
262 AsyncOperation<T> asyncOperation2 = parent._blockedReaders.DequeueHead();
263 if (asyncOperation2.UnregisterCancellation())
264 {
266 break;
267 }
268 }
// No blocked reader selected: buffer the item, then detach the waiting-readers list so it can
// be handled after the lock is released. With no waiters, the write is finished.
269 if (asyncOperation == null)
270 {
271 parent._items.EnqueueTail(item);
272 listTail = parent._waitingReadersTail;
273 if (listTail == null)
274 {
275 return true;
276 }
277 parent._waitingReadersTail = null;
278 }
279 }
280 finally
281 {
// Only exit here if an early-release path above has not already done so.
282 if (lockTaken)
283 {
284 Monitor.Exit(parent.SyncObj);
285 }
286 }
// Outside the lock: hand the item directly to the selected blocked reader (the TrySetResult
// result is stored in `flag` but not used in the visible code).
287 if (asyncOperation != null)
288 {
289 bool flag = asyncOperation.TrySetResult(item);
290 }
291 else
292 {
// NOTE(review): listing line 293 is missing; presumably it wakes the detached `listTail` waiters - confirm.
294 }
295 return true;
296 }
297
// NOTE(review): the declaration line for this body (listing line 298) is missing from this extract.
// The body returns ValueTask<bool> values and queues onto _waitingWritersTail, which matches the
// "override ValueTask< bool > WaitToWriteAsync(CancellationToken cancellationToken)" entry in the
// symbol index at the end of this page - confirm against the original file.
299 {
// Early exit when cancellation was already requested.
// NOTE(review): the statement inside this branch (listing line 302) is missing from this extract.
300 if (cancellationToken.IsCancellationRequested)
301 {
303 }
304 BoundedChannel<T> parent = _parent;
305 lock (parent.SyncObj)
306 {
// Writing has been marked done: return a faulted task when _doneWriting is a real exception,
// or default(ValueTask<bool>) when it is the sentinel.
307 if (parent._doneWriting != null)
308 {
309 return (parent._doneWriting != ChannelUtilities.s_doneWritingSentinel) ? new ValueTask<bool>(Task.FromException<bool>(parent._doneWriting)) : default(ValueTask<bool>);
310 }
// Complete immediately with true when there is spare capacity, or when _mode is not the
// zero-valued enum member.
// NOTE(review): presumably 0 is BoundedChannelFullMode.Wait, matching the Wait checks elsewhere
// in this file - confirm against the enum definition.
311 if (parent._items.Count < parent._bufferedCapacity || parent._mode != 0)
312 {
313 return new ValueTask<bool>(result: true);
314 }
// With a token that cannot be canceled, try to reuse the singleton waiter instead of allocating.
// NOTE(review): listing line 317, which presumably declares `waiterSingleton`, is missing.
315 if (!cancellationToken.CanBeCanceled)
316 {
318 if (waiterSingleton.TryOwnAndReset())
319 {
320 ChannelUtilities.QueueWaiter(ref parent._waitingWritersTail, waiterSingleton);
321 return waiterSingleton.ValueTaskOfT;
322 }
323 }
// NOTE(review): listing line 324, which presumably creates `asyncOperation`, is missing.
// Queue the operation as a waiting writer and return its ValueTask.
325 ChannelUtilities.QueueWaiter(ref parent._waitingWritersTail, asyncOperation);
326 return asyncOperation.ValueTaskOfT;
327 }
328 }
329
// NOTE(review): the declaration line for this body (listing line 330) is missing from this extract.
// The body returns ValueTask values and uses `item` and `cancellationToken`, which matches the
// "override ValueTask WriteAsync(T item, CancellationToken cancellationToken)" entry in the symbol
// index at the end of this page - confirm against the original file.
// The visible structure mirrors TryWrite, except that a full buffer in Wait mode queues a blocked
// writer instead of failing.
331 {
// Early exit when cancellation was already requested.
// NOTE(review): the statement inside this branch (listing line 334) is missing from this extract.
332 if (cancellationToken.IsCancellationRequested)
333 {
335 }
// NOTE(review): listing lines 336-337 are missing; presumably they declare the locals
// `asyncOperation` and `listTail` used below - confirm.
338 BoundedChannel<T> parent = _parent;
// Monitor.Enter/Exit is used directly (not `lock`) so the lock can be released early before
// invoking _itemDropped; lockTaken tells the finally block whether it still needs to exit.
339 bool lockTaken = false;
340 try
341 {
342 Monitor.Enter(parent.SyncObj, ref lockTaken);
// Writing already marked done.
// NOTE(review): the statement inside this branch (listing line 345) is missing from this extract.
343 if (parent._doneWriting != null)
344 {
346 }
347 int count = parent._items.Count;
// Buffer already holds items.
348 if (count != 0)
349 {
// Room left: append and complete synchronously.
350 if (count < parent._bufferedCapacity)
351 {
352 parent._items.EnqueueTail(item);
353 return default(ValueTask);
354 }
// Full, Wait mode: park the item on a blocked-writer operation and return its ValueTask.
355 if (parent._mode == BoundedChannelFullMode.Wait)
356 {
// With a token that cannot be canceled, try to reuse the singleton writer operation.
// NOTE(review): listing line 359, which presumably declares `writerSingleton`, is missing.
357 if (!cancellationToken.CanBeCanceled)
358 {
360 if (writerSingleton.TryOwnAndReset())
361 {
362 writerSingleton.Item = item;
363 parent._blockedWriters.EnqueueTail(writerSingleton);
364 return writerSingleton.ValueTask;
365 }
366 }
// NOTE(review): listing line 367, which presumably creates `voidAsyncOperationWithData`, is missing.
368 voidAsyncOperationWithData.Item = item;
369 parent._blockedWriters.EnqueueTail(voidAsyncOperationWithData);
370 return voidAsyncOperationWithData.ValueTask;
371 }
// Full, DropWrite mode: the incoming item is not enqueued; the lock is released first and
// the item is then handed to the optional _itemDropped callback.
372 if (parent._mode == BoundedChannelFullMode.DropWrite)
373 {
374 Monitor.Exit(parent.SyncObj);
375 lockTaken = false;
376 parent._itemDropped?.Invoke(item);
377 return default(ValueTask);
378 }
// Remaining modes: DropNewest removes the tail item, any other mode removes the head item;
// the new item is appended, the lock is released, then the removed item goes to _itemDropped.
379 T obj = ((parent._mode == BoundedChannelFullMode.DropNewest) ? parent._items.DequeueTail() : parent._items.DequeueHead());
380 parent._items.EnqueueTail(item);
381 Monitor.Exit(parent.SyncObj);
382 lockTaken = false;
383 parent._itemDropped?.Invoke(obj);
384 return default(ValueTask);
385 }
// Buffer is empty: dequeue blocked readers until one reports UnregisterCancellation() == true.
// NOTE(review): listing line 391 is missing; presumably it stores that reader in
// `asyncOperation` before the break - confirm.
386 while (!parent._blockedReaders.IsEmpty)
387 {
388 AsyncOperation<T> asyncOperation2 = parent._blockedReaders.DequeueHead();
389 if (asyncOperation2.UnregisterCancellation())
390 {
392 break;
393 }
394 }
// No blocked reader selected: buffer the item, then detach the waiting-readers list so it can
// be handled after the lock is released. With no waiters, the write is finished.
395 if (asyncOperation == null)
396 {
397 parent._items.EnqueueTail(item);
398 listTail = parent._waitingReadersTail;
399 if (listTail == null)
400 {
401 return default(ValueTask);
402 }
403 parent._waitingReadersTail = null;
404 }
405 }
406 finally
407 {
// Only exit here if an early-release path above has not already done so.
408 if (lockTaken)
409 {
410 Monitor.Exit(parent.SyncObj);
411 }
412 }
// Outside the lock: hand the item directly to the selected blocked reader (the TrySetResult
// result is stored in `flag` but not used in the visible code).
413 if (asyncOperation != null)
414 {
415 bool flag = asyncOperation.TrySetResult(item);
416 }
417 else
418 {
// NOTE(review): listing line 419 is missing; presumably it wakes the detached `listTail` waiters - confirm.
420 }
421 return default(ValueTask);
422 }
423
428 }
429
431
// Optional callback; the write paths in this file invoke it (null-conditionally) with an item
// that was dropped, after releasing the lock.
432 private readonly Action<T> _itemDropped;
433
435
// Capacity that the write paths compare _items.Count against before enqueuing.
436 private readonly int _bufferedCapacity;
437
// Buffered items. This same object is also what SyncObj returns, so it doubles as the lock target.
438 private readonly Deque<T> _items = new Deque<T>();
439
441
443
445
447
// Passed as the first constructor argument of the AsyncOperation instances created in this file.
448 private readonly bool _runContinuationsAsynchronously;
449
451
// Object used with lock/Monitor throughout this file; it is the _items deque itself.
452 private object SyncObj => _items;
453
455
457
468
473}
override ValueTask< T > ReadAsync(CancellationToken cancellationToken)
override bool TryPeek([MaybeNullWhen(false)] out T item)
override bool TryRead([MaybeNullWhen(false)] out T item)
override ValueTask< bool > WaitToReadAsync(CancellationToken cancellationToken)
readonly VoidAsyncOperationWithData< T > _writerSingleton
override ValueTask WriteAsync(T item, CancellationToken cancellationToken)
override ValueTask< bool > WaitToWriteAsync(CancellationToken cancellationToken)
readonly TaskCompletionSource _completion
readonly BoundedChannelFullMode _mode
readonly Deque< VoidAsyncOperationWithData< T > > _blockedWriters
BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously, Action< T > itemDropped)
readonly Deque< AsyncOperation< T > > _blockedReaders
static void QueueWaiter(ref AsyncOperation< bool > tail, AsyncOperation< bool > waiter)
static readonly Exception s_doneWritingSentinel
static void Complete(TaskCompletionSource tcs, Exception error=null)
static void WakeUpWaiters(ref AsyncOperation< bool > listTail, bool result, Exception error=null)
static Exception CreateInvalidCompletionException(Exception inner=null)
static void Exit(object obj)
static void Enter(object obj)
static Task FromException(Exception exception)
Definition Task.cs:3341
static Task FromCanceled(CancellationToken cancellationToken)
Definition Task.cs:3363