Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
DataflowBlock.cs
Go to the documentation of this file.
6
8
9public static class DataflowBlock
10{
11 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
14 {
15 private sealed class DebugView
16 {
18
20
25 }
26
27 private readonly ISourceBlock<T> _source;
28
29 private readonly ITargetBlock<T> _target;
30
32
33 Task IDataflowBlock.Completion => _source.Completion;
34
36 {
37 get
38 {
41 return $"{Common.GetNameForDebugger(this)} Source=\"{((debuggerDisplay != null) ? debuggerDisplay.Content : _source)}\", Target=\"{((debuggerDisplay2 != null) ? debuggerDisplay2.Content : _target)}\"";
42 }
43 }
44
45 object IDebuggerDisplay.Content => DebuggerDisplayContent;
46
48 {
50 _target = target;
51 _userProvidedPredicate = predicate;
52 }
53
54 private bool RunPredicate(T item)
55 {
57 }
58
60 {
61 if (!messageHeader.IsValid)
62 {
63 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
64 }
65 if (source == null)
66 {
67 throw new ArgumentNullException("source");
68 }
70 {
72 }
73 return DataflowMessageStatus.Declined;
74 }
75
80
85
90
92 {
93 _target.Complete();
94 }
95
100
105 }
106
107 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
109 private sealed class SendAsyncSource<TOutput> : TaskCompletionSource<bool>, ISourceBlock<TOutput>, IDataflowBlock, IDebuggerDisplay
110 {
111 private sealed class DebugView
112 {
114
116
117 public TOutput Message => _source._messageValue;
118
120
125 }
126
128
129 private readonly TOutput _messageValue;
130
132
134
136
138
139 Task IDataflowBlock.Completion => base.Task;
140
142 {
143 get
144 {
146 return $"{Common.GetNameForDebugger(this)} Message={_messageValue}, Target=\"{((debuggerDisplay != null) ? debuggerDisplay.Content : _target)}\"";
147 }
148 }
149
150 object IDebuggerDisplay.Content => DebuggerDisplayContent;
151
153 {
154 _target = target;
156 if (cancellationToken.CanBeCanceled)
157 {
160 try
161 {
163 }
164 catch
165 {
166 GC.SuppressFinalize(this);
167 throw;
168 }
169 }
170 }
171
173 {
175 {
177 }
178 }
179
180 private void CompleteAsAccepted(bool runAsync)
181 {
183 {
184 try
185 {
186 ((SendAsyncSource<TOutput>)state).TrySetResult(result: true);
187 }
189 {
190 }
191 }, this, runAsync);
192 }
193
194 private void CompleteAsDeclined(bool runAsync)
195 {
197 {
198 try
199 {
200 ((SendAsyncSource<TOutput>)state).TrySetResult(result: false);
201 }
203 {
204 }
205 }, this, runAsync);
206 }
207
209 {
211 {
213 try
214 {
215 tuple.Item1.TrySetException(tuple.Item2);
216 }
218 {
219 }
220 }, Tuple.Create(this, exception), runAsync);
221 }
222
223 private void CompleteAsCanceled(bool runAsync)
224 {
226 {
227 try
228 {
229 ((SendAsyncSource<TOutput>)state).TrySetCanceled();
230 }
232 {
233 }
234 }, this, runAsync);
235 }
236
253
254 private void OfferToTargetAsync()
255 {
257 {
258 ((SendAsyncSource<TOutput>)state).OfferToTarget();
260 }
261
262 private static void CancellationHandler(object state)
263 {
265 if (sendAsyncSource != null && sendAsyncSource._cancellationState == 1 && Interlocked.CompareExchange(ref sendAsyncSource._cancellationState, 3, 1) == 1)
266 {
267 sendAsyncSource.CompleteAsCanceled(runAsync: true);
268 }
269 }
270
271 internal void OfferToTarget()
272 {
273 try
274 {
275 bool flag = _cancellationState != 0;
277 {
278 case DataflowMessageStatus.Accepted:
279 if (!flag)
280 {
282 }
283 break;
284 case DataflowMessageStatus.Declined:
285 case DataflowMessageStatus.DecliningPermanently:
287 break;
288 case DataflowMessageStatus.Postponed:
289 case DataflowMessageStatus.NotAvailable:
290 break;
291 }
292 }
293 catch (Exception ex)
294 {
295 Common.StoreDataflowMessageValueIntoExceptionData(ex, _messageValue);
297 }
298 }
299
301 {
302 if (!messageHeader.IsValid)
303 {
304 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
305 }
306 if (target == null)
307 {
308 throw new ArgumentNullException("target");
309 }
310 if (base.Task.IsCompleted)
311 {
312 messageConsumed = false;
313 return default(TOutput);
314 }
315 if (messageHeader.Id == 1)
316 {
319 {
321 messageConsumed = true;
322 return _messageValue;
323 }
324 }
325 messageConsumed = false;
326 return default(TOutput);
327 }
328
330 {
331 if (!messageHeader.IsValid)
332 {
333 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
334 }
335 if (target == null)
336 {
337 throw new ArgumentNullException("target");
338 }
339 if (base.Task.IsCompleted)
340 {
341 return false;
342 }
343 if (messageHeader.Id == 1)
344 {
345 if (_cancellationState != 0)
346 {
348 }
349 return true;
350 }
351 return false;
352 }
353
355 {
356 if (!messageHeader.IsValid)
357 {
358 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
359 }
360 if (target == null)
361 {
362 throw new ArgumentNullException("target");
363 }
364 if (messageHeader.Id != 1)
365 {
367 }
368 if (base.Task.IsCompleted)
369 {
370 return;
371 }
372 if (_cancellationState != 0)
373 {
375 {
377 }
379 {
381 }
382 }
384 }
385
390
395
400 }
401
411
412 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
414 {
420
422 {
424 receiveTarget.TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason.Cancellation);
425 };
426
427 private T _receivedValue;
428
430
431 internal bool _cleanupReserved;
432
434
436
437 internal Timer _timer;
438
440
442
443 internal object IncomingLock => _cts;
444
445 Task IDataflowBlock.Completion
446 {
447 get
448 {
450 }
451 }
452
453 private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this)} IsCompleted={base.Task.IsCompleted}";
454
455 object IDebuggerDisplay.Content => DebuggerDisplayContent;
456
457 internal ReceiveTarget()
458 {
459 }
460
462 {
463 if (!messageHeader.IsValid)
464 {
465 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
466 }
467 if (source == null && consumeToAccept)
468 {
470 }
473 {
474 return DataflowMessageStatus.DecliningPermanently;
475 }
477 {
479 {
480 return DataflowMessageStatus.DecliningPermanently;
481 }
482 try
483 {
484 bool messageConsumed = true;
485 T receivedValue = (T)(consumeToAccept ? ((object)source.ConsumeMessage(messageHeader, this, out messageConsumed)) : ((object)messageValue));
486 if (messageConsumed)
487 {
490 _cleanupReserved = true;
491 }
492 }
493 catch (Exception ex)
494 {
495 dataflowMessageStatus = DataflowMessageStatus.DecliningPermanently;
496 Common.StoreDataflowMessageValueIntoExceptionData(ex, messageValue);
498 _cleanupReserved = true;
499 }
500 }
501 switch (dataflowMessageStatus)
502 {
503 case DataflowMessageStatus.Accepted:
505 break;
506 case DataflowMessageStatus.DecliningPermanently:
508 break;
509 }
511 }
512
514 {
516 {
517 return false;
518 }
520 {
522 {
523 return false;
524 }
525 _cleanupReserved = true;
526 }
527 CleanupAndComplete(reason);
528 return true;
529 }
530
532 {
534 if (reason != ReceiveCoreByLinkingCleanupReason.SourceCompletion && unlink != null)
535 {
537 if (disposable != null)
538 {
539 try
540 {
541 disposable.Dispose();
542 }
544 {
546 reason = ReceiveCoreByLinkingCleanupReason.SourceProtocolError;
547 }
548 }
549 }
550 if (_timer != null)
551 {
552 _timer.Dispose();
553 }
554 if (reason != ReceiveCoreByLinkingCleanupReason.Cancellation)
555 {
557 {
558 reason = ReceiveCoreByLinkingCleanupReason.Cancellation;
559 }
560 _cts.Cancel();
561 }
563 switch (reason)
564 {
567 {
569 try
570 {
571 receiveTarget2.TrySetResult(receiveTarget2._receivedValue);
572 }
574 {
575 }
577 return;
578 default:
580 {
582 try
583 {
584 receiveTarget3.TrySetCanceled();
585 }
587 {
588 }
590 return;
591 case ReceiveCoreByLinkingCleanupReason.SourceCompletion:
592 if (_receivedException == null)
593 {
595 }
596 break;
598 if (_receivedException == null)
599 {
601 }
602 break;
603 case ReceiveCoreByLinkingCleanupReason.SourceProtocolError:
604 case ReceiveCoreByLinkingCleanupReason.ErrorDuringCleanup:
605 break;
606 }
608 {
610 try
611 {
613 }
615 {
616 }
618 }
619
624
626 {
628 }
629
634
636 {
637 ((IDataflowBlock)this).Complete();
638 }
639 }
640
641 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
643 {
644 internal static readonly Func<Task<bool>, object, bool> s_handleCompletion = delegate(Task<bool> antecedent, object state)
645 {
647 outputAvailableAsyncTarget._ctr.Dispose();
648 return antecedent.GetAwaiter().GetResult();
649 };
650
652
654
656
657 Task IDataflowBlock.Completion
658 {
659 get
660 {
662 }
663 }
664
665 private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this)} IsCompleted={base.Task.IsCompleted}";
666
667 object IDebuggerDisplay.Content => DebuggerDisplayContent;
668
679
681 {
684 {
685 unlinker.Dispose();
686 }
687 }
688
690 {
691 if (!messageHeader.IsValid)
692 {
693 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
694 }
695 if (source == null)
696 {
697 throw new ArgumentNullException("source");
698 }
699 TrySetResult(result: true);
700 return DataflowMessageStatus.DecliningPermanently;
701 }
702
704 {
705 TrySetResult(result: false);
706 }
707
709 {
710 if (exception == null)
711 {
712 throw new ArgumentNullException("exception");
713 }
714 TrySetResult(result: false);
715 }
716 }
717
718 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
720 private sealed class EncapsulatingPropagator<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>, ITargetBlock<TInput>, IDataflowBlock, ISourceBlock<TOutput>, IReceivableSourceBlock<TOutput>, IDebuggerDisplay
721 {
735
737
739
740 public Task Completion => _source.Completion;
741
743 {
744 get
745 {
748 return $"{Common.GetNameForDebugger(this)} Target=\"{((debuggerDisplay != null) ? debuggerDisplay.Content : _target)}\", Source=\"{((debuggerDisplay2 != null) ? debuggerDisplay2.Content : _source)}\"";
749 }
750 }
751
752 object IDebuggerDisplay.Content => DebuggerDisplayContent;
753
759
760 public void Complete()
761 {
762 _target.Complete();
763 }
764
766 {
767 if (exception == null)
768 {
769 throw new ArgumentNullException("exception");
770 }
771 _target.Fault(exception);
772 }
773
778
783
785 {
787 {
788 return receivableSourceBlock.TryReceive(filter, out item);
789 }
790 item = default(TOutput);
791 return false;
792 }
793
794 public bool TryReceiveAll([NotNullWhen(true)] out IList<TOutput> items)
795 {
797 {
798 return receivableSourceBlock.TryReceiveAll(out items);
799 }
800 items = null;
801 return false;
802 }
803
808
813
818 }
819
820 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
822 {
823 internal static readonly Func<object, int> s_processBranchFunction = delegate(object state)
824 {
825 Tuple<Action<T>, T, int> tuple = (Tuple<Action<T>, T, int>)state;
826 tuple.Item1(tuple.Item2);
827 return tuple.Item3;
828 };
829
830 private readonly StrongBox<Task> _completed;
831
832 Task IDataflowBlock.Completion
833 {
834 get
835 {
837 }
838 }
839
840 private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this)} IsCompleted={base.Task.IsCompleted}";
841
842 object IDebuggerDisplay.Content => DebuggerDisplayContent;
843
856
858 {
859 if (!messageHeader.IsValid)
860 {
861 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
862 }
863 if (source == null && consumeToAccept)
864 {
866 }
868 {
869 if (_completed.Value != null || base.Task.IsCompleted)
870 {
871 return DataflowMessageStatus.DecliningPermanently;
872 }
873 if (consumeToAccept)
874 {
875 messageValue = source.ConsumeMessage(messageHeader, this, out var messageConsumed);
876 if (!messageConsumed)
877 {
878 return DataflowMessageStatus.NotAvailable;
879 }
880 }
882 _completed.Value = base.Task;
883 return DataflowMessageStatus.Accepted;
884 }
885 }
886
888 {
890 {
892 }
893 }
894
896 {
897 ((IDataflowBlock)this).Complete();
898 }
899 }
900
901 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
903 private sealed class SourceObservable<TOutput> : IObservable<TOutput>, IDebuggerDisplay
904 {
905 private sealed class DebugView
906 {
908
910 public IObserver<TOutput>[] Observers => _observable._observersState.Observers.ToArray();
911
916 }
917
918 private sealed class ObserversState
919 {
921
922 internal readonly ActionBlock<TOutput> Target;
923
925
927
929
931
950
951 private Task ProcessItemAsync(TOutput item)
952 {
954 lock (Observable._SubscriptionLock)
955 {
957 }
958 try
959 {
961 {
963 {
964 Task<bool> task = targetObserver.SendAsyncToTarget(item);
965 if (task.Status != TaskStatus.RanToCompletion)
966 {
967 if (_tempSendAsyncTaskList == null)
968 {
970 }
972 }
973 }
974 else
975 {
976 item2.OnNext(item);
977 }
978 }
980 {
983 return result;
984 }
985 }
986 catch (Exception exception)
987 {
988 return Common.CreateTaskFromException<VoidResult>(exception);
989 }
991 }
992
994 {
996 lock (Observable._SubscriptionLock)
997 {
999 if (targetException != null)
1000 {
1001 Observable.ResetObserverState();
1002 }
1004 }
1005 if (observers.Count <= 0)
1006 {
1007 return;
1008 }
1009 Exception ex = targetException ?? Observable.GetCompletionError();
1010 try
1011 {
1012 if (ex != null)
1013 {
1014 foreach (IObserver<TOutput> item in observers)
1015 {
1016 item.OnError(ex);
1017 }
1018 return;
1019 }
1021 {
1022 item2.OnCompleted();
1023 }
1024 }
1025 catch (Exception error)
1026 {
1028 }
1029 }
1030 }
1031
1033
1034 private readonly object _SubscriptionLock = new object();
1035
1037
1039
1041 {
1042 get
1043 {
1045 return $"Observers={_observersState.Observers.Count}, Block=\"{((debuggerDisplay != null) ? debuggerDisplay.Content : _source)}\"";
1046 }
1047 }
1048
1049 object IDebuggerDisplay.Content => DebuggerDisplayContent;
1050
1055
1061
1071
1073 {
1074 if (observer == null)
1075 {
1076 throw new ArgumentNullException("observer");
1077 }
1079 Exception ex = null;
1081 {
1083 {
1084 _observersState.Observers = _observersState.Observers.Add(observer);
1086 {
1087 _observersState.Unlinker = _source.LinkTo(_observersState.Target);
1088 if (_observersState.Unlinker == null)
1089 {
1090 _observersState.Observers = ImmutableArray<IObserver<TOutput>>.Empty;
1091 return Disposables.Nop;
1092 }
1093 }
1095 {
1096 s.Unsubscribe(o);
1097 }, this, observer);
1098 }
1100 }
1101 if (ex != null)
1102 {
1103 observer.OnError(ex);
1104 }
1105 else
1106 {
1107 observer.OnCompleted();
1108 }
1109 return Disposables.Nop;
1110 }
1111
1113 {
1115 {
1117 if (observersState.Observers.Contains(observer))
1118 {
1119 if (observersState.Observers.Count == 1)
1120 {
1122 }
1123 else
1124 {
1125 observersState.Observers = observersState.Observers.Remove(observer);
1126 }
1127 }
1128 }
1129 }
1130
1140 }
1141
1142 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
1143 private sealed class TargetObserver<TInput> : IObserver<TInput>, IDebuggerDisplay
1144 {
1146
1148 {
1149 get
1150 {
1152 return $"Block=\"{((debuggerDisplay != null) ? debuggerDisplay.Content : _target)}\"";
1153 }
1154 }
1155
1156 object IDebuggerDisplay.Content => DebuggerDisplayContent;
1157
1159 {
1160 _target = target;
1161 }
1162
1164 {
1166 task.GetAwaiter().GetResult();
1167 }
1168
1169 void IObserver<TInput>.OnCompleted()
1170 {
1171 _target.Complete();
1172 }
1173
1175 {
1176 _target.Fault(error);
1177 }
1178
1180 {
1181 return _target.SendAsync(value);
1182 }
1183 }
1184
1185 private sealed class NullTargetBlock<TInput> : ITargetBlock<TInput>, IDataflowBlock
1186 {
1188
1189 Task IDataflowBlock.Completion => LazyInitializer.EnsureInitialized(ref _completion, () => new TaskCompletionSource<VoidResult>().Task);
1190
1192 {
1193 if (!messageHeader.IsValid)
1194 {
1195 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
1196 }
1197 if (consumeToAccept)
1198 {
1199 if (source == null)
1200 {
1201 throw new ArgumentException(System.SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
1202 }
1203 source.ConsumeMessage(messageHeader, this, out var messageConsumed);
1204 if (!messageConsumed)
1205 {
1206 return DataflowMessageStatus.NotAvailable;
1207 }
1208 }
1209 return DataflowMessageStatus.Accepted;
1210 }
1211
1213 {
1214 }
1215
1219 }
1220
1221 private static readonly Action<object> _cancelCts = delegate(object state)
1222 {
1223 ((CancellationTokenSource)state).Cancel();
1224 };
1225
1227 {
1228 BoundedCapacity = 1
1229 };
1230
1232 {
1233 if (source == null)
1234 {
1235 throw new ArgumentNullException("source");
1236 }
1237 if (target == null)
1238 {
1239 throw new ArgumentNullException("target");
1240 }
1241 return source.LinkTo(target, DataflowLinkOptions.Default);
1242 }
1243
1245 {
1246 return source.LinkTo(target, DataflowLinkOptions.Default, predicate);
1247 }
1248
1250 {
1251 if (source == null)
1252 {
1253 throw new ArgumentNullException("source");
1254 }
1255 if (target == null)
1256 {
1257 throw new ArgumentNullException("target");
1258 }
1259 if (linkOptions == null)
1260 {
1261 throw new ArgumentNullException("linkOptions");
1262 }
1263 if (predicate == null)
1264 {
1265 throw new ArgumentNullException("predicate");
1266 }
1268 return source.LinkTo(target2, linkOptions);
1269 }
1270
1271 public static bool Post<TInput>(this ITargetBlock<TInput> target, TInput item)
1272 {
1273 if (target == null)
1274 {
1275 throw new ArgumentNullException("target");
1276 }
1277 return target.OfferMessage(Common.SingleMessageHeader, item, null, consumeToAccept: false) == DataflowMessageStatus.Accepted;
1278 }
1279
1281 {
1282 return target.SendAsync(item, CancellationToken.None);
1283 }
1284
1286 {
1287 if (target == null)
1288 {
1289 throw new ArgumentNullException("target");
1290 }
1291 if (cancellationToken.IsCancellationRequested)
1292 {
1293 return Common.CreateTaskFromCancellation<bool>(cancellationToken);
1294 }
1296 try
1297 {
1298 switch (target.OfferMessage(Common.SingleMessageHeader, item, null, consumeToAccept: false))
1299 {
1300 case DataflowMessageStatus.Accepted:
1302 case DataflowMessageStatus.DecliningPermanently:
1304 default:
1306 break;
1307 }
1308 }
1309 catch (Exception ex)
1310 {
1311 Common.StoreDataflowMessageValueIntoExceptionData(ex, item);
1312 return Common.CreateTaskFromException<bool>(ex);
1313 }
1314 sendAsyncSource.OfferToTarget();
1315 return sendAsyncSource.Task;
1316 }
1317
1319 {
1320 if (source == null)
1321 {
1322 throw new ArgumentNullException("source");
1323 }
1324 return source.TryReceive(null, out item);
1325 }
1326
1331
1336
1341
1343 {
1344 if (source == null)
1345 {
1346 throw new ArgumentNullException("source");
1347 }
1349 {
1351 }
1352 return source.ReceiveCore(attemptTryReceive: true, timeout, cancellationToken);
1353 }
1354
1356 {
1358 }
1359
1364
1366 {
1367 return source.Receive(timeout, CancellationToken.None);
1368 }
1369
1371 {
1372 if (source == null)
1373 {
1374 throw new ArgumentNullException("source");
1375 }
1377 {
1379 }
1380 cancellationToken.ThrowIfCancellationRequested();
1382 {
1383 return item;
1384 }
1386 try
1387 {
1388 return task.GetAwaiter().GetResult();
1389 }
1390 catch
1391 {
1392 if (task.IsCanceled)
1393 {
1394 cancellationToken.ThrowIfCancellationRequested();
1395 }
1396 throw;
1397 }
1398 }
1399
1401 {
1402 if (cancellationToken.IsCancellationRequested)
1403 {
1404 return Common.CreateTaskFromCancellation<TOutput>(cancellationToken);
1405 }
1407 {
1408 try
1409 {
1410 if (receivableSourceBlock.TryReceive(null, out var item))
1411 {
1412 return Task.FromResult(item);
1413 }
1414 }
1415 catch (Exception exception)
1416 {
1417 return Common.CreateTaskFromException<TOutput>(exception);
1418 }
1419 }
1420 int num = (int)timeout.TotalMilliseconds;
1421 if (num == 0)
1422 {
1423 return Common.CreateTaskFromException<TOutput>(ReceiveTarget<TOutput>.CreateExceptionForTimeout());
1424 }
1426 }
1427
1429 {
1431 try
1432 {
1433 if (cancellationToken.CanBeCanceled)
1434 {
1435 receiveTarget._externalCancellationToken = cancellationToken;
1436 receiveTarget._regFromExternalCancellationToken = cancellationToken.Register(_cancelCts, receiveTarget._cts);
1437 }
1438 if (millisecondsTimeout > 0)
1439 {
1440 receiveTarget._timer = new Timer(ReceiveTarget<TOutput>.CachedLinkingTimerCallback, receiveTarget, millisecondsTimeout, -1);
1441 }
1442 if (receiveTarget._cts.Token.CanBeCanceled)
1443 {
1444 receiveTarget._cts.Token.Register(ReceiveTarget<TOutput>.CachedLinkingCancellationCallback, receiveTarget);
1445 }
1447 if (Volatile.Read(ref receiveTarget._cleanupReserved))
1448 {
1449 Interlocked.CompareExchange(ref receiveTarget._unlink, null, comparand)?.Dispose();
1450 }
1451 }
1453 {
1454 receiveTarget._receivedException = receivedException;
1455 receiveTarget.TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason.SourceProtocolError);
1456 }
1457 return receiveTarget.Task;
1458 }
1459
1461 {
1462 return source.OutputAvailableAsync(CancellationToken.None);
1463 }
1464
1466 {
1467 if (source == null)
1468 {
1469 throw new ArgumentNullException("source");
1470 }
1471 if (cancellationToken.IsCancellationRequested)
1472 {
1473 return Common.CreateTaskFromCancellation<bool>(cancellationToken);
1474 }
1476 try
1477 {
1478 outputAvailableAsyncTarget._unlinker = source.LinkTo(outputAvailableAsyncTarget, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion);
1479 if (outputAvailableAsyncTarget.Task.IsCompleted)
1480 {
1481 return outputAvailableAsyncTarget.Task;
1482 }
1483 if (cancellationToken.CanBeCanceled)
1484 {
1485 outputAvailableAsyncTarget._ctr = cancellationToken.Register(OutputAvailableAsyncTarget<TOutput>.s_cancelAndUnlink, outputAvailableAsyncTarget);
1486 }
1488 }
1489 catch (Exception exception)
1490 {
1491 outputAvailableAsyncTarget.TrySetException(exception);
1492 outputAvailableAsyncTarget.AttemptThreadSafeUnlink();
1493 return outputAvailableAsyncTarget.Task;
1494 }
1495 }
1496
1498 {
1499 if (target == null)
1500 {
1501 throw new ArgumentNullException("target");
1502 }
1503 if (source == null)
1504 {
1505 throw new ArgumentNullException("source");
1506 }
1508 }
1509
1514
1516 {
1517 if (source1 == null)
1518 {
1519 throw new ArgumentNullException("source1");
1520 }
1521 if (action1 == null)
1522 {
1523 throw new ArgumentNullException("action1");
1524 }
1525 if (source2 == null)
1526 {
1527 throw new ArgumentNullException("source2");
1528 }
1529 if (action2 == null)
1530 {
1531 throw new ArgumentNullException("action2");
1532 }
1533 if (dataflowBlockOptions == null)
1534 {
1535 throw new ArgumentNullException("dataflowBlockOptions");
1536 }
1538 }
1539
1544
1546 {
1547 if (source1 == null)
1548 {
1549 throw new ArgumentNullException("source1");
1550 }
1551 if (action1 == null)
1552 {
1553 throw new ArgumentNullException("action1");
1554 }
1555 if (source2 == null)
1556 {
1557 throw new ArgumentNullException("source2");
1558 }
1559 if (action2 == null)
1560 {
1561 throw new ArgumentNullException("action2");
1562 }
1563 if (source3 == null)
1564 {
1565 throw new ArgumentNullException("source3");
1566 }
1567 if (action3 == null)
1568 {
1569 throw new ArgumentNullException("action3");
1570 }
1571 if (dataflowBlockOptions == null)
1572 {
1573 throw new ArgumentNullException("dataflowBlockOptions");
1574 }
1576 }
1577
1579 {
1580 bool flag = source3 != null;
1581 if (dataflowBlockOptions.CancellationToken.IsCancellationRequested)
1582 {
1583 return Common.CreateTaskFromCancellation<int>(dataflowBlockOptions.CancellationToken);
1584 }
1585 try
1586 {
1589 {
1590 return task;
1591 }
1592 }
1593 catch (Exception exception)
1594 {
1595 return Common.CreateTaskFromException<int>(exception);
1596 }
1598 }
1599
1601 {
1603 {
1604 task = null;
1605 return false;
1606 }
1608 return true;
1609 }
1610
1612 {
1613 bool flag = source3 != null;
1617 Task<int>[] array = new Task<int>[flag ? 3 : 2];
1620 if (flag)
1621 {
1623 }
1625 Task.Factory.ContinueWhenAll(array, delegate(Task<int>[] tasks)
1626 {
1627 List<Exception> list = null;
1628 int num = -1;
1629 foreach (Task<int> task in tasks)
1630 {
1631 switch (task.Status)
1632 {
1633 case TaskStatus.Faulted:
1634 Common.AddException(ref list, task.Exception, unwrapInnerExceptions: true);
1635 break;
1636 case TaskStatus.RanToCompletion:
1637 {
1638 int result2 = task.Result;
1639 if (result2 >= 0)
1640 {
1641 num = result2;
1642 }
1643 break;
1644 }
1645 }
1646 }
1647 if (list != null)
1648 {
1649 result.TrySetException(list);
1650 }
1651 else if (num >= 0)
1652 {
1653 result.TrySetResult(num);
1654 }
1655 else
1656 {
1657 result.TrySetCanceled();
1658 }
1659 cts.Dispose();
1661 return result.Task;
1662 }
1663
1665 {
1666 if (cts.IsCancellationRequested)
1667 {
1668 return Common.CreateTaskFromCancellation<int>(cts.Token);
1669 }
1672 try
1673 {
1675 }
1676 catch (Exception exception)
1677 {
1678 cts.Cancel();
1679 return Common.CreateTaskFromException<int>(exception);
1680 }
1681 return chooseTarget.Task.ContinueWith(delegate(Task<T> completed)
1682 {
1683 try
1684 {
1685 if (completed.Status == TaskStatus.RanToCompletion)
1686 {
1687 cts.Cancel();
1688 action(completed.Result);
1689 return branchId;
1690 }
1691 return -1;
1692 }
1693 finally
1694 {
1695 unlink.Dispose();
1696 }
1698 }
1699
1701 {
1702 if (source == null)
1703 {
1704 throw new ArgumentNullException("source");
1705 }
1706 return SourceObservable<TOutput>.From(source);
1707 }
1708
1710 {
1711 if (target == null)
1712 {
1713 throw new ArgumentNullException("target");
1714 }
1715 return new TargetObserver<TInput>(target);
1716 }
1717
1719 {
1720 return new NullTargetBlock<TInput>();
1721 }
1722
1724 {
1725 if (source == null)
1726 {
1727 throw new ArgumentNullException("source");
1728 }
1731 {
1732 while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false))
1733 {
1734 TOutput item;
1735 while (source.TryReceive(out item))
1736 {
1737 yield return item;
1738 }
1739 }
1740 }
1741 }
1742}
bool ICollection< KeyValuePair< TKey, TValue > >. Remove(KeyValuePair< TKey, TValue > keyValuePair)
bool ICollection< KeyValuePair< TKey, TValue > >. Contains(KeyValuePair< TKey, TValue > keyValuePair)
void Add(TKey key, TValue value)
static bool HasShutdownStarted
static void SuppressFinalize(object obj)
Definition GC.cs:202
Definition GC.cs:8
static string Argument_InvalidMessageHeader
Definition SR.cs:24
static string InvalidOperation_MessageNotReservedByTarget
Definition SR.cs:34
static string NotSupported_MemberNotNeeded
Definition SR.cs:36
static string InvalidOperation_DataNotAvailableForReceive
Definition SR.cs:30
static string ArgumentOutOfRange_NeedNonNegOrNegative1
Definition SR.cs:1072
static string InvalidOperation_ErrorDuringCleanup
Definition SR.cs:38
static string Argument_CantConsumeFromANullSource
Definition SR.cs:22
Definition SR.cs:7
static CancellationTokenSource CreateLinkedTokenSource(CancellationToken token1, CancellationToken token2)
static int CompareExchange(ref int location1, int value, int comparand)
static readonly Func< object, int > s_processBranchFunction
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock< T > source, bool consumeToAccept)
ChooseTarget(StrongBox< Task > completed, CancellationToken cancellationToken)
readonly EncapsulatingPropagator< TInput, TOutput > _propagator
DebugView(EncapsulatingPropagator< TInput, TOutput > propagator)
TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target, out bool messageConsumed)
bool TryReceive(Predicate< TOutput > filter, [MaybeNullWhen(false)] out TOutput item)
bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
bool TryReceiveAll([NotNullWhen(true)] out IList< TOutput > items)
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock< TInput > source, bool consumeToAccept)
IDisposable LinkTo(ITargetBlock< TOutput > target, DataflowLinkOptions linkOptions)
EncapsulatingPropagator(ITargetBlock< TInput > target, ISourceBlock< TOutput > source)
FilteredLinkPropagator(ISourceBlock< T > source, ITargetBlock< T > target, Predicate< T > predicate)
static readonly Func< Task< bool >, object, bool > s_handleCompletion
static readonly Action< object > CachedLinkingCancellationCallback
void CleanupAndComplete(ReceiveCoreByLinkingCleanupReason reason)
bool TryCleanupAndComplete(ReceiveCoreByLinkingCleanupReason reason)
void RunCompletionAction(Action< object > completionAction, object completionActionState, bool runAsync)
SendAsyncSource(ITargetBlock< TOutput > target, TOutput messageValue, CancellationToken cancellationToken)
void CompleteAsFaulted(Exception exception, bool runAsync)
static IObservable< TOutput > From(ISourceBlock< TOutput > source)
static readonly ConditionalWeakTable< ISourceBlock< TOutput >, SourceObservable< TOutput > > _table
ImmutableArray< IObserver< TOutput > > ResetObserverState()
static Task< TOutput > ReceiveCoreByLinking< TOutput >(ISourceBlock< TOutput > source, int millisecondsTimeout, CancellationToken cancellationToken)
static Task< int > ChooseCore< T1, T2, T3 >(ISourceBlock< T1 > source1, Action< T1 > action1, ISourceBlock< T2 > source2, Action< T2 > action2, ISourceBlock< T3 > source3, Action< T3 > action3, DataflowBlockOptions dataflowBlockOptions)
static Task< int > ChooseCoreByLinking< T1, T2, T3 >(ISourceBlock< T1 > source1, Action< T1 > action1, ISourceBlock< T2 > source2, Action< T2 > action2, ISourceBlock< T3 > source3, Action< T3 > action3, DataflowBlockOptions dataflowBlockOptions)
static Task< bool > SendAsync< TInput >(this ITargetBlock< TInput > target, TInput item)
static Task< int > Choose< T1, T2 >(ISourceBlock< T1 > source1, Action< T1 > action1, ISourceBlock< T2 > source2, Action< T2 > action2)
static readonly Action< object > _cancelCts
static IDisposable LinkTo< TOutput >(this ISourceBlock< TOutput > source, ITargetBlock< TOutput > target)
static bool Post< TInput >(this ITargetBlock< TInput > target, TInput item)
static TOutput Receive< TOutput >(this ISourceBlock< TOutput > source)
static Task< TOutput > ReceiveCore< TOutput >(this ISourceBlock< TOutput > source, bool attemptTryReceive, TimeSpan timeout, CancellationToken cancellationToken)
static bool TryChooseFromSource< T >(ISourceBlock< T > source, Action< T > action, int branchId, TaskScheduler scheduler, [NotNullWhen(true)] out Task< int > task)
static bool TryReceive< TOutput >(this IReceivableSourceBlock< TOutput > source, [MaybeNullWhen(false)] out TOutput item)
static Task< TOutput > ReceiveAsync< TOutput >(this ISourceBlock< TOutput > source)
static IPropagatorBlock< TInput, TOutput > Encapsulate< TInput, TOutput >(ITargetBlock< TInput > target, ISourceBlock< TOutput > source)
static Task< bool > OutputAvailableAsync< TOutput >(this ISourceBlock< TOutput > source)
static readonly ExecutionDataflowBlockOptions _nonGreedyExecutionOptions
static Task< int > Choose< T1, T2, T3 >(ISourceBlock< T1 > source1, Action< T1 > action1, ISourceBlock< T2 > source2, Action< T2 > action2, ISourceBlock< T3 > source3, Action< T3 > action3)
static readonly DataflowLinkOptions UnlinkAfterOneAndPropagateCompletion
static void ThrowAsync(Exception error)
Definition Common.cs:174
static readonly Task< bool > CompletedTaskWithFalseResult
Definition Common.cs:30
static Task GetPotentiallyNotSupportedCompletionTask(IDataflowBlock block)
Definition Common.cs:232
static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude=TaskContinuationOptions.None)
Definition Common.cs:262
static bool IsValidTimeout(TimeSpan timeout)
Definition Common.cs:252
static readonly Task< bool > CompletedTaskWithTrueResult
Definition Common.cs:28
static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica=false)
Definition Common.cs:267
static readonly DataflowMessageHeader SingleMessageHeader
Definition Common.cs:26
static Exception InitializeStackTrace(Exception exception)
Definition Common.cs:109
static void WireCancellationToComplete(CancellationToken cancellationToken, Task completionTask, Action< object > completeAction, object completeState)
Definition Common.cs:93
static readonly TimeSpan InfiniteTimeSpan
Definition Common.cs:34
Task ContinueWith(Action< Task< TResult > > continuationAction)
Definition Task.cs:263
static new TaskFactory< TResult > Factory
Definition Task.cs:56
AggregateException? Exception
Definition Task.cs:1014
static Task WhenAll(IEnumerable< Task > tasks)
Definition Task.cs:3504
bool Dispose(WaitHandle notifyObject)
Definition Timer.cs:176
static bool Read(ref bool location)
Definition Volatile.cs:67
bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
IDisposable LinkTo(ITargetBlock< TOutput > target, DataflowLinkOptions linkOptions)
void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
TOutput? ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target, out bool messageConsumed)
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock< TInput >? source, bool consumeToAccept)
delegate void TimerCallback(object? state)