Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
SourceCore.cs
Go to the documentation of this file.
4using System.Linq;
5
7
8[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
9internal sealed class SourceCore<TOutput>
10{
11 internal sealed class DebuggingInformation
12 {
13 private readonly SourceCore<TOutput> _source;
14
15 internal int OutputCount => _source._messages.Count;
16
17 internal IEnumerable<TOutput> OutputQueue => _source._messages.ToList();
18
19 internal Task TaskForOutputProcessing => _source._taskForOutputProcessing;
20
21 internal DataflowBlockOptions DataflowBlockOptions => _source._dataflowBlockOptions;
22
23 internal bool IsCompleted => _source.Completion.IsCompleted;
24
25 internal TargetRegistry<TOutput> LinkedTargets => _source._targetRegistry;
26
27 internal ITargetBlock<TOutput> NextMessageReservedFor => _source._nextMessageReservedFor;
28
33 }
34
36
38
40
42
44
46
48
50
52
54 {
55 Value = 1L
56 };
57
59
61
62 private bool _enableOffering = true;
63
64 private bool _completionReserved;
65
67
68 private object OutgoingLock => _completionTask;
69
70 private object ValueLock => _targetRegistry;
71
73
74 internal int OutputCount
75 {
76 get
77 {
79 {
81 {
82 return _messages.Count;
83 }
84 }
85 }
86 }
87
88 internal bool HasExceptions => Volatile.Read(ref _exceptions) != null;
89
91
92 private bool CanceledOrFaulted
93 {
94 get
95 {
97 {
98 if (HasExceptions)
99 {
101 }
102 return false;
103 }
104 return true;
105 }
106 }
107
109 {
110 get
111 {
113 return $"Block=\"{((debuggerDisplay != null) ? debuggerDisplay.Content : _owningSource)}\"";
114 }
115 }
116
126
128 {
129 if (target == null)
130 {
131 throw new ArgumentNullException("target");
132 }
133 if (linkOptions == null)
134 {
135 throw new ArgumentNullException("linkOptions");
136 }
137 if (_completionTask.Task.IsCompleted)
138 {
139 if (linkOptions.PropagateCompletion)
140 {
141 Common.PropagateCompletion(_completionTask.Task, target, null);
142 }
143 return Disposables.Nop;
144 }
146 {
148 {
150 OfferToTargets(target);
151 return Common.CreateUnlinker(OutgoingLock, _targetRegistry, target);
152 }
153 }
154 if (linkOptions.PropagateCompletion)
155 {
157 }
158 return Disposables.Nop;
159 }
160
162 {
163 if (!messageHeader.IsValid)
164 {
165 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
166 }
167 if (target == null)
168 {
169 throw new ArgumentNullException("target");
170 }
171 TOutput result = default(TOutput);
173 {
174 if (_nextMessageReservedFor != target && _nextMessageReservedFor != null)
175 {
176 messageConsumed = false;
177 return default(TOutput);
178 }
180 {
181 if (messageHeader.Id != _nextMessageId.Value || !_messages.TryDequeue(out result))
182 {
183 messageConsumed = false;
184 return default(TOutput);
185 }
188 _enableOffering = true;
192 }
193 }
194 if (_itemsRemovedAction != null)
195 {
196 int arg = ((_itemCountingFunc == null) ? 1 : _itemCountingFunc(_owningSource, result, null));
198 }
199 messageConsumed = true;
200 return result;
201 }
202
204 {
205 if (!messageHeader.IsValid)
206 {
207 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
208 }
209 if (target == null)
210 {
211 throw new ArgumentNullException("target");
212 }
214 {
215 if (_nextMessageReservedFor == null)
216 {
218 {
219 if (messageHeader.Id == _nextMessageId.Value && !_messages.IsEmpty)
220 {
222 _enableOffering = false;
223 return true;
224 }
225 }
226 }
227 }
228 return false;
229 }
230
232 {
233 if (!messageHeader.IsValid)
234 {
235 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
236 }
237 if (target == null)
238 {
239 throw new ArgumentNullException("target");
240 }
242 {
243 if (_nextMessageReservedFor != target)
244 {
246 }
248 {
249 if (messageHeader.Id != _nextMessageId.Value || _messages.IsEmpty)
250 {
252 }
254 _enableOffering = true;
257 }
258 }
259 }
260
261 internal bool TryReceive(Predicate<TOutput> filter, [MaybeNullWhen(false)] out TOutput item)
262 {
263 item = default(TOutput);
264 bool flag = false;
266 {
267 if (_nextMessageReservedFor == null)
268 {
270 {
271 if (_messages.TryDequeueIf(filter, out item))
272 {
274 _enableOffering = true;
277 flag = true;
278 }
279 }
280 }
281 }
282 if (flag && _itemsRemovedAction != null)
283 {
284 int arg = ((_itemCountingFunc == null) ? 1 : _itemCountingFunc(_owningSource, item, null));
286 }
287 return flag;
288 }
289
290 internal bool TryReceiveAll([NotNullWhen(true)] out IList<TOutput> items)
291 {
292 items = null;
293 int num = 0;
295 {
296 if (_nextMessageReservedFor == null)
297 {
299 {
300 if (!_messages.IsEmpty)
301 {
303 TOutput result;
304 while (_messages.TryDequeue(out result))
305 {
306 list.Add(result);
307 }
308 num = list.Count;
309 items = list;
311 _enableOffering = true;
313 }
314 }
315 }
316 }
317 if (num > 0)
318 {
319 if (_itemsRemovedAction != null)
320 {
321 int arg = ((_itemCountingFunc != null) ? _itemCountingFunc(_owningSource, default(TOutput), items) : num);
323 }
324 return true;
325 }
326 return false;
327 }
328
329 internal void AddMessage(TOutput item)
330 {
332 {
333 _messages.Enqueue(item);
335 if (_taskForOutputProcessing == null)
336 {
338 }
339 }
340 }
341
343 {
345 {
346 return;
347 }
348 if (items is List<TOutput> list)
349 {
350 for (int i = 0; i < list.Count; i++)
351 {
352 _messages.Enqueue(list[i]);
353 }
354 }
355 else if (items is TOutput[] array)
356 {
357 for (int j = 0; j < array.Length; j++)
358 {
359 _messages.Enqueue(array[j]);
360 }
361 }
362 else
363 {
364 foreach (TOutput item in items)
365 {
366 _messages.Enqueue(item);
367 }
368 }
370 if (_taskForOutputProcessing == null)
371 {
373 }
374 }
375
377 {
379 {
381 }
382 }
383
385 {
387 {
388 foreach (Exception exception in exceptions)
389 {
391 }
392 }
393 }
394
402
403 internal void Complete()
404 {
406 {
408 Task.Factory.StartNew(delegate(object state)
409 {
411 lock (sourceCore.OutgoingLock)
412 {
413 lock (sourceCore.ValueLock)
414 {
415 sourceCore.CompleteBlockIfPossible();
416 }
417 }
419 }
420 }
421
423 {
424 if (_nextMessageReservedFor != null)
425 {
426 return false;
427 }
429 TOutput result = default(TOutput);
430 bool flag = false;
432 {
433 if (linkToTarget == null)
434 {
435 return false;
436 }
437 flag = true;
438 }
439 if (_messages.TryPeek(out result))
440 {
442 }
443 bool messageWasAccepted = false;
444 if (header.IsValid)
445 {
446 if (flag)
447 {
449 }
450 else
451 {
453 while (linkedTargetInfo != null)
454 {
455 TargetRegistry<TOutput>.LinkedTargetInfo next = linkedTargetInfo.Next;
457 {
458 break;
459 }
460 linkedTargetInfo = next;
461 }
463 {
465 {
466 _enableOffering = false;
467 }
468 }
469 }
470 }
472 {
474 {
475 if (_nextMessageId.Value == header.Id)
476 {
477 _messages.TryDequeue(out var _);
478 }
480 _enableOffering = true;
481 if (linkToTarget != null)
482 {
485 }
486 }
487 if (_itemsRemovedAction != null)
488 {
489 int arg = ((_itemCountingFunc == null) ? 1 : _itemCountingFunc(_owningSource, result, null));
491 }
492 }
493 return messageWasAccepted;
494 }
495
497 {
498 DataflowMessageStatus dataflowMessageStatus = target.OfferMessage(header, message, _owningSource, consumeToAccept: false);
499 messageWasAccepted = false;
500 switch (dataflowMessageStatus)
501 {
502 case DataflowMessageStatus.Accepted:
504 messageWasAccepted = true;
505 return true;
506 case DataflowMessageStatus.DecliningPermanently:
507 _targetRegistry.Remove(target);
508 break;
509 default:
510 if (_nextMessageReservedFor != null)
511 {
512 return true;
513 }
514 break;
515 }
516 return false;
517 }
518
526
534
536 {
537 bool flag = true;
539 {
540 flag = _targetRegistry.FirstTargetNode != null;
541 }
542 if (!flag || CanceledOrFaulted)
543 {
544 return;
545 }
547 {
548 ((SourceCore<TOutput>)thisSourceCore).OfferMessagesLoopCore();
551 if (log.IsEnabled())
552 {
553 log.TaskLaunchedForMessageHandling(_owningSource, _taskForOutputProcessing, DataflowEtwProvider.TaskLaunchedReason.OfferingOutputMessages, _messages.Count);
554 }
556 if (ex == null)
557 {
558 return;
559 }
563 Task.Factory.StartNew(delegate(object state)
564 {
566 lock (sourceCore.OutgoingLock)
567 {
568 lock (sourceCore.ValueLock)
569 {
570 sourceCore.CompleteBlockIfPossible();
571 }
572 }
574 }
575
577 {
578 try
579 {
581 int num = ((_dataflowBlockOptions.MaxMessagesPerTask == -1) ? 10 : actualMaxMessagesPerTask);
582 int num2 = 0;
584 {
586 {
587 int num3 = 0;
589 {
590 if (!OfferToTargets())
591 {
592 return;
593 }
594 num2++;
595 num3++;
596 }
597 }
598 }
599 }
600 catch (Exception exception)
601 {
604 }
605 finally
606 {
608 {
610 {
615 }
616 }
617 }
618 }
619
627
629 {
630 if (_messages.IsEmpty || CanceledOrFaulted)
631 {
632 _completionReserved = true;
633 Task.Factory.StartNew(delegate(object state)
634 {
635 ((SourceCore<TOutput>)state).CompleteBlockOncePossible();
637 }
638 }
639
641 {
642 TargetRegistry<TOutput>.LinkedTargetInfo firstTarget;
645 {
648 {
649 _messages.Clear();
651 _exceptions = null;
652 }
653 }
654 if (exceptions != null)
655 {
656 _completionTask.TrySetException(exceptions);
657 }
659 {
660 _completionTask.TrySetCanceled();
661 }
662 else
663 {
664 _completionTask.TrySetResult(default(VoidResult));
665 }
668 if (log.IsEnabled())
669 {
670 log.DataflowBlockCompleted(_owningSource);
671 }
672 }
673
675 {
676 return new DebuggingInformation(this);
677 }
678}
static string Argument_InvalidMessageHeader
Definition SR.cs:24
static string InvalidOperation_MessageNotReservedByTarget
Definition SR.cs:34
Definition SR.cs:7
static bool IsEntered(object obj)
Definition Monitor.cs:71
static Exception StartTaskSafe(Task task, TaskScheduler scheduler)
Definition Common.cs:277
static void PropagateCompletionOnceCompleted(Task sourceCompletionTask, IDataflowBlock target)
Definition Common.cs:357
static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica=false)
Definition Common.cs:267
static void PropagateCompletion(Task sourceCompletionTask, IDataflowBlock target, Action< Exception > exceptionHandler)
Definition Common.cs:324
static void AddException([NotNull] ref List< Exception > list, Exception exception, bool unwrapInnerExceptions=false)
Definition Common.cs:183
TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target, out bool messageConsumed)
bool TryReceiveAll([NotNullWhen(true)] out IList< TOutput > items)
readonly Func< ISourceBlock< TOutput >, TOutput, IList< TOutput >, int > _itemCountingFunc
Definition SourceCore.cs:49
void AddMessages(IEnumerable< TOutput > items)
void OfferAsyncIfNecessary(bool isReplacementReplica, bool outgoingLockKnownAcquired)
bool OfferMessageToTarget(DataflowMessageHeader header, TOutput message, ITargetBlock< TOutput > target, out bool messageWasAccepted)
IDisposable LinkTo(ITargetBlock< TOutput > target, DataflowLinkOptions linkOptions)
bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
readonly System.Threading.Tasks.SingleProducerSingleConsumerQueue< TOutput > _messages
Definition SourceCore.cs:39
readonly Action< ISourceBlock< TOutput >, int > _itemsRemovedAction
Definition SourceCore.cs:47
void OfferAsyncIfNecessary_Slow(bool isReplacementReplica, bool outgoingLockKnownAcquired)
void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
readonly ISourceBlock< TOutput > _owningSource
Definition SourceCore.cs:41
bool TryReceive(Predicate< TOutput > filter, [MaybeNullWhen(false)] out TOutput item)
readonly Action< ISourceBlock< TOutput > > _completeAction
Definition SourceCore.cs:45
readonly TargetRegistry< TOutput > _targetRegistry
Definition SourceCore.cs:37
SourceCore(ISourceBlock< TOutput > owningSource, DataflowBlockOptions dataflowBlockOptions, Action< ISourceBlock< TOutput > > completeAction, Action< ISourceBlock< TOutput >, int > itemsRemovedAction=null, Func< ISourceBlock< TOutput >, TOutput, IList< TOutput >, int > itemCountingFunc=null)
void AddAndUnwrapAggregateException(AggregateException aggregateException)
void AddExceptions(List< Exception > exceptions)
readonly TaskCompletionSource< VoidResult > _completionTask
Definition SourceCore.cs:35
bool OfferToTargets(ITargetBlock< TOutput > linkToTarget=null)
readonly DataflowBlockOptions _dataflowBlockOptions
Definition SourceCore.cs:43
void PropagateCompletion(LinkedTargetInfo firstTarget)
void Remove(ITargetBlock< T > target, bool onlyIfReachedMaxMessages=false)
void Add(ref ITargetBlock< T > target, DataflowLinkOptions linkOptions)
static new TaskFactory< TResult > Factory
Definition Task.cs:56
static bool Read(ref bool location)
Definition Volatile.cs:67