Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
TransformManyBlock.cs
Go to the documentation of this file.
4using System.Linq;
6
8
9[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
11public sealed class TransformManyBlock<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>, ITargetBlock<TInput>, IDataflowBlock, ISourceBlock<TOutput>, IReceivableSourceBlock<TOutput>, IDebuggerDisplay
12{
13 private sealed class DebugView
14 {
16
17 private readonly TargetCore<TInput>.DebuggingInformation _targetDebuggingInformation;
18
19 private readonly SourceCore<TOutput>.DebuggingInformation _sourceDebuggingInformation;
20
22
24
26
27 public int CurrentDegreeOfParallelism => _targetDebuggingInformation.CurrentDegreeOfParallelism;
28
29 public Task TaskForOutputProcessing => _sourceDebuggingInformation.TaskForOutputProcessing;
30
32
33 public bool IsDecliningPermanently => _targetDebuggingInformation.IsDecliningPermanently;
34
35 public bool IsCompleted => _sourceDebuggingInformation.IsCompleted;
36
38
40
42
49 }
50
51 private readonly TargetCore<TInput> _target;
52
54
55 private readonly SourceCore<TOutput> _source;
56
/// <summary>
/// Exposes the <c>_source</c> core instance typed as <see cref="object"/>.
/// </summary>
// NOTE(review): the name suggests this is the monitor object used when several workers
// add messages to _source concurrently; the lock sites are not visible in this listing — confirm.
57 private object ParallelSourceLock => _source;
58
/// <summary>
/// Gets the completion task of this block; this is the task exposed by the source core
/// (<c>_source.Completion</c>), not the target core's task.
/// </summary>
59 public Task Completion => _source.Completion;
60
/// <summary>
/// Gets the input count reported by the target core (<c>_target.InputCount</c>).
/// </summary>
61 public int InputCount => _target.InputCount;
62
/// <summary>
/// Gets the output count reported by the source core (<c>_source.OutputCount</c>).
/// </summary>
63 public int OutputCount => _source.OutputCount;
64
/// <summary>
/// Input count read through the target core's debugging information object rather than
/// through <c>_target.InputCount</c>; consumed by <c>DebuggerDisplayContent</c>.
/// </summary>
// NOTE(review): presumably this path avoids taking locks while the process is paused in a debugger — confirm against TargetCore.
65 private int InputCountForDebugger => _target.GetDebuggingInformation().InputCount;
66
/// <summary>
/// Output count read through the source core's debugging information object rather than
/// through <c>_source.OutputCount</c>; consumed by <c>DebuggerDisplayContent</c>.
/// </summary>
// NOTE(review): presumably this path avoids taking locks while the process is paused in a debugger — confirm against SourceCore.
67 private int OutputCountForDebugger => _source.GetDebuggingInformation().OutputCount;
68
/// <summary>
/// Builds the text referenced by the class-level <c>[DebuggerDisplay("{DebuggerDisplayContent,nq}")]</c>
/// attribute: the block's debugger name followed by the debugger-safe input and output counts.
/// </summary>
69 private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, InputCount={InputCountForDebugger}, OutputCount={OutputCountForDebugger}";
70
/// <summary>
/// Explicit <c>IDebuggerDisplay</c> implementation; returns the same value as <c>DebuggerDisplayContent</c>.
/// </summary>
71 object IDebuggerDisplay.Content => DebuggerDisplayContent;
72
75 {
76 }
77
82
84 : this((Func<TInput, IEnumerable<TOutput>>)null, transform, ExecutionDataflowBlockOptions.Default)
85 {
86 }
87
92
94 {
96 if (transformSync == null && transformAsync == null)
97 {
98 throw new ArgumentNullException("transform");
99 }
100 if (dataflowBlockOptions == null)
101 {
102 throw new ArgumentNullException("dataflowBlockOptions");
103 }
106 if (dataflowBlockOptions.BoundedCapacity > 0)
107 {
109 {
110 ((TransformManyBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
111 };
112 }
114 {
117 if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
118 {
120 {
121 ((TransformManyBlock<TInput, TOutput>)source)._source.AddMessages(messages);
122 });
123 }
124 if (transformSync != null)
125 {
127 {
130 }
131 else
132 {
134 {
135 transformManyBlock.ProcessMessageWithTask(transformAsync, messageWithId);
137 }
138 _target.Completion.ContinueWith(delegate(Task completed, object state)
139 {
141 if (completed.IsFaulted)
142 {
143 sourceCore.AddAndUnwrapAggregateException(completed.Exception);
144 }
145 sourceCore.Complete();
147 _source.Completion.ContinueWith(delegate(Task completed, object state)
148 {
150 dataflowBlock.Fault(completed.Exception);
153 {
154 ((TargetCore<TInput>)state).Complete(null, dropPendingMessages: true);
155 }, _target);
157 if (log.IsEnabled())
158 {
159 log.DataflowBlockCreated(this, dataflowBlockOptions);
160 }
161 }
162
164 {
165 bool flag = false;
166 try
167 {
169 flag = true;
171 }
172 catch (Exception exception)
173 {
175 {
176 throw;
177 }
178 }
179 finally
180 {
181 if (!flag)
182 {
184 }
185 }
186 }
187
189 {
191 Exception ex = null;
192 try
193 {
194 task = function(messageWithId.Key);
195 }
196 catch (Exception ex2)
197 {
198 ex = ex2;
199 }
200 if (task == null)
201 {
202 if (ex != null && !Common.IsCooperativeCancellation(ex))
203 {
204 Common.StoreDataflowMessageValueIntoExceptionData(ex, messageWithId.Key);
206 }
207 if (_reorderingBuffer != null)
208 {
210 _target.SignalOneAsyncMessageCompleted();
211 }
212 else
213 {
214 _target.SignalOneAsyncMessageCompleted(-1);
215 }
216 }
217 else
218 {
220 {
222 tuple.Item1.AsyncCompleteProcessMessageWithTask(completed, tuple.Item2);
223 }, Tuple.Create(this, messageWithId), CancellationToken.None, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), _source.DataflowBlockOptions.TaskScheduler);
224 }
225 }
226
228 {
229 switch (completed.Status)
230 {
231 case TaskStatus.RanToCompletion:
232 {
233 IEnumerable<TOutput> result = completed.Result;
234 try
235 {
237 }
238 catch (Exception ex)
239 {
241 {
242 Common.StoreDataflowMessageValueIntoExceptionData(ex, messageWithId.Key);
244 }
245 }
246 break;
247 }
248 case TaskStatus.Faulted:
249 {
251 Common.StoreDataflowMessageValueIntoExceptionData(exception, messageWithId.Key, targetInnerExceptions: true);
253 goto case TaskStatus.Canceled;
254 }
255 case TaskStatus.Canceled:
257 break;
258 }
259 _target.SignalOneAsyncMessageCompleted();
260 }
261
263 {
264 if (_reorderingBuffer != null)
265 {
267 }
268 else if (outputItems != null)
269 {
270 if (outputItems is TOutput[] || outputItems is List<TOutput>)
271 {
273 }
274 else
275 {
277 }
278 }
279 else if (_target.IsBounded)
280 {
281 _target.ChangeBoundingCount(-1);
282 }
283 }
284
286 {
288 bool isBounded = target.IsBounded;
289 if (item == null)
290 {
291 _reorderingBuffer.AddItem(id, null, itemIsValid: false);
292 if (isBounded)
293 {
294 target.ChangeBoundingCount(-1);
295 }
296 return;
297 }
298 IList<TOutput> list = item as TOutput[];
299 if (list == null)
300 {
302 }
303 if (list != null && isBounded)
304 {
306 }
307 bool? flag = _reorderingBuffer.AddItemIfNextAndTrusted(id, list, list != null);
308 if (!flag.HasValue)
309 {
310 return;
311 }
312 bool value = flag.Value;
313 List<TOutput> list2 = null;
314 try
315 {
316 if (value)
317 {
319 return;
320 }
321 if (list != null)
322 {
323 list2 = list.ToList();
324 return;
325 }
326 int count = 0;
327 try
328 {
329 list2 = item.ToList();
330 count = list2.Count;
331 }
332 finally
333 {
334 if (isBounded)
335 {
337 }
338 }
339 }
340 finally
341 {
342 _reorderingBuffer.AddItem(id, list2, list2 != null);
343 }
344 }
345
347 {
348 if (_target.IsBounded)
349 {
351 }
352 if (_target.DataflowBlockOptions.MaxDegreeOfParallelism == 1)
353 {
354 _source.AddMessages(outputItems);
355 return;
356 }
358 {
359 _source.AddMessages(outputItems);
360 }
361 }
362
364 {
365 bool flag = _target.DataflowBlockOptions.MaxDegreeOfParallelism == 1 || _reorderingBuffer != null;
366 if (_target.IsBounded)
367 {
368 bool flag2 = false;
369 try
370 {
371 foreach (TOutput outputItem in outputItems)
372 {
373 if (flag2)
374 {
375 _target.ChangeBoundingCount(1);
376 }
377 else
378 {
379 flag2 = true;
380 }
381 if (flag)
382 {
383 _source.AddMessage(outputItem);
384 continue;
385 }
387 {
388 _source.AddMessage(outputItem);
389 }
390 }
391 return;
392 }
393 finally
394 {
395 if (!flag2)
396 {
397 _target.ChangeBoundingCount(-1);
398 }
399 }
400 }
401 if (flag)
402 {
403 foreach (TOutput outputItem2 in outputItems)
404 {
405 _source.AddMessage(outputItem2);
406 }
407 return;
408 }
409 foreach (TOutput outputItem3 in outputItems)
410 {
412 {
413 _source.AddMessage(outputItem3);
414 }
415 }
416 }
417
419 {
420 if (count > 1)
421 {
422 _target.ChangeBoundingCount(count - 1);
423 }
424 else if (count == 0)
425 {
426 _target.ChangeBoundingCount(-1);
427 }
428 }
429
/// <summary>
/// Requests completion of the block by forwarding to the target core with no exception
/// and <c>dropPendingMessages: false</c>.
/// </summary>
430 public void Complete()
431 {
// A null exception plus dropPendingMessages: false is the non-faulting request;
// how already-queued input is then handled is decided inside TargetCore, which is not visible here.
432 _target.Complete(null, dropPendingMessages: false);
433 }
434
436 {
437 if (exception == null)
438 {
439 throw new ArgumentNullException("exception");
440 }
441 _target.Complete(exception, dropPendingMessages: true);
442 }
443
445 {
446 return _source.LinkTo(target, linkOptions);
447 }
448
450 {
451 return _source.TryReceive(filter, out item);
452 }
453
/// <summary>
/// Forwards to <c>_source.TryReceiveAll</c> and returns its result unchanged.
/// </summary>
/// <param name="items">
/// Set by the source core; annotated <c>[NotNullWhen(true)]</c>, i.e. declared non-null when the method returns <c>true</c>.
/// </param>
/// <returns>The boolean returned by <c>_source.TryReceiveAll</c>.</returns>
454 public bool TryReceiveAll([NotNullWhen(true)] out IList<TOutput>? items)
455 {
456 return _source.TryReceiveAll(out items);
457 }
458
463
465 {
466 return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
467 }
468
470 {
471 return _source.ReserveMessage(messageHeader, target);
472 }
473
475 {
476 _source.ReleaseReservation(messageHeader, target);
477 }
478
/// <summary>
/// Returns the block's debugger name, computed by <c>Common.GetNameForDebugger</c> from this
/// instance and the source core's dataflow block options (the same name used as the prefix of
/// <c>DebuggerDisplayContent</c>).
/// </summary>
479 public override string ToString()
480 {
481 return Common.GetNameForDebugger(this, _source.DataflowBlockOptions);
482 }
483}
static bool IsCooperativeCancellation(Exception exception)
Definition Common.cs:88
static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude=TaskContinuationOptions.None)
Definition Common.cs:262
static string GetNameForDebugger(IDataflowBlock block, DataflowBlockOptions options=null)
Definition Common.cs:66
static void WireCancellationToComplete(CancellationToken cancellationToken, Task completionTask, Action< object > completeAction, object completeState)
Definition Common.cs:93
static int GetBlockId(IDataflowBlock block)
Definition Common.cs:61
void AddItem(long id, TOutput item, bool itemIsValid)
bool? AddItemIfNextAndTrusted(long id, TOutput item, bool isTrusted)
QueuedMap< ISourceBlock< TInput >, DataflowMessageHeader > PostponedMessages
readonly SourceCore< TOutput >.DebuggingInformation _sourceDebuggingInformation
readonly TargetCore< TInput >.DebuggingInformation _targetDebuggingInformation
readonly TransformManyBlock< TInput, TOutput > _transformManyBlock
DebugView(TransformManyBlock< TInput, TOutput > transformManyBlock)
void ProcessMessageWithTask(Func< TInput, Task< IEnumerable< TOutput > > > function, KeyValuePair< TInput, long > messageWithId)
bool TryReceiveAll([NotNullWhen(true)] out IList< TOutput >? items)
readonly ReorderingBuffer< IEnumerable< TOutput > > _reorderingBuffer
IDisposable LinkTo(ITargetBlock< TOutput > target, DataflowLinkOptions linkOptions)
void AsyncCompleteProcessMessageWithTask(Task< IEnumerable< TOutput > > completed, KeyValuePair< TInput, long > messageWithId)
void StoreOutputItems(KeyValuePair< TInput, long > messageWithId, IEnumerable< TOutput > outputItems)
TransformManyBlock(Func< TInput, Task< IEnumerable< TOutput > > > transform)
TransformManyBlock(Func< TInput, Task< IEnumerable< TOutput > > > transform, ExecutionDataflowBlockOptions dataflowBlockOptions)
TransformManyBlock(Func< TInput, IEnumerable< TOutput > > transform)
void ProcessMessage(Func< TInput, IEnumerable< TOutput > > transformFunction, KeyValuePair< TInput, long > messageWithId)
TransformManyBlock(Func< TInput, IEnumerable< TOutput > > transformSync, Func< TInput, Task< IEnumerable< TOutput > > > transformAsync, ExecutionDataflowBlockOptions dataflowBlockOptions)
void StoreOutputItemsNonReorderedWithIteration(IEnumerable< TOutput > outputItems)
void StoreOutputItemsNonReorderedAtomic(IEnumerable< TOutput > outputItems)
void StoreOutputItemsReordered(long id, IEnumerable< TOutput > item)
bool TryReceive(Predicate< TOutput >? filter, [MaybeNullWhen(false)] out TOutput item)
TransformManyBlock(Func< TInput, IEnumerable< TOutput > > transform, ExecutionDataflowBlockOptions dataflowBlockOptions)