Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
TargetCore.cs
Go to the documentation of this file.
3using System.Linq;
4
6
7[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
8internal sealed class TargetCore<TInput>
9{
10 internal sealed class DebuggingInformation
11 {
12 private readonly TargetCore<TInput> _target;
13
14 internal int InputCount => _target._messages.Count;
15
16 internal IEnumerable<TInput> InputQueue => _target._messages.Select((KeyValuePair<TInput, long> kvp) => kvp.Key).ToList();
17
19 {
20 get
21 {
22 if (_target._boundingState == null)
23 {
24 return null;
25 }
26 return _target._boundingState.PostponedMessages;
27 }
28 }
29
30 internal int CurrentDegreeOfParallelism => _target._numberOfOutstandingOperations - _target._numberOfOutstandingServiceTasks;
31
33
34 internal bool IsDecliningPermanently => _target._decliningPermanently;
35
36 internal bool IsCompleted => _target.Completion.IsCompleted;
37
39 {
40 _target = target;
41 }
42 }
43
45 {
46 return thisTargetCore.TryGetNextAvailableOrPostponedMessage(out messageWithId);
47 };
48
50
52
54
56
58
60
62
64
66
68
70
72
74
75 private bool _completionReserved;
76
78
79 private object IncomingLock => _messages;
80
82
83 internal int InputCount => _messages.GetCountSafe(IncomingLock);
84
85 private bool UsesAsyncCompletion => (_targetCoreOptions & TargetCoreOptions.UsesAsyncCompletion) != 0;
86
88
90 {
91 get
92 {
94 {
96 }
98 {
100 }
101 return false;
102 }
103 }
104
105 private bool CanceledOrFaulted
106 {
107 get
108 {
110 {
111 return Volatile.Read(ref _exceptions) != null;
112 }
113 return true;
114 }
115 }
116
117 internal bool IsBounded => _boundingState != null;
118
120 {
121 get
122 {
124 return $"Block=\"{((debuggerDisplay != null) ? debuggerDisplay.Content : _owningTarget)}\"";
125 }
126 }
127
129
154
156 {
158 {
160 {
162 }
164 {
166 while (_messages.TryDequeue(out result))
167 {
168 }
169 }
171 {
174 {
176 }
177 }
180 }
181 }
182
184 {
185 if (!messageHeader.IsValid)
186 {
187 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
188 }
189 if (source == null && consumeToAccept)
190 {
192 }
194 {
196 {
198 return DataflowMessageStatus.DecliningPermanently;
199 }
200 if (_boundingState == null || (_boundingState.OutstandingTransfers == 0 && _boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0))
201 {
202 if (consumeToAccept)
203 {
205 if (!messageConsumed)
206 {
207 return DataflowMessageStatus.NotAvailable;
208 }
209 }
211 if (_boundingState != null)
212 {
213 _boundingState.CurrentCount++;
214 }
217 return DataflowMessageStatus.Accepted;
218 }
219 if (source != null)
220 {
221 _boundingState.PostponedMessages.Push(source, messageHeader);
223 return DataflowMessageStatus.Postponed;
224 }
225 return DataflowMessageStatus.Declined;
226 }
227 }
228
233
235 {
237 {
239 {
241 }
242 if (_boundingState != null && boundingCountChange != 0)
243 {
244 _boundingState.CurrentCount += boundingCountChange;
245 }
248 }
249 }
250
251 private void ProcessAsyncIfNecessary(bool repeat = false)
252 {
254 {
256 }
257 }
258
260 {
261 if ((_messages.IsEmpty && (_decliningPermanently || _boundingState == null || !_boundingState.CountIsLessThanBound || _boundingState.PostponedMessages.Count <= 0)) || CanceledOrFaulted)
262 {
263 return;
264 }
267 {
269 }
270 Task task = new Task(delegate(object thisTargetCore)
271 {
272 ((TargetCore<TInput>)thisTargetCore).ProcessMessagesLoopCore();
275 if (log.IsEnabled())
276 {
277 log.TaskLaunchedForMessageHandling(_owningTarget, task, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages, _messages.Count + ((_boundingState != null) ? _boundingState.PostponedMessages.Count : 0));
278 }
280 if (ex != null)
281 {
282 Task.Factory.StartNew(delegate(object exc)
283 {
286 }
287 }
288
290 {
292 try
293 {
295 bool flag = _boundingState != null && _boundingState.BoundedCapacity > 1;
296 int num = 0;
297 int num2 = 0;
300 {
302 {
304 {
305 _boundingState.OutstandingTransfers--;
306 _messages.Enqueue(result);
308 }
309 }
311 {
313 {
314 break;
315 }
316 }
318 {
320 {
321 break;
322 }
323 if (_keepAliveBanCounter > 0)
324 {
326 break;
327 }
328 num2 = 0;
329 if (!Common.TryKeepAliveUntil(_keepAlivePredicate, this, out messageWithId))
330 {
332 break;
333 }
334 }
335 num++;
336 num2++;
338 }
339 }
340 catch (Exception ex)
341 {
342 Common.StoreDataflowMessageValueIntoExceptionData(ex, messageWithId.Key);
344 }
345 finally
346 {
348 {
351 {
353 }
356 }
357 }
358 }
359
361 {
364 {
367 {
369 }
370 }
373 {
374 bool flag = false;
375 try
376 {
378 }
379 catch
380 {
382 throw;
383 }
384 if (!flag)
385 {
387 }
388 return flag;
389 }
390 return false;
391 }
392
394 {
395 if (_messages.TryDequeue(out messageWithId))
396 {
397 return true;
398 }
400 {
401 return true;
402 }
404 return false;
405 }
406
408 {
409 bool flag = false;
410 long num = -1L;
411 while (true)
412 {
415 {
417 {
418 break;
419 }
420 if (!forPostponementTransfer && _messages.TryDequeue(out result))
421 {
422 return true;
423 }
424 if (!_boundingState.CountIsLessThanBound || !_boundingState.PostponedMessages.TryPop(out item))
425 {
426 if (flag)
427 {
428 flag = false;
429 _boundingState.CurrentCount--;
430 }
431 break;
432 }
433 if (!flag)
434 {
435 flag = true;
437 _boundingState.CurrentCount++;
439 {
440 _boundingState.OutstandingTransfers++;
441 }
442 }
443 goto IL_00d1;
444 }
445 IL_00d1:
446 bool messageConsumed;
447 TInput key = item.Key.ConsumeMessage(item.Value, _owningTarget, out messageConsumed);
448 if (messageConsumed)
449 {
450 result = new KeyValuePair<TInput, long>(key, num);
451 return true;
452 }
454 {
455 _boundingState.OutstandingTransfers--;
456 }
457 }
458 if (_reorderingBuffer != null && num != -1)
459 {
461 }
462 if (flag)
463 {
465 }
466 result = default(KeyValuePair<TInput, long>);
467 return false;
468 }
469
471 {
473 {
475 }
476 }
477
479 {
481 {
482 _completionReserved = true;
484 Task.Factory.StartNew(delegate(object state)
485 {
486 ((TargetCore<TInput>)state).CompleteBlockOncePossible();
488 }
489 }
490
492 {
493 if (_boundingState != null)
494 {
495 Common.ReleaseAllPostponedMessages(_owningTarget, _boundingState.PostponedMessages, ref _exceptions);
496 }
499 while (messages.TryDequeue(out result))
500 {
501 }
502 if (Volatile.Read(ref _exceptions) != null)
503 {
505 }
507 {
508 _completionSource.TrySetCanceled();
509 }
510 else
511 {
512 _completionSource.TrySetResult(default(VoidResult));
513 }
515 if ((_targetCoreOptions & TargetCoreOptions.RepresentsBlockCompletion) != 0 && (log = DataflowEtwProvider.Log).IsEnabled())
516 {
517 log.DataflowBlockCompleted(_owningTarget);
518 }
519 }
520
521 internal void ChangeBoundingCount(int count)
522 {
523 if (_boundingState != null)
524 {
526 {
527 _boundingState.CurrentCount += count;
530 }
531 }
532 }
533
535 {
536 return new DebuggingInformation(this);
537 }
538}
static string Argument_InvalidMessageHeader
Definition SR.cs:24
static string Argument_CantConsumeFromANullSource
Definition SR.cs:22
Definition SR.cs:7
static Exception StartTaskSafe(Task task, TaskScheduler scheduler)
Definition Common.cs:277
static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica=false)
Definition Common.cs:267
static void AddException([NotNull] ref List< Exception > list, Exception exception, bool unwrapInnerExceptions=false)
Definition Common.cs:183
QueuedMap< ISourceBlock< TInput >, DataflowMessageHeader > PostponedMessages
Definition TargetCore.cs:19
readonly Action< KeyValuePair< TInput, long > > _callAction
Definition TargetCore.cs:57
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock< TInput > source, bool consumeToAccept)
void Complete(Exception exception, bool dropPendingMessages, bool storeExceptionEvenIfAlreadyCompleting=false, bool unwrapInnerExceptions=false, bool revertProcessingState=false)
static readonly Common.KeepAlivePredicate< TargetCore< TInput >, KeyValuePair< TInput, long > > _keepAlivePredicate
Definition TargetCore.cs:44
bool TryGetNextMessageForNewAsyncOperation(out KeyValuePair< TInput, long > messageWithId)
readonly TaskCompletionSource< VoidResult > _completionSource
Definition TargetCore.cs:49
TargetCore(ITargetBlock< TInput > owningTarget, Action< KeyValuePair< TInput, long > > callAction, IReorderingBuffer reorderingBuffer, ExecutionDataflowBlockOptions dataflowBlockOptions, TargetCoreOptions targetCoreOptions)
readonly ITargetBlock< TInput > _owningTarget
Definition TargetCore.cs:51
void SignalOneAsyncMessageCompleted(int boundingCountChange)
readonly ExecutionDataflowBlockOptions _dataflowBlockOptions
Definition TargetCore.cs:55
bool TryGetNextAvailableOrPostponedMessage(out KeyValuePair< TInput, long > messageWithId)
readonly BoundingStateWithPostponed< TInput > _boundingState
Definition TargetCore.cs:61
bool TryConsumePostponedMessage(bool forPostponementTransfer, out KeyValuePair< TInput, long > result)
readonly System.Threading.Tasks.IProducerConsumerQueue< KeyValuePair< TInput, long > > _messages
Definition TargetCore.cs:53
static new TaskFactory< TResult > Factory
Definition Task.cs:56
static bool Read(ref bool location)
Definition Volatile.cs:67