Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
BroadcastBlock.cs
Go to the documentation of this file.
4using System.Linq;
6
8
9[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
12{
13 private sealed class DebugView
14 {
16
17 private readonly BroadcastingSourceCore<T>.DebuggingInformation _sourceDebuggingInformation;
18
20
21 public bool HasValue => _broadcastBlock.HasValueForDebugger;
22
23 public T Value => _broadcastBlock.ValueForDebugger;
24
25 public Task TaskForOutputProcessing => _sourceDebuggingInformation.TaskForOutputProcessing;
26
28
29 public bool IsDecliningPermanently => _broadcastBlock._decliningPermanently;
30
31 public bool IsCompleted => _sourceDebuggingInformation.IsCompleted;
32
34
36
38
44 }
45
46 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
47 private sealed class BroadcastingSourceCore<TOutput>
48 {
49 internal sealed class DebuggingInformation
50 {
52
53 public bool HasValue => _source._currentMessageIsValid;
54
55 public TOutput Value => _source._currentMessage;
56
57 public IEnumerable<TOutput> InputQueue => _source._messages.ToList();
58
59 public Task TaskForOutputProcessing => _source._taskForOutputProcessing;
60
61 public DataflowBlockOptions DataflowBlockOptions => _source._dataflowBlockOptions;
62
63 public bool IsCompleted => _source.Completion.IsCompleted;
64
66
67 public ITargetBlock<TOutput> NextMessageReservedFor => _source._nextMessageReservedFor;
68
73 }
74
76
77 private readonly Queue<TOutput> _messages = new Queue<TOutput>();
78
80
82
84
86
88
90
91 private TOutput _currentMessage;
92
94
95 private bool _enableOffering;
96
98
100
102
103 private long _nextMessageId = 1L;
104
106
// Monitor object for the outgoing side; this is simply the _completionTask instance reused as a lock target.
// At the lock sites visible in this listing it is acquired before ValueLock.
107 private object OutgoingLock => _completionTask;
108
// Monitor object nested inside OutgoingLock at the visible lock sites; this is the _targetRegistry instance reused as a lock target.
// NOTE(review): which state each lock protects is not visible in this listing — confirm against the full source.
109 private object ValueLock => _targetRegistry;
110
111 private bool CanceledOrFaulted
112 {
113 get
114 {
116 {
117 if (Volatile.Read(ref _exceptions) != null)
118 {
120 }
121 return false;
122 }
123 return true;
124 }
125 }
126
128
130
132 {
133 get
134 {
136 return $"Block=\"{((owningSource != null) ? owningSource.Content : _owningSource)}\"";
137 }
138 }
139
148
149 internal bool TryReceive(Predicate<TOutput> filter, [MaybeNullWhen(false)] out TOutput item)
150 {
151 TOutput currentMessage;
154 {
156 {
159 }
160 }
162 {
164 return true;
165 }
166 item = default(TOutput);
167 return false;
168 }
169
/// <summary>
/// Tries to receive an item through <c>TryReceive</c> with a null filter and, on success,
/// hands it back wrapped in a one-element array.
/// </summary>
/// <param name="items">
/// On success, a <c>TOutput[1]</c> containing the received item; otherwise <c>null</c>.
/// </param>
/// <returns><c>true</c> when <c>TryReceive</c> produced an item; otherwise <c>false</c>.</returns>
170 internal bool TryReceiveAll([NotNullWhen(true)] out IList<TOutput> items)
171 {
// A null filter is passed through; presumably TryReceive treats null as "accept anything" — its body is incomplete in this listing, confirm.
172 if (TryReceive(null, out var item))
173 {
// The result list never holds more than the single received item.
174 items = new TOutput[1] { item };
175 return true;
176 }
177 items = null;
178 return false;
179 }
180
181 internal void AddMessage(TOutput item)
182 {
184 {
186 {
188 if (_messages.Count == 1)
189 {
190 _enableOffering = true;
191 }
193 }
194 }
195 }
196
197 internal void Complete()
198 {
200 {
202 Task.Factory.StartNew(delegate(object state)
203 {
205 lock (broadcastingSourceCore.OutgoingLock)
206 {
207 lock (broadcastingSourceCore.ValueLock)
208 {
209 broadcastingSourceCore.CompleteBlockIfPossible();
210 }
211 }
213 }
214 }
215
/// <summary>
/// Produces the value to hand out for <paramref name="item"/>: the result of
/// <c>_cloningFunction</c> when one is set, otherwise the item itself.
/// </summary>
/// <param name="item">The item to pass through or clone.</param>
/// <returns>
/// <paramref name="item"/> unchanged when <c>_cloningFunction</c> is null;
/// otherwise whatever <c>_cloningFunction(item)</c> returns.
/// </returns>
216 private TOutput CloneItem(TOutput item)
217 {
// No cloning function configured: return the same instance rather than a copy.
218 if (_cloningFunction == null)
219 {
220 return item;
221 }
222 return _cloningFunction(item);
223 }
224
226 {
227 TOutput currentMessage;
230 {
233 }
235 {
236 return;
237 }
238 bool flag = _cloningFunction != null;
239 switch (target.OfferMessage(new DataflowMessageHeader(_nextMessageId), currentMessage, _owningSource, flag))
240 {
241 case DataflowMessageStatus.Accepted:
242 if (!flag)
243 {
245 }
246 break;
247 case DataflowMessageStatus.DecliningPermanently:
248 _targetRegistry.Remove(target);
249 break;
250 }
251 }
252
253 private bool OfferToTargets()
254 {
256 TOutput message = default(TOutput);
257 int num = 0;
259 {
260 if (_nextMessageReservedFor != null || _messages.Count <= 0)
261 {
262 _enableOffering = false;
263 return false;
264 }
266 {
267 while (_messages.Count > 1)
268 {
270 num++;
271 }
272 }
273 message = (_currentMessage = _messages.Dequeue());
274 num++;
277 if (_messages.Count == 0)
278 {
279 _enableOffering = false;
280 }
281 }
282 if (header.IsValid)
283 {
284 if (_itemsRemovedAction != null)
285 {
287 }
289 while (linkedTargetInfo != null)
290 {
291 TargetRegistry<TOutput>.LinkedTargetInfo next = linkedTargetInfo.Next;
293 OfferMessageToTarget(header, message, target);
294 linkedTargetInfo = next;
295 }
296 }
297 return true;
298 }
299
301 {
302 bool flag = _cloningFunction != null;
303 switch (target.OfferMessage(header, message, _owningSource, flag))
304 {
305 case DataflowMessageStatus.Accepted:
306 if (!flag)
307 {
309 }
310 break;
311 case DataflowMessageStatus.DecliningPermanently:
312 _targetRegistry.Remove(target);
313 break;
314 case DataflowMessageStatus.Declined:
315 case DataflowMessageStatus.Postponed:
316 case DataflowMessageStatus.NotAvailable:
317 break;
318 }
319 }
320
322 {
323 bool flag = _taskForOutputProcessing != null;
324 bool flag2 = _enableOffering && _messages.Count > 0;
325 if (!(!flag && flag2) || CanceledOrFaulted)
326 {
327 return;
328 }
330 {
331 ((BroadcastingSourceCore<TOutput>)thisSourceCore).OfferMessagesLoopCore();
334 if (log.IsEnabled())
335 {
336 log.TaskLaunchedForMessageHandling(_owningSource, _taskForOutputProcessing, DataflowEtwProvider.TaskLaunchedReason.OfferingOutputMessages, _messages.Count);
337 }
339 if (ex == null)
340 {
341 return;
342 }
346 Task.Factory.StartNew(delegate(object state)
347 {
349 lock (broadcastingSourceCore.OutgoingLock)
350 {
351 lock (broadcastingSourceCore.ValueLock)
352 {
353 broadcastingSourceCore.CompleteBlockIfPossible();
354 }
355 }
357 }
358
360 {
361 try
362 {
365 {
366 for (int i = 0; i < actualMaxMessagesPerTask; i++)
367 {
369 {
370 break;
371 }
372 if (!OfferToTargets())
373 {
374 break;
375 }
376 }
377 }
378 }
379 catch (Exception exception)
380 {
382 }
383 finally
384 {
386 {
388 {
392 }
393 }
394 }
395 }
396
398 {
400 {
401 bool flag = _taskForOutputProcessing != null;
402 bool flag2 = _decliningPermanently && _messages.Count == 0;
403 if (!flag && (flag2 || CanceledOrFaulted))
404 {
406 }
407 }
408 }
409
411 {
412 _completionReserved = true;
413 Task.Factory.StartNew(delegate(object thisSourceCore)
414 {
415 ((BroadcastingSourceCore<TOutput>)thisSourceCore).CompleteBlockOncePossible();
417 }
418
420 {
421 TargetRegistry<TOutput>.LinkedTargetInfo firstTarget;
424 {
427 {
430 _exceptions = null;
431 }
432 }
433 if (exceptions != null)
434 {
435 _completionTask.TrySetException(exceptions);
436 }
438 {
439 _completionTask.TrySetCanceled();
440 }
441 else
442 {
443 _completionTask.TrySetResult(default(VoidResult));
444 }
447 if (log.IsEnabled())
448 {
449 log.DataflowBlockCompleted(_owningSource);
450 }
451 }
452
454 {
455 if (target == null)
456 {
457 throw new ArgumentNullException("target");
458 }
459 if (linkOptions == null)
460 {
461 throw new ArgumentNullException("linkOptions");
462 }
464 {
466 {
468 if (linkOptions.PropagateCompletion)
469 {
471 }
472 return Disposables.Nop;
473 }
476 return Common.CreateUnlinker(OutgoingLock, _targetRegistry, target);
477 }
478 }
479
481 {
482 if (!messageHeader.IsValid)
483 {
484 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
485 }
486 if (target == null)
487 {
488 throw new ArgumentNullException("target");
489 }
490 TOutput currentMessage;
492 {
494 {
496 {
497 messageConsumed = false;
498 return default(TOutput);
499 }
500 if (_nextMessageReservedFor == target)
501 {
503 _enableOffering = true;
504 }
509 }
510 }
511 messageConsumed = true;
513 }
514
516 {
517 if (!messageHeader.IsValid)
518 {
519 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
520 }
521 if (target == null)
522 {
523 throw new ArgumentNullException("target");
524 }
526 {
527 if (_nextMessageReservedFor == null)
528 {
530 {
532 {
534 _enableOffering = false;
535 return true;
536 }
537 }
538 }
539 }
540 return false;
541 }
542
544 {
545 if (!messageHeader.IsValid)
546 {
547 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
548 }
549 if (target == null)
550 {
551 throw new ArgumentNullException("target");
552 }
554 {
555 if (_nextMessageReservedFor != target)
556 {
558 }
559 TOutput currentMessage;
561 {
563 {
565 }
567 _enableOffering = true;
570 }
572 }
573 }
574
576 {
578 {
580 }
581 }
582
584 {
586 {
587 foreach (Exception exception in exceptions)
588 {
590 }
591 }
592 }
593
595 {
596 return new DebuggingInformation(this);
597 }
598 }
599
601
603
605
607
608 private object IncomingLock => _source;
609
611
613
615
616 private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, HasValue={HasValueForDebugger}, Value={ValueForDebugger}";
617
618 object IDebuggerDisplay.Content => DebuggerDisplayContent;
619
624
654
655 public void Complete()
656 {
658 }
659
661 {
662 if (exception == null)
663 {
664 throw new ArgumentNullException("exception");
665 }
667 }
668
670 {
672 {
674 {
676 }
678 {
679 _boundingState.TaskForInputProcessing = null;
680 }
683 }
684 }
685
690
692 {
694 }
695
/// <summary>
/// Explicit <c>IReceivableSourceBlock&lt;T&gt;</c> implementation that forwards directly to
/// <c>_source.TryReceiveAll</c>.
/// </summary>
/// <param name="items">Receives whatever <c>_source.TryReceiveAll</c> assigns.</param>
/// <returns>The result of <c>_source.TryReceiveAll</c>.</returns>
696 bool IReceivableSourceBlock<T>.TryReceiveAll([NotNullWhen(true)] out IList<T> items)
697 {
698 return _source.TryReceiveAll(out items);
699 }
700
702 {
703 if (!messageHeader.IsValid)
704 {
705 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
706 }
707 if (source == null && consumeToAccept)
708 {
710 }
712 {
714 {
716 return DataflowMessageStatus.DecliningPermanently;
717 }
718 if (_boundingState == null || (_boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0 && _boundingState.TaskForInputProcessing == null))
719 {
720 if (consumeToAccept)
721 {
722 messageValue = source.ConsumeMessage(messageHeader, this, out var messageConsumed);
723 if (!messageConsumed)
724 {
725 return DataflowMessageStatus.NotAvailable;
726 }
727 }
729 if (_boundingState != null)
730 {
731 _boundingState.CurrentCount++;
732 }
733 return DataflowMessageStatus.Accepted;
734 }
735 if (source != null)
736 {
737 _boundingState.PostponedMessages.Push(source, messageHeader);
738 return DataflowMessageStatus.Postponed;
739 }
740 return DataflowMessageStatus.Declined;
741 }
742 }
743
745 {
746 if (_boundingState != null)
747 {
749 {
750 _boundingState.CurrentCount -= numItemsRemoved;
753 }
754 }
755 }
756
758 {
759 if (_decliningPermanently || _boundingState.TaskForInputProcessing != null || _boundingState.PostponedMessages.Count <= 0 || !_boundingState.CountIsLessThanBound)
760 {
761 return;
762 }
763 _boundingState.TaskForInputProcessing = new Task(delegate(object state)
764 {
765 ((BroadcastBlock<T>)state).ConsumeMessagesLoopCore();
768 if (log.IsEnabled())
769 {
770 log.TaskLaunchedForMessageHandling(this, _boundingState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages, _boundingState.PostponedMessages.Count);
771 }
773 if (ex != null)
774 {
775 Task.Factory.StartNew(delegate(object exc)
776 {
779 }
780 }
781
783 {
784 try
785 {
787 for (int i = 0; i < actualMaxMessagesPerTask; i++)
788 {
790 {
791 break;
792 }
793 }
794 }
795 catch (Exception exception)
796 {
798 }
799 finally
800 {
802 {
803 _boundingState.TaskForInputProcessing = null;
806 }
807 }
808 }
809
811 {
812 while (true)
813 {
816 {
817 if (!_boundingState.CountIsLessThanBound)
818 {
819 return false;
820 }
821 if (!_boundingState.PostponedMessages.TryPop(out item))
822 {
823 return false;
824 }
825 _boundingState.CurrentCount++;
826 }
827 bool messageConsumed = false;
828 try
829 {
830 T item2 = item.Key.ConsumeMessage(item.Value, this, out messageConsumed);
831 if (messageConsumed)
832 {
834 return true;
835 }
836 }
837 finally
838 {
839 if (!messageConsumed)
840 {
842 {
843 _boundingState.CurrentCount--;
844 }
845 }
846 }
847 }
848 }
849
851 {
853 {
854 return;
855 }
856 _completionReserved = true;
857 if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
858 {
859 Task.Factory.StartNew(delegate(object state)
860 {
863 if (broadcastBlock._boundingState != null)
864 {
865 Common.ReleaseAllPostponedMessages(broadcastBlock, broadcastBlock._boundingState.PostponedMessages, ref exceptions);
866 }
867 if (exceptions != null)
868 {
869 broadcastBlock._source.AddExceptions(exceptions);
870 }
871 broadcastBlock._source.Complete();
873 }
874 else
875 {
877 }
878 }
879
884
886 {
887 return _source.ReserveMessage(messageHeader, target);
888 }
889
894
895 public override string ToString()
896 {
898 }
899}
static string Argument_InvalidMessageHeader
Definition SR.cs:24
static string InvalidOperation_MessageNotReservedByTarget
Definition SR.cs:34
static string Argument_CantConsumeFromANullSource
Definition SR.cs:22
Definition SR.cs:7
TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target, out bool messageConsumed)
void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
readonly TaskCompletionSource< VoidResult > _completionTask
void OfferMessageToTarget(DataflowMessageHeader header, TOutput message, ITargetBlock< TOutput > target)
BroadcastingSourceCore(BroadcastBlock< TOutput > owningSource, Func< TOutput, TOutput > cloningFunction, DataflowBlockOptions dataflowBlockOptions, Action< int > itemsRemovedAction)
bool TryReceive(Predicate< TOutput > filter, [MaybeNullWhen(false)] out TOutput item)
bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
IDisposable LinkTo(ITargetBlock< TOutput > target, DataflowLinkOptions linkOptions)
bool TryReceiveAll([NotNullWhen(true)] out IList< TOutput > items)
readonly BroadcastingSourceCore< T >.DebuggingInformation _sourceDebuggingInformation
BroadcastBlock(Func< T, T >? cloningFunction, DataflowBlockOptions dataflowBlockOptions)
readonly BroadcastingSourceCore< T > _source
void CompleteCore(Exception exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState=false)
readonly BoundingStateWithPostponedAndTask< T > _boundingState
bool TryReceive(Predicate< T >? filter, [MaybeNullWhen(false)] out T item)
IDisposable LinkTo(ITargetBlock< T > target, DataflowLinkOptions linkOptions)
void ConsumeAsyncIfNecessary(bool isReplacementReplica=false)
BroadcastBlock(Func< T, T >? cloningFunction)
static Exception StartTaskSafe(Task task, TaskScheduler scheduler)
Definition Common.cs:277
static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude=TaskContinuationOptions.None)
Definition Common.cs:262
static void PropagateCompletionOnceCompleted(Task sourceCompletionTask, IDataflowBlock target)
Definition Common.cs:357
static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica=false)
Definition Common.cs:267
static string GetNameForDebugger(IDataflowBlock block, DataflowBlockOptions options=null)
Definition Common.cs:66
static void AddException([NotNull] ref List< Exception > list, Exception exception, bool unwrapInnerExceptions=false)
Definition Common.cs:183
static void WireCancellationToComplete(CancellationToken cancellationToken, Task completionTask, Action< object > completeAction, object completeState)
Definition Common.cs:93
static int GetBlockId(IDataflowBlock block)
Definition Common.cs:61
void PropagateCompletion(LinkedTargetInfo firstTarget)
void Remove(ITargetBlock< T > target, bool onlyIfReachedMaxMessages=false)
void Add(ref ITargetBlock< T > target, DataflowLinkOptions linkOptions)
Task ContinueWith(Action< Task< TResult > > continuationAction)
Definition Task.cs:263
static new TaskFactory< TResult > Factory
Definition Task.cs:56
static bool Read(ref bool location)
Definition Volatile.cs:67