Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
TransformBlock.cs
Go to the documentation of this file.
5
7
// TransformBlock<TInput, TOutput>: a dataflow block built from a target side (_target, a
// TargetCore<TInput>) and a source side (_source, a SourceCore<TOutput>). The public members
// visible here forward to one of those two cores.
// NOTE(review): this listing is an extract in which a number of original source lines are
// missing (a line holding only a number, or a jump in the leading numbers, marks a gap).
// Comments below describe only what is visible and flag the gaps.
// The debugger display string is taken from the DebuggerDisplayContent property.
8[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
10public sealed class TransformBlock<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>, ITargetBlock<TInput>, IDataflowBlock, ISourceBlock<TOutput>, IReceivableSourceBlock<TOutput>, IDebuggerDisplay
11{
// Debugger-only view over the block: it captures the debugging information objects of the
// target and source cores and exposes selected values from them.
12 private sealed class DebugView
13 {
15
// Debugging information obtained from the owning block's _target.
16 private readonly TargetCore<TInput>.DebuggingInformation _targetDebuggingInformation;
17
// Debugging information obtained from the owning block's _source.
18 private readonly SourceCore<TOutput>.DebuggingInformation _sourceDebuggingInformation;
19
21
23
25
// Forwarded from the target's debugging information.
26 public int CurrentDegreeOfParallelism => _targetDebuggingInformation.CurrentDegreeOfParallelism;
27
// Forwarded from the source's debugging information.
28 public Task TaskForOutputProcessing => _sourceDebuggingInformation.TaskForOutputProcessing;
29
31
// Forwarded from the target's debugging information.
32 public bool IsDecliningPermanently => _targetDebuggingInformation.IsDecliningPermanently;
33
// Forwarded from the source's debugging information.
34 public bool IsCompleted => _sourceDebuggingInformation.IsCompleted;
35
37
39
41
// Constructor body. The signature line is missing from this extract; the member index in this
// file lists it as DebugView(TransformBlock<TInput, TOutput> transformBlock).
// It snapshots the debugging information of both cores of the given block.
43 {
45 _targetDebuggingInformation = transformBlock._target.GetDebuggingInformation();
46 _sourceDebuggingInformation = transformBlock._source.GetDebuggingInformation();
47 }
48 }
49
// Input side of the block.
50 private readonly TargetCore<TInput> _target;
51
53
// Output side of the block.
54 private readonly SourceCore<TOutput> _source;
55
// Returns the _source instance itself as a lock object.
// NOTE(review): presumably this guards the _source.AddMessage calls on the parallel paths below;
// the statements that would use it are missing from this extract - confirm.
56 private object ParallelSourceLock => _source;
57
// Completion of the block is the completion of the source side.
58 public Task Completion => _source.Completion;
59
// Forwarded from the target core.
60 public int InputCount => _target.InputCount;
61
// Forwarded from the source core.
62 public int OutputCount => _source.OutputCount;
63
// Debugger variant: reads InputCount from the target's debugging information instead of _target.InputCount.
64 private int InputCountForDebugger => _target.GetDebuggingInformation().InputCount;
65
// Debugger variant: reads OutputCount from the source's debugging information instead of _source.OutputCount.
66 private int OutputCountForDebugger => _source.GetDebuggingInformation().OutputCount;
67
// Text shown by the debugger: the block's debugger name followed by the input and output counts.
68 private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, InputCount={InputCountForDebugger}, OutputCount={OutputCountForDebugger}";
69
// Explicit IDebuggerDisplay implementation; returns the same content as DebuggerDisplayContent.
70 object IDebuggerDisplay.Content => DebuggerDisplayContent;
71
// Constructor initializer whose signature line is missing from this extract (the member index
// lists TransformBlock(Func<TInput, TOutput> transform)). It chains to the three-argument
// constructor with the transform in the first position, a null async transform and
// ExecutionDataflowBlockOptions.Default.
73 : this(transform, (Func<TInput, Task<TOutput>>)null, ExecutionDataflowBlockOptions.Default)
74 {
75 }
76
81
// Creates a block from a Task-returning transform using ExecutionDataflowBlockOptions.Default;
// chains to the three-argument constructor with a null synchronous transform.
82 public TransformBlock(Func<TInput, Task<TOutput>> transform)
83 : this((Func<TInput, TOutput>)null, transform, ExecutionDataflowBlockOptions.Default)
84 {
85 }
86
// Constructor initializer whose signature line is missing from this extract (the member index
// lists TransformBlock(Func<TInput, Task<TOutput>> transform, ExecutionDataflowBlockOptions
// dataflowBlockOptions)). Chains to the three-argument constructor with a null synchronous transform.
88 : this((Func<TInput, TOutput>)null, transform, dataflowBlockOptions)
89 {
90 }
91
// Body of the three-argument constructor. Its signature line is missing from this extract; the
// member index lists TransformBlock(Func<TInput, TOutput> transformSync,
// Func<TInput, Task<TOutput>> transformAsync, ExecutionDataflowBlockOptions dataflowBlockOptions).
// Several statements inside this body are also missing, so only the visible parts are described.
93 {
// At least one of the two transforms must be supplied. The exception uses the parameter name
// "transform", which is the name the public overloads use.
95 if (transformSync == null && transformAsync == null)
96 {
97 throw new ArgumentNullException("transform");
98 }
99 if (dataflowBlockOptions == null)
100 {
101 throw new ArgumentNullException("dataflowBlockOptions");
102 }
// Only when a positive BoundedCapacity is configured: a delegate is set up (its declaration and
// assignment line are missing here) that lowers the target's bounding count by `count`.
105 if (dataflowBlockOptions.BoundedCapacity > 0)
106 {
108 {
109 ((TransformBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
110 };
111 }
// NOTE(review): the statement that opens this brace and its body are missing from this extract.
113 {
// A reordering buffer is created only when the options support parallel execution and request
// ordered output. The buffer's callback adds each released message to this block's source.
116 if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
117 {
118 _reorderingBuffer = new ReorderingBuffer<TOutput>(this, delegate(object owningSource, TOutput message)
119 {
120 ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message);
121 });
122 }
// Synchronous transform supplied: the contents of this branch are missing from this extract.
123 if (transformSync != null)
124 {
126 {
129 }
// Otherwise (async transform): the visible callback passes each message to ProcessMessageWithTask.
130 else
131 {
133 {
134 transformBlock.ProcessMessageWithTask(transformAsync, messageWithId);
136 }
// Continuation on the target's completion: a faulted target has its exception added to
// sourceCore, and sourceCore is completed either way.
// NOTE(review): the declaration of sourceCore and the end of this ContinueWith call are missing here.
137 _target.Completion.ContinueWith(delegate(Task completed, object state)
138 {
140 if (completed.IsFaulted)
141 {
142 sourceCore.AddAndUnwrapAggregateException(completed.Exception);
143 }
144 sourceCore.Complete();
// Continuation on the source's completion: faults dataflowBlock with the completed task's exception.
// NOTE(review): the declaration of dataflowBlock and the end of this ContinueWith call
// (including any continuation options) are missing here.
146 _source.Completion.ContinueWith(delegate(Task completed, object state)
147 {
149 dataflowBlock.Fault(completed.Exception);
// Delegate that completes the target passed as `state` (here _target) with no exception and
// dropPendingMessages: true.
// NOTE(review): the call this delegate is passed to is missing from this extract - presumably
// cancellation wiring; confirm.
152 {
153 ((TargetCore<TInput>)state).Complete(null, dropPendingMessages: true);
154 }, _target);
// Reports block creation to `log` when it is enabled (the declaration of `log` is missing here).
156 if (log.IsEnabled())
157 {
158 log.DataflowBlockCreated(this, dataflowBlockOptions);
159 }
160 }
161
// Body of the synchronous message processor. Its signature line is missing from this extract;
// the member index lists void ProcessMessage(Func<TInput, TOutput> transform,
// KeyValuePair<TInput, long> messageWithId).
// messageWithId.Key is the input passed to the transform; messageWithId.Value is the value
// handed to the reordering buffer.
163 {
164 TOutput item = default(TOutput);
// flag becomes true only after the transform returned without throwing.
165 bool flag = false;
166 try
167 {
168 item = transform(messageWithId.Key);
169 flag = true;
170 }
171 catch (Exception exception)
172 {
// NOTE(review): the statement governing this rethrow is missing from this extract.
174 {
175 throw;
176 }
177 }
178 finally
179 {
// A failed transform produces no output, so the target's bounding count is lowered by one.
180 if (!flag)
181 {
182 _target.ChangeBoundingCount(-1);
183 }
// Without a reordering buffer, a successful result is added straight to the source.
184 if (_reorderingBuffer == null)
185 {
186 if (flag)
187 {
188 if (_target.DataflowBlockOptions.MaxDegreeOfParallelism == 1)
189 {
190 _source.AddMessage(item);
191 }
192 else
193 {
// NOTE(review): the statement that opens this inner block is missing from this extract
// (presumably a lock on ParallelSourceLock - confirm).
195 {
196 _source.AddMessage(item);
197 }
198 }
199 }
200 }
// With a reordering buffer, the result is always handed over, with flag telling it whether
// the item is valid.
201 else
202 {
203 _reorderingBuffer.AddItem(messageWithId.Value, item, flag);
204 }
205 }
206 }
207
// Body of the asynchronous message processor. Its signature line is missing from this extract;
// the member index lists void ProcessMessageWithTask(Func<TInput, Task<TOutput>> transform,
// KeyValuePair<TInput, long> messageWithId).
209 {
210 Task<TOutput> task = null;
211 Exception ex = null;
// Invoke the transform to obtain its task; an exception thrown by the call itself is captured
// in ex instead of propagating.
212 try
213 {
214 task = transform(messageWithId.Key);
215 }
216 catch (Exception ex2)
217 {
218 ex = ex2;
219 }
// No task was produced (the transform threw or returned null).
220 if (task == null)
221 {
// For a captured exception that is not a cooperative cancellation, the message value is stored
// into the exception's data.
// NOTE(review): the statement that followed inside this block is missing from this extract.
222 if (ex != null && !Common.IsCooperativeCancellation(ex))
223 {
224 Common.StoreDataflowMessageValueIntoExceptionData(ex, messageWithId.Key);
226 }
// Tell the reordering buffer, if any, to ignore this message's slot.
227 if (_reorderingBuffer != null)
228 {
229 _reorderingBuffer.IgnoreItem(messageWithId.Value);
230 }
231 _target.SignalOneAsyncMessageCompleted(-1);
232 }
// A task was produced: attach a continuation that calls AsyncCompleteProcessMessageWithTask
// with the completed task.
// NOTE(review): the declaration of `tuple` and the remaining ContinueWith arguments are missing here.
233 else
234 {
235 task.ContinueWith(delegate(Task<TOutput> completed, object state)
236 {
238 tuple.Item1.AsyncCompleteProcessMessageWithTask(completed, tuple.Item2);
240 }
241 }
242
// Body of the completion handler for an async transform's task. Its signature line is missing
// from this extract; the member index lists void AsyncCompleteProcessMessageWithTask(
// Task<TOutput> completed, KeyValuePair<TInput, long> messageWithId).
244 {
245 bool isBounded = _target.IsBounded;
// flag becomes true only when the task ran to completion and its result was read.
246 bool flag = false;
247 TOutput item = default(TOutput);
248 switch (completed.Status)
249 {
250 case TaskStatus.RanToCompletion:
251 item = completed.Result;
252 flag = true;
253 break;
254 case TaskStatus.Faulted:
255 {
// Stores the message value into the exception's data, including its inner exceptions.
// NOTE(review): the declaration of `exception` and the statement after this call are missing here.
257 Common.StoreDataflowMessageValueIntoExceptionData(exception, messageWithId.Key, targetInnerExceptions: true);
259 break;
260 }
261 }
// No output was produced: on a bounded target, lower the bounding count by one.
262 if (!flag && isBounded)
263 {
264 _target.ChangeBoundingCount(-1);
265 }
// Without a reordering buffer, a successful result is added straight to the source.
266 if (_reorderingBuffer == null)
267 {
268 if (flag)
269 {
270 if (_target.DataflowBlockOptions.MaxDegreeOfParallelism == 1)
271 {
272 _source.AddMessage(item);
273 }
274 else
275 {
// NOTE(review): the statement that opens this inner block is missing from this extract
// (presumably a lock on ParallelSourceLock - confirm).
277 {
278 _source.AddMessage(item);
279 }
280 }
281 }
282 }
// With a reordering buffer, the result is always handed over, with flag telling it whether
// the item is valid.
283 else
284 {
285 _reorderingBuffer.AddItem(messageWithId.Value, item, flag);
286 }
// Always signal the target that this async message has finished.
287 _target.SignalOneAsyncMessageCompleted();
288 }
289
// Completes the target side with no exception and without dropping pending messages.
290 public void Complete()
291 {
292 _target.Complete(null, dropPendingMessages: false);
293 }
294
// Method body whose signature line is missing from this extract (presumably the block's Fault
// method taking an Exception - confirm). Throws ArgumentNullException("exception") for a null
// argument; otherwise completes the target with that exception and drops pending messages.
296 {
297 if (exception == null)
298 {
299 throw new ArgumentNullException("exception");
300 }
301 _target.Complete(exception, dropPendingMessages: true);
302 }
303
// Method body whose signature line is missing from this extract; the member index lists
// IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions).
// Forwards to the source core.
305 {
306 return _source.LinkTo(target, linkOptions);
307 }
308
// Method body whose signature line is missing from this extract; the member index lists
// bool TryReceive(Predicate<TOutput>? filter, [MaybeNullWhen(false)] out TOutput item).
// Forwards to the source core.
310 {
311 return _source.TryReceive(filter, out item);
312 }
313
// Forwards to the source core's TryReceiveAll and returns its result.
314 public bool TryReceiveAll([NotNullWhen(true)] out IList<TOutput>? items)
315 {
316 return _source.TryReceiveAll(out items);
317 }
318
323
// Method body whose signature line is missing from this extract; forwards to _source.ConsumeMessage.
325 {
326 return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
327 }
328
// Method body whose signature line is missing from this extract; forwards to _source.ReserveMessage.
330 {
331 return _source.ReserveMessage(messageHeader, target);
332 }
333
// Method body whose signature line is missing from this extract; forwards to _source.ReleaseReservation.
335 {
336 _source.ReleaseReservation(messageHeader, target);
337 }
338
// Returns the block's debugger name, computed from the source core's DataflowBlockOptions.
339 public override string ToString()
340 {
341 return Common.GetNameForDebugger(this, _source.DataflowBlockOptions);
342 }
343}
static bool IsCooperativeCancellation(Exception exception)
Definition Common.cs:88
static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude=TaskContinuationOptions.None)
Definition Common.cs:262
static string GetNameForDebugger(IDataflowBlock block, DataflowBlockOptions options=null)
Definition Common.cs:66
static void WireCancellationToComplete(CancellationToken cancellationToken, Task completionTask, Action< object > completeAction, object completeState)
Definition Common.cs:93
static int GetBlockId(IDataflowBlock block)
Definition Common.cs:61
readonly SourceCore< TOutput >.DebuggingInformation _sourceDebuggingInformation
readonly TransformBlock< TInput, TOutput > _transformBlock
readonly TargetCore< TInput >.DebuggingInformation _targetDebuggingInformation
DebugView(TransformBlock< TInput, TOutput > transformBlock)
QueuedMap< ISourceBlock< TInput >, DataflowMessageHeader > PostponedMessages
void AsyncCompleteProcessMessageWithTask(Task< TOutput > completed, KeyValuePair< TInput, long > messageWithId)
TransformBlock(Func< TInput, TOutput > transform)
bool TryReceive(Predicate< TOutput >? filter, [MaybeNullWhen(false)] out TOutput item)
readonly ReorderingBuffer< TOutput > _reorderingBuffer
TransformBlock(Func< TInput, TOutput > transformSync, Func< TInput, Task< TOutput > > transformAsync, ExecutionDataflowBlockOptions dataflowBlockOptions)
IDisposable LinkTo(ITargetBlock< TOutput > target, DataflowLinkOptions linkOptions)
void ProcessMessageWithTask(Func< TInput, Task< TOutput > > transform, KeyValuePair< TInput, long > messageWithId)
TransformBlock(Func< TInput, Task< TOutput > > transform, ExecutionDataflowBlockOptions dataflowBlockOptions)
TransformBlock(Func< TInput, TOutput > transform, ExecutionDataflowBlockOptions dataflowBlockOptions)
bool TryReceiveAll([NotNullWhen(true)] out IList< TOutput >? items)
TransformBlock(Func< TInput, Task< TOutput > > transform)
void ProcessMessage(Func< TInput, TOutput > transform, KeyValuePair< TInput, long > messageWithId)