Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
BatchBlock.cs
Go to the documentation of this file.
4using System.Linq;
6
8
9[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
12{
13 private sealed class DebugView
14 {
15 private readonly BatchBlock<T> _batchBlock;
16
18
19 private readonly SourceCore<T[]>.DebuggingInformation _sourceDebuggingInformation;
20
22
24
25 public long BatchesCompleted => _targetDebuggingInformation.NumberOfBatchesCompleted;
26
28
29 public Task TaskForOutputProcessing => _sourceDebuggingInformation.TaskForOutputProcessing;
30
32
33 public int BatchSize => _batchBlock.BatchSize;
34
35 public bool IsDecliningPermanently => _targetDebuggingInformation.IsDecliningPermanently;
36
37 public bool IsCompleted => _sourceDebuggingInformation.IsCompleted;
38
40
42
44
46
48 {
50 _targetDebuggingInformation = batchBlock._target.GetDebuggingInformation();
51 _sourceDebuggingInformation = batchBlock._source.GetDebuggingInformation();
52 }
53 }
54
55 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
56 private sealed class BatchBlockTargetCore
57 {
77
78 internal sealed class DebuggingInformation
79 {
80 private readonly BatchBlockTargetCore _target;
81
83
85 {
86 get
87 {
88 if (_target._nonGreedyState == null)
89 {
90 return null;
91 }
93 }
94 }
95
97 {
98 get
99 {
100 if (_target._nonGreedyState == null)
101 {
102 return null;
103 }
105 }
106 }
107
109
111
113
115 {
116 _target = target;
117 }
118 }
119
120 private readonly Queue<T> _messages = new Queue<T>();
121
123
124 private readonly BatchBlock<T> _owningBatch;
125
126 private readonly int _batchSize;
127
129
131
133
135
137
138 private long _batchesCompleted;
139
141
142 private object IncomingLock => _completionTask;
143
145
146 internal int BatchSize => _batchSize;
147
148 private bool CanceledOrFaulted
149 {
150 get
151 {
153 {
154 return _owningBatch._source.HasExceptions;
155 }
156 return true;
157 }
158 }
159
161 {
162 get
163 {
164 if (_boundingState == null)
165 {
166 return _batchSize;
167 }
168 return _dataflowBlockOptions.BoundedCapacity - _boundingState.CurrentCount;
169 }
170 }
171
173 {
174 get
175 {
177 bool flag2 = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
178 if (flag || flag2 || CanceledOrFaulted)
179 {
180 return false;
181 }
182 int num = _batchSize - _messages.Count;
184 if (num <= 0)
185 {
186 return true;
187 }
188 if (_nonGreedyState != null)
189 {
191 {
192 return true;
193 }
195 {
196 return false;
197 }
199 {
200 return true;
201 }
203 {
205 {
206 return true;
207 }
208 }
210 {
211 return true;
212 }
213 }
214 return false;
215 }
216 }
217
219 {
220 get
221 {
223 return $"Block=\"{((owningBatch != null) ? owningBatch.Content : _owningBatch)}\"";
224 }
225 }
226
228 {
233 bool flag = dataflowBlockOptions.BoundedCapacity > 0;
234 if (!_dataflowBlockOptions.Greedy || flag)
235 {
237 }
238 if (flag)
239 {
241 }
242 }
243
244 internal void TriggerBatch()
245 {
247 {
249 {
250 if (_nonGreedyState == null)
251 {
253 }
254 else
255 {
256 _nonGreedyState.AcceptFewerThanBatchSize = true;
258 }
259 }
261 }
262 }
263
265 {
266 if (!messageHeader.IsValid)
267 {
268 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
269 }
270 if (source == null && consumeToAccept)
271 {
273 }
275 {
277 {
279 return DataflowMessageStatus.DecliningPermanently;
280 }
282 {
283 if (consumeToAccept)
284 {
286 if (!messageConsumed)
287 {
288 return DataflowMessageStatus.NotAvailable;
289 }
290 }
291 _messages.Enqueue(messageValue);
292 if (_boundingState != null)
293 {
295 }
297 {
299 }
302 return DataflowMessageStatus.Accepted;
303 }
304 if (source != null)
305 {
308 {
310 }
311 return DataflowMessageStatus.Postponed;
312 }
313 return DataflowMessageStatus.Declined;
314 }
315 }
316
318 {
320 {
322 {
323 _owningBatch._source.AddException(exception);
324 }
326 {
327 _messages.Clear();
328 }
329 }
331 {
332 try
333 {
335 }
336 catch (Exception exception2)
337 {
338 _owningBatch._source.AddException(exception2);
339 }
340 }
342 {
344 {
345 _nonGreedyState.TaskForInputProcessing = null;
346 }
349 }
350 }
351
353 {
355 {
356 return;
357 }
358 bool flag = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
360 bool flag3 = _decliningPermanently && _messages.Count < _batchSize;
361 if (flag || (!(flag2 || flag3) && !CanceledOrFaulted))
362 {
363 return;
364 }
365 _completionReserved = true;
367 if (_messages.Count > 0)
368 {
370 }
371 Task.Factory.StartNew(delegate(object thisTargetCore)
372 {
375 if (batchBlockTargetCore._nonGreedyState != null)
376 {
377 Common.ReleaseAllPostponedMessages(batchBlockTargetCore._owningBatch, batchBlockTargetCore._nonGreedyState.PostponedMessages, ref exceptions);
378 }
379 if (exceptions != null)
380 {
381 batchBlockTargetCore._owningBatch._source.AddExceptions(exceptions);
382 }
383 batchBlockTargetCore._completionTask.TrySetResult(default(VoidResult));
385 }
386
394
415
417 {
418 try
419 {
421 int num = 0;
422 bool flag2;
423 do
424 {
427 {
429 }
430 else
431 {
433 }
435 {
437 if (flag2 || flag)
438 {
439 _nonGreedyState.AcceptFewerThanBatchSize = false;
440 }
441 }
442 num++;
443 }
444 while (flag2 && num < actualMaxMessagesPerTask);
445 }
446 catch (Exception exception)
447 {
449 }
450 finally
451 {
453 {
454 _nonGreedyState.TaskForInputProcessing = null;
457 }
458 }
459 }
460
462 {
463 bool flag = _messages.Count >= _batchSize;
464 if (flag || (evenIfFewerThanBatchSize && _messages.Count > 0))
465 {
466 T[] array = new T[flag ? _batchSize : _messages.Count];
467 for (int i = 0; i < array.Length; i++)
468 {
469 array[i] = _messages.Dequeue();
470 }
474 {
476 }
477 return true;
478 }
479 return false;
480 }
481
483 {
488 int num;
490 {
493 {
494 return;
495 }
497 }
498 for (int i = 0; i < num; i++)
499 {
501 if (keyValuePair.Key.ReserveMessage(keyValuePair.Value, _owningBatch))
502 {
506 }
507 }
510 {
513 {
514 if (!postponedMessages.TryPop(out item2))
515 {
516 break;
517 }
518 }
519 if (item2.Key.ReserveMessage(item2.Value, _owningBatch))
520 {
524 }
525 }
527 {
528 bool flag = true;
530 {
532 {
534 {
535 flag = !_decliningPermanently;
537 }
538 }
539 }
541 {
543 }
544 else
545 {
547 }
548 }
550 }
551
553 {
558 int num;
559 int num2;
561 {
563 num = _batchSize - _messages.Count;
565 {
566 return;
567 }
568 if (boundedCapacityAvailable < num)
569 {
571 }
572 num2 = postponedMessages.PopRange(postponedMessagesTemp, 0, num);
573 }
574 for (int i = 0; i < num2; i++)
575 {
580 }
582 while (reservedSourcesTemp.Count < num)
583 {
586 {
587 if (!postponedMessages.TryPop(out item2))
588 {
589 break;
590 }
591 }
595 }
597 {
598 bool flag = true;
600 {
602 {
604 {
605 flag = !_decliningPermanently;
607 }
608 }
609 }
610 if (flag)
611 {
613 }
614 }
616 }
617
619 {
621 for (int i = 0; i < reservedSourcesTemp.Count; i++)
622 {
625 bool messageConsumed;
626 T value = keyValuePair.Key.ConsumeMessage(keyValuePair.Value.Key, _owningBatch, out messageConsumed);
627 if (!messageConsumed)
628 {
629 for (int j = 0; j < i; j++)
630 {
632 }
634 }
638 }
640 {
641 if (_boundingState != null)
642 {
643 _boundingState.CurrentCount += reservedSourcesTemp.Count;
644 }
646 {
647 _messages.Enqueue(item.Value.Value);
648 }
649 }
650 }
651
685
687 {
688 List<Exception> list = null;
690 for (int i = 0; i < reservedSourcesTemp.Count; i++)
691 {
696 if (key == null || !value.Key.IsValid)
697 {
698 continue;
699 }
700 try
701 {
702 key.ReleaseReservation(value.Key, _owningBatch);
703 }
704 catch (Exception item)
705 {
707 {
708 throw;
709 }
710 if (list == null)
711 {
712 list = new List<Exception>(1);
713 }
714 list.Add(item);
715 }
716 }
717 if (list != null)
718 {
719 throw new AggregateException(list);
720 }
721 }
722
724 {
725 if (_boundingState != null)
726 {
728 {
729 _boundingState.CurrentCount -= numItemsRemoved;
732 }
733 }
734 }
735
737 {
738 if (multipleOutputItems == null)
739 {
740 return singleOutputItem.Length;
741 }
742 int num = 0;
744 {
745 num += multipleOutputItem.Length;
746 }
747 return num;
748 }
749
751 {
752 return new DebuggingInformation(this);
753 }
754 }
755
757
758 private readonly SourceCore<T[]> _source;
759
761
763
765
767
768 private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, BatchSize={BatchSize}, OutputCount={OutputCountForDebugger}";
769
770 object IDebuggerDisplay.Content => DebuggerDisplayContent;
771
776
778 {
779 if (batchSize < 1)
780 {
782 }
783 if (dataflowBlockOptions == null)
784 {
785 throw new ArgumentNullException("dataflowBlockOptions");
786 }
787 if (dataflowBlockOptions.BoundedCapacity > 0 && dataflowBlockOptions.BoundedCapacity < batchSize)
788 {
790 }
794 if (dataflowBlockOptions.BoundedCapacity > 0)
795 {
797 {
798 ((BatchBlock<T>)owningSource)._target.OnItemsRemoved(count);
799 };
801 }
803 {
804 ((BatchBlock<T>)owningSource)._target.Complete(null, dropPendingMessages: true, releaseReservedMessages: false);
807 {
811 {
815 {
817 dataflowBlock.Fault(completed.Exception);
820 {
822 }, _target);
824 if (log.IsEnabled())
825 {
826 log.DataflowBlockCreated(this, dataflowBlockOptions);
827 }
828 }
829
830 public void Complete()
831 {
833 }
834
836 {
837 if (exception == null)
838 {
839 throw new ArgumentNullException("exception");
840 }
842 }
843
844 public void TriggerBatch()
845 {
847 }
848
850 {
851 return _source.LinkTo(target, linkOptions);
852 }
853
854 public bool TryReceive(Predicate<T[]>? filter, [NotNullWhen(true)] out T[]? item)
855 {
857 }
858
859 public bool TryReceiveAll([NotNullWhen(true)] out IList<T[]>? items)
860 {
861 return _source.TryReceiveAll(out items);
862 }
863
868
873
875 {
876 return _source.ReserveMessage(messageHeader, target);
877 }
878
880 {
882 }
883
884 public override string ToString()
885 {
887 }
888}
static unsafe void Clear(Array array)
Definition Array.cs:755
void Add(TKey key, TValue value)
static string InvalidOperation_FailedToConsumeReservedMessage
Definition SR.cs:32
static string Argument_InvalidMessageHeader
Definition SR.cs:24
static string Argument_CantConsumeFromANullSource
Definition SR.cs:22
static string ArgumentOutOfRange_GenericPositive
Definition SR.cs:1018
static string ArgumentOutOfRange_BatchSizeMustBeNoGreaterThanBoundedCapacity
Definition SR.cs:14
Definition SR.cs:7
QueuedMap< ISourceBlock< T >, DataflowMessageHeader > PostponedMessages
Definition BatchBlock.cs:97
readonly List< KeyValuePair< ISourceBlock< T >, KeyValuePair< DataflowMessageHeader, T > > > ReservedSourcesTemp
Definition BatchBlock.cs:64
readonly KeyValuePair< ISourceBlock< T >, DataflowMessageHeader >[] PostponedMessagesTemp
Definition BatchBlock.cs:62
readonly QueuedMap< ISourceBlock< T >, DataflowMessageHeader > PostponedMessages
Definition BatchBlock.cs:60
void Complete(Exception exception, bool dropPendingMessages, bool releaseReservedMessages, bool revertProcessingState=false)
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock< T > source, bool consumeToAccept)
BatchBlockTargetCore(BatchBlock< T > owningBatch, int batchSize, Action< T[]> batchCompletedAction, GroupingDataflowBlockOptions dataflowBlockOptions)
readonly GroupingDataflowBlockOptions _dataflowBlockOptions
readonly TaskCompletionSource< VoidResult > _completionTask
void RetrievePostponedItemsGreedyBounded(bool allowFewerThanBatchSize)
void RetrievePostponedItemsNonGreedy(bool allowFewerThanBatchSize)
void ProcessAsyncIfNecessary(bool isReplacementReplica=false)
static int CountItems(T[] singleOutputItem, IList< T[]> multipleOutputItems)
QueuedMap< ISourceBlock< T >, DataflowMessageHeader > PostponedMessages
Definition BatchBlock.cs:41
readonly BatchBlockTargetCore.DebuggingInformation _targetDebuggingInformation
Definition BatchBlock.cs:17
readonly SourceCore< T[]>.DebuggingInformation _sourceDebuggingInformation
Definition BatchBlock.cs:19
bool TryReceiveAll([NotNullWhen(true)] out IList< T[]>? items)
BatchBlock(int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
readonly BatchBlockTargetCore _target
bool TryReceive(Predicate< T[]>? filter, [NotNullWhen(true)] out T[]? item)
IDisposable LinkTo(ITargetBlock< T[]> target, DataflowLinkOptions linkOptions)
static Exception StartTaskSafe(Task task, TaskScheduler scheduler)
Definition Common.cs:277
static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude=TaskContinuationOptions.None)
Definition Common.cs:262
static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica=false)
Definition Common.cs:267
static string GetNameForDebugger(IDataflowBlock block, DataflowBlockOptions options=null)
Definition Common.cs:66
static void WireCancellationToComplete(CancellationToken cancellationToken, Task completionTask, Action< object > completeAction, object completeState)
Definition Common.cs:93
static int GetBlockId(IDataflowBlock block)
Definition Common.cs:61
TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target, out bool messageConsumed)
bool TryReceiveAll([NotNullWhen(true)] out IList< TOutput > items)
IDisposable LinkTo(ITargetBlock< TOutput > target, DataflowLinkOptions linkOptions)
bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
bool TryReceive(Predicate< TOutput > filter, [MaybeNullWhen(false)] out TOutput item)
Task ContinueWith(Action< Task< TResult > > continuationAction)
Definition Task.cs:263
static new TaskFactory< TResult > Factory
Definition Task.cs:56
static bool Read(ref bool location)
Definition Volatile.cs:67