Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
UnboundedChannel.cs
Go to the documentation of this file.
6
8
9[DebuggerDisplay("Items={ItemsCountForDebugger}, Closed={ChannelIsClosedForDebugger}")]
11internal sealed class UnboundedChannel<T> : Channel<T>, IDebugEnumerable<T>
12{
13 [DebuggerDisplay("Items={Count}")]
16 {
17 internal readonly UnboundedChannel<T> _parent;
18
20
22
23 public override Task Completion => _parent._completion.Task;
24
25 public override bool CanCount => true;
26
27 public override bool CanPeek => true;
28
29 public override int Count => _parent._items.Count;
30
32 {
33 _parent = parent;
34 _readerSingleton = new AsyncOperation<T>(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true);
35 _waiterSingleton = new AsyncOperation<bool>(parent._runContinuationsAsynchronously, default(CancellationToken), pooled: true);
36 }
37
39 {
40 if (cancellationToken.IsCancellationRequested)
41 {
43 }
45 if (parent._items.TryDequeue(out var result))
46 {
47 CompleteIfDone(parent);
48 return new ValueTask<T>(result);
49 }
50 lock (parent.SyncObj)
51 {
52 if (parent._items.TryDequeue(out result))
53 {
54 CompleteIfDone(parent);
55 return new ValueTask<T>(result);
56 }
57 if (parent._doneWriting != null)
58 {
59 return ChannelUtilities.GetInvalidCompletionValueTask<T>(parent._doneWriting);
60 }
61 if (!cancellationToken.CanBeCanceled)
62 {
64 if (readerSingleton.TryOwnAndReset())
65 {
66 parent._blockedReaders.EnqueueTail(readerSingleton);
67 return readerSingleton.ValueTaskOfT;
68 }
69 }
70 AsyncOperation<T> asyncOperation = new AsyncOperation<T>(parent._runContinuationsAsynchronously, cancellationToken);
71 parent._blockedReaders.EnqueueTail(asyncOperation);
72 return asyncOperation.ValueTaskOfT;
73 }
74 }
75
76 public override bool TryRead([MaybeNullWhen(false)] out T item)
77 {
79 if (parent._items.TryDequeue(out item))
80 {
81 CompleteIfDone(parent);
82 return true;
83 }
84 item = default(T);
85 return false;
86 }
87
88 public override bool TryPeek([MaybeNullWhen(false)] out T item)
89 {
90 return _parent._items.TryPeek(out item);
91 }
92
94 {
95 if (parent._doneWriting != null && parent._items.IsEmpty)
96 {
97 ChannelUtilities.Complete(parent._completion, parent._doneWriting);
98 }
99 }
100
102 {
103 if (cancellationToken.IsCancellationRequested)
104 {
106 }
107 if (!_parent._items.IsEmpty)
108 {
109 return new ValueTask<bool>(result: true);
110 }
112 lock (parent.SyncObj)
113 {
114 if (!parent._items.IsEmpty)
115 {
116 return new ValueTask<bool>(result: true);
117 }
118 if (parent._doneWriting != null)
119 {
120 return (parent._doneWriting != ChannelUtilities.s_doneWritingSentinel) ? new ValueTask<bool>(Task.FromException<bool>(parent._doneWriting)) : default(ValueTask<bool>);
121 }
122 if (!cancellationToken.CanBeCanceled)
123 {
125 if (waiterSingleton.TryOwnAndReset())
126 {
127 ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, waiterSingleton);
128 return waiterSingleton.ValueTaskOfT;
129 }
130 }
131 AsyncOperation<bool> asyncOperation = new AsyncOperation<bool>(parent._runContinuationsAsynchronously, cancellationToken);
132 ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, asyncOperation);
133 return asyncOperation.ValueTaskOfT;
134 }
135 }
136
141 }
142
143 [DebuggerDisplay("Items={ItemsCountForDebugger}")]
146 {
147 internal readonly UnboundedChannel<T> _parent;
148
149 private int ItemsCountForDebugger => _parent._items.Count;
150
152 {
153 _parent = parent;
154 }
155
156 public override bool TryComplete(Exception error)
157 {
159 bool isEmpty;
160 lock (parent.SyncObj)
161 {
162 if (parent._doneWriting != null)
163 {
164 return false;
165 }
166 parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel;
167 isEmpty = parent._items.IsEmpty;
168 }
169 if (isEmpty)
170 {
171 ChannelUtilities.Complete(parent._completion, error);
172 }
174 ChannelUtilities.WakeUpWaiters(ref parent._waitingReadersTail, result: false, error);
175 return true;
176 }
177
178 public override bool TryWrite(T item)
179 {
182 while (true)
183 {
185 listTail = null;
186 lock (parent.SyncObj)
187 {
188 if (parent._doneWriting != null)
189 {
190 return false;
191 }
192 if (parent._blockedReaders.IsEmpty)
193 {
194 parent._items.Enqueue(item);
195 listTail = parent._waitingReadersTail;
196 if (listTail == null)
197 {
198 return true;
199 }
200 parent._waitingReadersTail = null;
201 }
202 else
203 {
204 asyncOperation = parent._blockedReaders.DequeueHead();
205 }
206 }
207 if (asyncOperation == null)
208 {
209 break;
210 }
211 if (asyncOperation.TrySetResult(item))
212 {
213 return true;
214 }
215 }
217 return true;
218 }
219
221 {
222 Exception doneWriting = _parent._doneWriting;
223 if (!cancellationToken.IsCancellationRequested)
224 {
225 if (doneWriting != null)
226 {
228 {
229 return default(ValueTask<bool>);
230 }
232 }
233 return new ValueTask<bool>(result: true);
234 }
236 }
237
239 {
240 if (!cancellationToken.IsCancellationRequested)
241 {
242 if (!TryWrite(item))
243 {
245 }
246 return default(ValueTask);
247 }
249 }
250
255 }
256
258
260
262
263 private readonly bool _runContinuationsAsynchronously;
264
266
268
269 private object SyncObj => _items;
270
272
274
282
287}
static void QueueWaiter(ref AsyncOperation< bool > tail, AsyncOperation< bool > waiter)
static readonly Exception s_doneWritingSentinel
static void Complete(TaskCompletionSource tcs, Exception error=null)
static void WakeUpWaiters(ref AsyncOperation< bool > listTail, bool result, Exception error=null)
static Exception CreateInvalidCompletionException(Exception inner=null)
override ValueTask< bool > WaitToReadAsync(CancellationToken cancellationToken)
override bool TryRead([MaybeNullWhen(false)] out T item)
override ValueTask< T > ReadAsync(CancellationToken cancellationToken)
override bool TryPeek([MaybeNullWhen(false)] out T item)
override ValueTask< bool > WaitToWriteAsync(CancellationToken cancellationToken)
override ValueTask WriteAsync(T item, CancellationToken cancellationToken)
readonly Deque< AsyncOperation< T > > _blockedReaders
readonly TaskCompletionSource _completion
UnboundedChannel(bool runContinuationsAsynchronously)
static Task FromException(Exception exception)
Definition Task.cs:3341
static Task FromCanceled(CancellationToken cancellationToken)
Definition Task.cs:3363