Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
BufferBlock.cs
Go to the documentation of this file.
5
7
8[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
11{
12 private sealed class DebugView
13 {
14 private readonly BufferBlock<T> _bufferBlock;
15
16 private readonly SourceCore<T>.DebuggingInformation _sourceDebuggingInformation;
17
19 {
20 get
21 {
22 if (_bufferBlock._boundingState == null)
23 {
24 return null;
25 }
26 return _bufferBlock._boundingState.PostponedMessages;
27 }
28 }
29
31
33 {
34 get
35 {
36 if (_bufferBlock._boundingState == null)
37 {
38 return null;
39 }
40 return _bufferBlock._boundingState.TaskForInputProcessing;
41 }
42 }
43
44 public Task TaskForOutputProcessing => _sourceDebuggingInformation.TaskForOutputProcessing;
45
47
48 public bool IsDecliningPermanently => _bufferBlock._targetDecliningPermanently;
49
50 public bool IsCompleted => _sourceDebuggingInformation.IsCompleted;
51
53
55
57
59 {
61 _sourceDebuggingInformation = bufferBlock._source.GetDebuggingInformation();
62 }
63 }
64
65 private readonly SourceCore<T> _source;
66
68
70
72
73 private object IncomingLock => _source;
74
75 public int Count => _source.OutputCount;
76
78
79 private int CountForDebugger => _source.GetDebuggingInformation().OutputCount;
80
81 private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, Count={CountForDebugger}";
82
83 object IDebuggerDisplay.Content => DebuggerDisplayContent;
84
85 public BufferBlock()
87 {
88 }
89
91 {
92 if (dataflowBlockOptions == null)
93 {
94 throw new ArgumentNullException("dataflowBlockOptions");
95 }
98 if (dataflowBlockOptions.BoundedCapacity > 0)
99 {
101 {
102 ((BufferBlock<T>)owningSource).OnItemsRemoved(count);
103 };
105 }
107 {
108 ((BufferBlock<T>)owningSource).Complete();
111 {
113 dataflowBlock.Fault(completed.Exception);
116 {
117 ((BufferBlock<T>)owningSource).Complete();
118 }, this);
120 if (log.IsEnabled())
121 {
122 log.DataflowBlockCreated(this, dataflowBlockOptions);
123 }
124 }
125
127 {
128 if (!messageHeader.IsValid)
129 {
130 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
131 }
132 if (source == null && consumeToAccept)
133 {
135 }
137 {
139 {
141 return DataflowMessageStatus.DecliningPermanently;
142 }
143 if (_boundingState == null || (_boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0 && _boundingState.TaskForInputProcessing == null))
144 {
145 if (consumeToAccept)
146 {
147 messageValue = source.ConsumeMessage(messageHeader, this, out var messageConsumed);
148 if (!messageConsumed)
149 {
150 return DataflowMessageStatus.NotAvailable;
151 }
152 }
154 if (_boundingState != null)
155 {
156 _boundingState.CurrentCount++;
157 }
158 return DataflowMessageStatus.Accepted;
159 }
160 if (source != null)
161 {
162 _boundingState.PostponedMessages.Push(source, messageHeader);
163 return DataflowMessageStatus.Postponed;
164 }
165 return DataflowMessageStatus.Declined;
166 }
167 }
168
169 public void Complete()
170 {
172 }
173
175 {
176 if (exception == null)
177 {
178 throw new ArgumentNullException("exception");
179 }
181 }
182
184 {
186 {
188 {
190 }
192 {
193 _boundingState.TaskForInputProcessing = null;
194 }
197 }
198 }
199
204
206 {
208 }
209
210 public bool TryReceiveAll([NotNullWhen(true)] out IList<T>? items)
211 {
212 return _source.TryReceiveAll(out items);
213 }
214
219
221 {
222 return _source.ReserveMessage(messageHeader, target);
223 }
224
229
231 {
232 if (_boundingState != null)
233 {
235 {
236 _boundingState.CurrentCount -= numItemsRemoved;
239 }
240 }
241 }
242
244 {
245 if (_targetDecliningPermanently || _boundingState.TaskForInputProcessing != null || _boundingState.PostponedMessages.Count <= 0 || !_boundingState.CountIsLessThanBound)
246 {
247 return;
248 }
249 _boundingState.TaskForInputProcessing = new Task(delegate(object state)
250 {
251 ((BufferBlock<T>)state).ConsumeMessagesLoopCore();
254 if (log.IsEnabled())
255 {
256 log.TaskLaunchedForMessageHandling(this, _boundingState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages, _boundingState.PostponedMessages.Count);
257 }
259 if (ex != null)
260 {
261 Task.Factory.StartNew(delegate(object exc)
262 {
265 }
266 }
267
269 {
270 try
271 {
273 for (int i = 0; i < actualMaxMessagesPerTask; i++)
274 {
276 {
277 break;
278 }
279 }
280 }
281 catch (Exception exception)
282 {
284 }
285 finally
286 {
288 {
289 _boundingState.TaskForInputProcessing = null;
292 }
293 }
294 }
295
297 {
298 while (true)
299 {
302 {
304 {
305 return false;
306 }
307 if (!_boundingState.CountIsLessThanBound)
308 {
309 return false;
310 }
311 if (!_boundingState.PostponedMessages.TryPop(out item))
312 {
313 return false;
314 }
315 _boundingState.CurrentCount++;
316 }
317 bool messageConsumed = false;
318 try
319 {
320 T item2 = item.Key.ConsumeMessage(item.Value, this, out messageConsumed);
321 if (messageConsumed)
322 {
324 return true;
325 }
326 }
327 finally
328 {
329 if (!messageConsumed)
330 {
332 {
333 _boundingState.CurrentCount--;
334 }
335 }
336 }
337 }
338 }
339
341 {
343 {
344 return;
345 }
347 if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
348 {
349 Task.Factory.StartNew(delegate(object state)
350 {
353 if (bufferBlock._boundingState != null)
354 {
355 Common.ReleaseAllPostponedMessages(bufferBlock, bufferBlock._boundingState.PostponedMessages, ref exceptions);
356 }
357 if (exceptions != null)
358 {
359 bufferBlock._source.AddExceptions(exceptions);
360 }
361 bufferBlock._source.Complete();
363 }
364 else
365 {
367 }
368 }
369
370 public override string ToString()
371 {
373 }
374}
static string Argument_InvalidMessageHeader
Definition SR.cs:24
static string Argument_CantConsumeFromANullSource
Definition SR.cs:22
Definition SR.cs:7
readonly SourceCore< T >.DebuggingInformation _sourceDebuggingInformation
QueuedMap< ISourceBlock< T >, DataflowMessageHeader > PostponedMessages
readonly BoundingStateWithPostponedAndTask< T > _boundingState
IDisposable LinkTo(ITargetBlock< T > target, DataflowLinkOptions linkOptions)
void ConsumeAsyncIfNecessary(bool isReplacementReplica=false)
BufferBlock(DataflowBlockOptions dataflowBlockOptions)
void OnItemsRemoved(int numItemsRemoved)
bool TryReceive(Predicate< T >? filter, [MaybeNullWhen(false)] out T item)
void CompleteCore(Exception exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState=false)
bool TryReceiveAll([NotNullWhen(true)] out IList< T >? items)
static Exception StartTaskSafe(Task task, TaskScheduler scheduler)
Definition Common.cs:277
static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude=TaskContinuationOptions.None)
Definition Common.cs:262
static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica=false)
Definition Common.cs:267
static string GetNameForDebugger(IDataflowBlock block, DataflowBlockOptions options=null)
Definition Common.cs:66
static void WireCancellationToComplete(CancellationToken cancellationToken, Task completionTask, Action< object > completeAction, object completeState)
Definition Common.cs:93
static int GetBlockId(IDataflowBlock block)
Definition Common.cs:61
TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target, out bool messageConsumed)
bool TryReceiveAll([NotNullWhen(true)] out IList< TOutput > items)
IDisposable LinkTo(ITargetBlock< TOutput > target, DataflowLinkOptions linkOptions)
bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
bool TryReceive(Predicate< TOutput > filter, [MaybeNullWhen(false)] out TOutput item)
Task ContinueWith(Action< Task< TResult > > continuationAction)
Definition Task.cs:263
static new TaskFactory< TResult > Factory
Definition Task.cs:56