Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
Parallel.cs
Go to the documentation of this file.
5
7
8public static class Parallel
9{
11 {
13
15
16 private readonly Func<object, Task> _taskBody;
17
18 private readonly TaskScheduler _scheduler;
19
21
23
25
26 private int _remainingDop;
27
29
31
48
50 {
51 if (_remainingDop > 0)
52 {
56 {
58 }
59 else
60 {
62 }
63 }
64 }
65
70
72 {
73 lock (this)
74 {
76 }
78 }
79
80 public void Complete()
81 {
83 {
85 }
86 else if (_exceptions == null)
87 {
88 bool flag = TrySetResult();
89 }
90 else
91 {
92 bool flag = TrySetException(_exceptions);
93 }
94 }
95
97 {
98 if (_executionContext == null)
99 {
100 _taskBody(this);
101 return;
102 }
104 {
105 ((ForEachAsyncState<TSource>)o)._taskBody(o);
106 }, this);
107 }
108 }
109
126
145
146 internal static int s_forkJoinContextID;
147
149
151
152 public static void Invoke(params Action[] actions)
153 {
155 }
156
158 {
160 if (actions == null)
161 {
162 throw new ArgumentNullException("actions");
163 }
164 if (parallelOptions2 == null)
165 {
166 throw new ArgumentNullException("parallelOptions");
167 }
168 parallelOptions2.CancellationToken.ThrowIfCancellationRequested();
169 Action[] actionsCopy = new Action[actions.Length];
170 for (int i = 0; i < actionsCopy.Length; i++)
171 {
172 actionsCopy[i] = actions[i];
173 if (actionsCopy[i] == null)
174 {
176 }
177 }
178 int forkJoinContextID = 0;
179 if (ParallelEtwProvider.Log.IsEnabled())
180 {
182 ParallelEtwProvider.Log.ParallelInvokeBegin(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID, ParallelEtwProvider.ForkJoinOperationType.ParallelInvoke, actionsCopy.Length);
183 }
184 if (actionsCopy.Length < 1)
185 {
186 return;
187 }
188 try
189 {
190 if (OperatingSystem.IsBrowser() || actionsCopy.Length > 10 || (parallelOptions2.MaxDegreeOfParallelism != -1 && parallelOptions2.MaxDegreeOfParallelism < actionsCopy.Length))
191 {
193 int actionIndex = 0;
194 try
195 {
197 {
199 for (int num = Interlocked.Increment(ref actionIndex); num <= actionsCopy.Length; num = Interlocked.Increment(ref actionIndex))
200 {
201 try
202 {
203 actionsCopy[num - 1]();
204 }
205 catch (Exception item)
206 {
207 LazyInitializer.EnsureInitialized(ref exceptionQ, () => new ConcurrentQueue<Exception>());
208 exceptionQ.Enqueue(item);
209 }
210 parallelOptions2.CancellationToken.ThrowIfCancellationRequested();
211 }
213 }
214 catch (Exception ex)
215 {
216 LazyInitializer.EnsureInitialized(ref exceptionQ, () => new ConcurrentQueue<Exception>());
218 {
219 throw;
220 }
222 {
223 foreach (Exception innerException in ex2.InnerExceptions)
224 {
225 exceptionQ.Enqueue(innerException);
226 }
227 }
228 else
229 {
230 exceptionQ.Enqueue(ex);
231 }
232 }
233 if (exceptionQ != null && !exceptionQ.IsEmpty)
234 {
236 }
237 return;
238 }
239 Task[] array = new Task[actionsCopy.Length];
240 parallelOptions2.CancellationToken.ThrowIfCancellationRequested();
241 for (int j = 1; j < array.Length; j++)
242 {
243 array[j] = Task.Factory.StartNew(actionsCopy[j], parallelOptions2.CancellationToken, TaskCreationOptions.None, parallelOptions2.EffectiveTaskScheduler);
244 }
245 array[0] = new Task(actionsCopy[0], parallelOptions2.CancellationToken, TaskCreationOptions.None);
246 array[0].RunSynchronously(parallelOptions2.EffectiveTaskScheduler);
247 try
248 {
250 }
251 catch (AggregateException ex3)
252 {
254 }
255 }
256 finally
257 {
258 if (ParallelEtwProvider.Log.IsEnabled())
259 {
260 ParallelEtwProvider.Log.ParallelInvokeEnd(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID);
261 }
262 }
263 }
264
266 {
267 if (body == null)
268 {
269 throw new ArgumentNullException("body");
270 }
271 return ForWorker<object>(fromInclusive, toExclusive, s_defaultParallelOptions, body, null, null, null, null);
272 }
273
275 {
276 if (body == null)
277 {
278 throw new ArgumentNullException("body");
279 }
280 return ForWorker64<object>(fromInclusive, toExclusive, s_defaultParallelOptions, body, null, null, null, null);
281 }
282
284 {
285 if (body == null)
286 {
287 throw new ArgumentNullException("body");
288 }
289 if (parallelOptions == null)
290 {
291 throw new ArgumentNullException("parallelOptions");
292 }
293 return ForWorker<object>(fromInclusive, toExclusive, parallelOptions, body, null, null, null, null);
294 }
295
297 {
298 if (body == null)
299 {
300 throw new ArgumentNullException("body");
301 }
302 if (parallelOptions == null)
303 {
304 throw new ArgumentNullException("parallelOptions");
305 }
306 return ForWorker64<object>(fromInclusive, toExclusive, parallelOptions, body, null, null, null, null);
307 }
308
310 {
311 if (body == null)
312 {
313 throw new ArgumentNullException("body");
314 }
315 return ForWorker<object>(fromInclusive, toExclusive, s_defaultParallelOptions, null, body, null, null, null);
316 }
317
319 {
320 if (body == null)
321 {
322 throw new ArgumentNullException("body");
323 }
324 return ForWorker64<object>(fromInclusive, toExclusive, s_defaultParallelOptions, null, body, null, null, null);
325 }
326
328 {
329 if (body == null)
330 {
331 throw new ArgumentNullException("body");
332 }
333 if (parallelOptions == null)
334 {
335 throw new ArgumentNullException("parallelOptions");
336 }
337 return ForWorker<object>(fromInclusive, toExclusive, parallelOptions, null, body, null, null, null);
338 }
339
341 {
342 if (body == null)
343 {
344 throw new ArgumentNullException("body");
345 }
346 if (parallelOptions == null)
347 {
348 throw new ArgumentNullException("parallelOptions");
349 }
350 return ForWorker64<object>(fromInclusive, toExclusive, parallelOptions, null, body, null, null, null);
351 }
352
354 {
355 if (body == null)
356 {
357 throw new ArgumentNullException("body");
358 }
359 if (localInit == null)
360 {
361 throw new ArgumentNullException("localInit");
362 }
363 if (localFinally == null)
364 {
365 throw new ArgumentNullException("localFinally");
366 }
368 }
369
371 {
372 if (body == null)
373 {
374 throw new ArgumentNullException("body");
375 }
376 if (localInit == null)
377 {
378 throw new ArgumentNullException("localInit");
379 }
380 if (localFinally == null)
381 {
382 throw new ArgumentNullException("localFinally");
383 }
385 }
386
388 {
389 if (body == null)
390 {
391 throw new ArgumentNullException("body");
392 }
393 if (localInit == null)
394 {
395 throw new ArgumentNullException("localInit");
396 }
397 if (localFinally == null)
398 {
399 throw new ArgumentNullException("localFinally");
400 }
401 if (parallelOptions == null)
402 {
403 throw new ArgumentNullException("parallelOptions");
404 }
406 }
407
409 {
410 if (body == null)
411 {
412 throw new ArgumentNullException("body");
413 }
414 if (localInit == null)
415 {
416 throw new ArgumentNullException("localInit");
417 }
418 if (localFinally == null)
419 {
420 throw new ArgumentNullException("localFinally");
421 }
422 if (parallelOptions == null)
423 {
424 throw new ArgumentNullException("parallelOptions");
425 }
427 }
428
/// <summary>
/// Reports whether a deadline expressed as an <see cref="Environment.TickCount"/> value has been reached.
/// </summary>
/// <param name="timeoutOccursAt">Tick-count value at which the timeout occurs.</param>
/// <returns>
/// <c>false</c> while the deadline is still considered to be in the future; otherwise <c>true</c>.
/// </returns>
private static bool CheckTimeoutReached(int timeoutOccursAt)
{
    // Sample the clock exactly once so both comparisons see the same value.
    int now = Environment.TickCount;

    // The deadline is numerically ahead of the current tick count.
    bool deadlineAhead = now < timeoutOccursAt;

    // A negative deadline combined with a positive clock is also treated as "not yet reached".
    // NOTE(review): presumably this covers a deadline sum that overflowed past int.MaxValue — confirm.
    bool negativeDeadlinePositiveClock = timeoutOccursAt < 0 && now > 0;

    return !(deadlineAhead || negativeDeadlinePositiveClock);
}
442
/// <summary>
/// Computes the tick-count value at which a timeout of the given length, starting now, occurs.
/// </summary>
/// <param name="timeoutLength">
/// Length of the timeout, expressed in the same units as <see cref="Environment.TickCount"/>.
/// </param>
/// <returns>The current tick count plus <paramref name="timeoutLength"/>.</returns>
private static int ComputeTimeoutPoint(int timeoutLength)
{
    int startTicks = Environment.TickCount;
    int timeoutPoint = startTicks + timeoutLength;
    return timeoutPoint;
}
447
449 {
450 ParallelLoopResult result = default(ParallelLoopResult);
452 {
453 result._completed = true;
454 return result;
455 }
457 parallelOptions.CancellationToken.ThrowIfCancellationRequested();
458 int nNumExpectedWorkers = ((parallelOptions.EffectiveMaxConcurrencyLevel == -1) ? Environment.ProcessorCount : parallelOptions.EffectiveMaxConcurrencyLevel);
461 CancellationTokenRegistration cancellationTokenRegistration = ((!parallelOptions.CancellationToken.CanBeCanceled) ? default(CancellationTokenRegistration) : parallelOptions.CancellationToken.UnsafeRegister((Action<object?>)delegate
462 {
463 oce = new OperationCanceledException(parallelOptions.CancellationToken);
464 sharedPStateFlags.Cancel();
465 }, (object?)null));
466 int forkJoinContextID = 0;
467 if (ParallelEtwProvider.Log.IsEnabled())
468 {
471 }
472 try
473 {
474 try
475 {
477 {
478 if (!currentWorker.IsInitialized)
479 {
480 currentWorker = rangeManager.RegisterNewWorker();
481 }
484 {
485 return;
486 }
487 if (ParallelEtwProvider.Log.IsEnabled())
488 {
489 ParallelEtwProvider.Log.ParallelFork(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID);
490 }
491 TLocal val = default(TLocal);
492 bool flag = false;
493 try
494 {
496 if (bodyWithState != null)
497 {
499 }
500 else if (bodyWithLocal != null)
501 {
503 if (localInit != null)
504 {
505 val = localInit();
506 flag = true;
507 }
508 }
510 do
511 {
512 if (body != null)
513 {
514 for (int i = nFromInclusiveLocal; i < nToExclusiveLocal; i++)
515 {
516 if (sharedPStateFlags.LoopStateFlags != 0 && sharedPStateFlags.ShouldExitLoop())
517 {
518 break;
519 }
520 body(i);
521 }
522 }
523 else if (bodyWithState != null)
524 {
525 for (int j = nFromInclusiveLocal; j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == 0 || !sharedPStateFlags.ShouldExitLoop(j)); j++)
526 {
527 parallelLoopState.CurrentIteration = j;
529 }
530 }
531 else
532 {
533 for (int k = nFromInclusiveLocal; k < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == 0 || !sharedPStateFlags.ShouldExitLoop(k)); k++)
534 {
535 parallelLoopState.CurrentIteration = k;
536 val = bodyWithLocal(k, parallelLoopState, val);
537 }
538 }
540 {
542 break;
543 }
544 }
545 while (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) && (sharedPStateFlags.LoopStateFlags == 0 || !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal)));
546 }
547 catch (Exception source)
548 {
549 sharedPStateFlags.SetExceptional();
551 }
552 finally
553 {
554 if (localFinally != null && flag)
555 {
556 localFinally(val);
557 }
558 if (ParallelEtwProvider.Log.IsEnabled())
559 {
560 ParallelEtwProvider.Log.ParallelJoin(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID);
561 }
562 }
564 }
565 finally
566 {
567 if (parallelOptions.CancellationToken.CanBeCanceled)
568 {
570 }
571 }
572 if (oce != null)
573 {
574 throw oce;
575 }
576 }
577 catch (AggregateException ex)
578 {
579 ThrowSingleCancellationExceptionOrOtherException(ex.InnerExceptions, parallelOptions.CancellationToken, ex);
580 }
581 finally
582 {
583 int loopStateFlags = sharedPStateFlags.LoopStateFlags;
584 result._completed = loopStateFlags == 0;
585 if (((uint)loopStateFlags & 2u) != 0)
586 {
587 result._lowestBreakIteration = sharedPStateFlags.LowestBreakIteration;
588 }
589 if (ParallelEtwProvider.Log.IsEnabled())
590 {
591 int num = 0;
592 num = ((loopStateFlags == 0) ? (toExclusive - fromInclusive) : (((loopStateFlags & 2) == 0) ? (-1) : (sharedPStateFlags.LowestBreakIteration - fromInclusive)));
593 ParallelEtwProvider.Log.ParallelLoopEnd(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID, num);
594 }
595 }
596 return result;
597 }
598
600 {
601 ParallelLoopResult result = default(ParallelLoopResult);
603 {
604 result._completed = true;
605 return result;
606 }
608 parallelOptions.CancellationToken.ThrowIfCancellationRequested();
609 int nNumExpectedWorkers = ((parallelOptions.EffectiveMaxConcurrencyLevel == -1) ? Environment.ProcessorCount : parallelOptions.EffectiveMaxConcurrencyLevel);
612 CancellationTokenRegistration cancellationTokenRegistration = ((!parallelOptions.CancellationToken.CanBeCanceled) ? default(CancellationTokenRegistration) : parallelOptions.CancellationToken.UnsafeRegister((Action<object?>)delegate
613 {
614 oce = new OperationCanceledException(parallelOptions.CancellationToken);
615 sharedPStateFlags.Cancel();
616 }, (object?)null));
617 int forkJoinContextID = 0;
618 if (ParallelEtwProvider.Log.IsEnabled())
619 {
622 }
623 try
624 {
625 try
626 {
628 {
629 if (!currentWorker.IsInitialized)
630 {
631 currentWorker = rangeManager.RegisterNewWorker();
632 }
635 {
636 return;
637 }
638 if (ParallelEtwProvider.Log.IsEnabled())
639 {
640 ParallelEtwProvider.Log.ParallelFork(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID);
641 }
642 TLocal val = default(TLocal);
643 bool flag = false;
644 try
645 {
647 if (bodyWithState != null)
648 {
650 }
651 else if (bodyWithLocal != null)
652 {
654 if (localInit != null)
655 {
656 val = localInit();
657 flag = true;
658 }
659 }
661 do
662 {
663 if (body != null)
664 {
666 {
667 if (sharedPStateFlags.LoopStateFlags != 0 && sharedPStateFlags.ShouldExitLoop())
668 {
669 break;
670 }
671 body(num2);
672 }
673 }
674 else if (bodyWithState != null)
675 {
676 for (long num3 = nFromInclusiveLocal; num3 < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == 0 || !sharedPStateFlags.ShouldExitLoop(num3)); num3++)
677 {
678 parallelLoopState.CurrentIteration = num3;
680 }
681 }
682 else
683 {
684 for (long num4 = nFromInclusiveLocal; num4 < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == 0 || !sharedPStateFlags.ShouldExitLoop(num4)); num4++)
685 {
686 parallelLoopState.CurrentIteration = num4;
688 }
689 }
691 {
693 break;
694 }
695 }
696 while (currentWorker.FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal) && (sharedPStateFlags.LoopStateFlags == 0 || !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal)));
697 }
698 catch (Exception source)
699 {
700 sharedPStateFlags.SetExceptional();
702 }
703 finally
704 {
705 if (localFinally != null && flag)
706 {
707 localFinally(val);
708 }
709 if (ParallelEtwProvider.Log.IsEnabled())
710 {
711 ParallelEtwProvider.Log.ParallelJoin(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID);
712 }
713 }
715 }
716 finally
717 {
718 if (parallelOptions.CancellationToken.CanBeCanceled)
719 {
721 }
722 }
723 if (oce != null)
724 {
725 throw oce;
726 }
727 }
728 catch (AggregateException ex)
729 {
730 ThrowSingleCancellationExceptionOrOtherException(ex.InnerExceptions, parallelOptions.CancellationToken, ex);
731 }
732 finally
733 {
734 int loopStateFlags = sharedPStateFlags.LoopStateFlags;
735 result._completed = loopStateFlags == 0;
736 if (((uint)loopStateFlags & 2u) != 0)
737 {
738 result._lowestBreakIteration = sharedPStateFlags.LowestBreakIteration;
739 }
740 if (ParallelEtwProvider.Log.IsEnabled())
741 {
742 long num = 0L;
743 num = ((loopStateFlags == 0) ? (toExclusive - fromInclusive) : (((loopStateFlags & 2) == 0) ? (-1) : (sharedPStateFlags.LowestBreakIteration - fromInclusive)));
744 ParallelEtwProvider.Log.ParallelLoopEnd(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID, num);
745 }
746 }
747 return result;
748 }
749
751 {
752 if (source == null)
753 {
754 throw new ArgumentNullException("source");
755 }
756 if (body == null)
757 {
758 throw new ArgumentNullException("body");
759 }
760 return ForEachWorker<TSource, object>(source, s_defaultParallelOptions, body, null, null, null, null, null, null);
761 }
762
764 {
765 if (source == null)
766 {
767 throw new ArgumentNullException("source");
768 }
769 if (body == null)
770 {
771 throw new ArgumentNullException("body");
772 }
773 if (parallelOptions == null)
774 {
775 throw new ArgumentNullException("parallelOptions");
776 }
777 return ForEachWorker<TSource, object>(source, parallelOptions, body, null, null, null, null, null, null);
778 }
779
781 {
782 if (source == null)
783 {
784 throw new ArgumentNullException("source");
785 }
786 if (body == null)
787 {
788 throw new ArgumentNullException("body");
789 }
790 return ForEachWorker<TSource, object>(source, s_defaultParallelOptions, null, body, null, null, null, null, null);
791 }
792
794 {
795 if (source == null)
796 {
797 throw new ArgumentNullException("source");
798 }
799 if (body == null)
800 {
801 throw new ArgumentNullException("body");
802 }
803 if (parallelOptions == null)
804 {
805 throw new ArgumentNullException("parallelOptions");
806 }
807 return ForEachWorker<TSource, object>(source, parallelOptions, null, body, null, null, null, null, null);
808 }
809
811 {
812 if (source == null)
813 {
814 throw new ArgumentNullException("source");
815 }
816 if (body == null)
817 {
818 throw new ArgumentNullException("body");
819 }
820 return ForEachWorker<TSource, object>(source, s_defaultParallelOptions, null, null, body, null, null, null, null);
821 }
822
824 {
825 if (source == null)
826 {
827 throw new ArgumentNullException("source");
828 }
829 if (body == null)
830 {
831 throw new ArgumentNullException("body");
832 }
833 if (parallelOptions == null)
834 {
835 throw new ArgumentNullException("parallelOptions");
836 }
837 return ForEachWorker<TSource, object>(source, parallelOptions, null, null, body, null, null, null, null);
838 }
839
841 {
842 if (source == null)
843 {
844 throw new ArgumentNullException("source");
845 }
846 if (body == null)
847 {
848 throw new ArgumentNullException("body");
849 }
850 if (localInit == null)
851 {
852 throw new ArgumentNullException("localInit");
853 }
854 if (localFinally == null)
855 {
856 throw new ArgumentNullException("localFinally");
857 }
858 return ForEachWorker(source, s_defaultParallelOptions, null, null, null, body, null, localInit, localFinally);
859 }
860
862 {
863 if (source == null)
864 {
865 throw new ArgumentNullException("source");
866 }
867 if (body == null)
868 {
869 throw new ArgumentNullException("body");
870 }
871 if (localInit == null)
872 {
873 throw new ArgumentNullException("localInit");
874 }
875 if (localFinally == null)
876 {
877 throw new ArgumentNullException("localFinally");
878 }
879 if (parallelOptions == null)
880 {
881 throw new ArgumentNullException("parallelOptions");
882 }
883 return ForEachWorker(source, parallelOptions, null, null, null, body, null, localInit, localFinally);
884 }
885
887 {
888 if (source == null)
889 {
890 throw new ArgumentNullException("source");
891 }
892 if (body == null)
893 {
894 throw new ArgumentNullException("body");
895 }
896 if (localInit == null)
897 {
898 throw new ArgumentNullException("localInit");
899 }
900 if (localFinally == null)
901 {
902 throw new ArgumentNullException("localFinally");
903 }
904 return ForEachWorker(source, s_defaultParallelOptions, null, null, null, null, body, localInit, localFinally);
905 }
906
908 {
909 if (source == null)
910 {
911 throw new ArgumentNullException("source");
912 }
913 if (body == null)
914 {
915 throw new ArgumentNullException("body");
916 }
917 if (localInit == null)
918 {
919 throw new ArgumentNullException("localInit");
920 }
921 if (localFinally == null)
922 {
923 throw new ArgumentNullException("localFinally");
924 }
925 if (parallelOptions == null)
926 {
927 throw new ArgumentNullException("parallelOptions");
928 }
929 return ForEachWorker(source, parallelOptions, null, null, null, null, body, localInit, localFinally);
930 }
931
945
947 {
948 int lowerBound = array.GetLowerBound(0);
949 int toExclusive = array.GetUpperBound(0) + 1;
950 if (body != null)
951 {
953 {
954 body(array[i]);
955 }, null, null, null, null);
956 }
957 if (bodyWithState != null)
958 {
960 {
962 }, null, null, null);
963 }
964 if (bodyWithStateAndIndex != null)
965 {
967 {
969 }, null, null, null);
970 }
971 if (bodyWithStateAndLocal != null)
972 {
974 }
976 }
977
979 {
980 if (body != null)
981 {
982 return ForWorker<object>(0, list.Count, parallelOptions, delegate(int i)
983 {
984 body(list[i]);
985 }, null, null, null, null);
986 }
987 if (bodyWithState != null)
988 {
990 {
992 }, null, null, null);
993 }
994 if (bodyWithStateAndIndex != null)
995 {
997 {
999 }, null, null, null);
1000 }
1001 if (bodyWithStateAndLocal != null)
1002 {
1004 }
1006 }
1007
1009 {
1010 if (source == null)
1011 {
1012 throw new ArgumentNullException("source");
1013 }
1014 if (body == null)
1015 {
1016 throw new ArgumentNullException("body");
1017 }
1018 return PartitionerForEachWorker<TSource, object>(source, s_defaultParallelOptions, body, null, null, null, null, null, null);
1019 }
1020
1022 {
1023 if (source == null)
1024 {
1025 throw new ArgumentNullException("source");
1026 }
1027 if (body == null)
1028 {
1029 throw new ArgumentNullException("body");
1030 }
1031 return PartitionerForEachWorker<TSource, object>(source, s_defaultParallelOptions, null, body, null, null, null, null, null);
1032 }
1033
1035 {
1036 if (source == null)
1037 {
1038 throw new ArgumentNullException("source");
1039 }
1040 if (body == null)
1041 {
1042 throw new ArgumentNullException("body");
1043 }
1044 if (!source.KeysNormalized)
1045 {
1047 }
1048 return PartitionerForEachWorker<TSource, object>(source, s_defaultParallelOptions, null, null, body, null, null, null, null);
1049 }
1050
1052 {
1053 if (source == null)
1054 {
1055 throw new ArgumentNullException("source");
1056 }
1057 if (body == null)
1058 {
1059 throw new ArgumentNullException("body");
1060 }
1061 if (localInit == null)
1062 {
1063 throw new ArgumentNullException("localInit");
1064 }
1065 if (localFinally == null)
1066 {
1067 throw new ArgumentNullException("localFinally");
1068 }
1069 return PartitionerForEachWorker(source, s_defaultParallelOptions, null, null, null, body, null, localInit, localFinally);
1070 }
1071
1073 {
1074 if (source == null)
1075 {
1076 throw new ArgumentNullException("source");
1077 }
1078 if (body == null)
1079 {
1080 throw new ArgumentNullException("body");
1081 }
1082 if (localInit == null)
1083 {
1084 throw new ArgumentNullException("localInit");
1085 }
1086 if (localFinally == null)
1087 {
1088 throw new ArgumentNullException("localFinally");
1089 }
1090 if (!source.KeysNormalized)
1091 {
1093 }
1094 return PartitionerForEachWorker(source, s_defaultParallelOptions, null, null, null, null, body, localInit, localFinally);
1095 }
1096
1098 {
1099 if (source == null)
1100 {
1101 throw new ArgumentNullException("source");
1102 }
1103 if (body == null)
1104 {
1105 throw new ArgumentNullException("body");
1106 }
1107 if (parallelOptions == null)
1108 {
1109 throw new ArgumentNullException("parallelOptions");
1110 }
1111 return PartitionerForEachWorker<TSource, object>(source, parallelOptions, body, null, null, null, null, null, null);
1112 }
1113
1115 {
1116 if (source == null)
1117 {
1118 throw new ArgumentNullException("source");
1119 }
1120 if (body == null)
1121 {
1122 throw new ArgumentNullException("body");
1123 }
1124 if (parallelOptions == null)
1125 {
1126 throw new ArgumentNullException("parallelOptions");
1127 }
1128 return PartitionerForEachWorker<TSource, object>(source, parallelOptions, null, body, null, null, null, null, null);
1129 }
1130
1132 {
1133 if (source == null)
1134 {
1135 throw new ArgumentNullException("source");
1136 }
1137 if (body == null)
1138 {
1139 throw new ArgumentNullException("body");
1140 }
1141 if (parallelOptions == null)
1142 {
1143 throw new ArgumentNullException("parallelOptions");
1144 }
1145 if (!source.KeysNormalized)
1146 {
1148 }
1149 return PartitionerForEachWorker<TSource, object>(source, parallelOptions, null, null, body, null, null, null, null);
1150 }
1151
1153 {
1154 if (source == null)
1155 {
1156 throw new ArgumentNullException("source");
1157 }
1158 if (body == null)
1159 {
1160 throw new ArgumentNullException("body");
1161 }
1162 if (localInit == null)
1163 {
1164 throw new ArgumentNullException("localInit");
1165 }
1166 if (localFinally == null)
1167 {
1168 throw new ArgumentNullException("localFinally");
1169 }
1170 if (parallelOptions == null)
1171 {
1172 throw new ArgumentNullException("parallelOptions");
1173 }
1174 return PartitionerForEachWorker(source, parallelOptions, null, null, null, body, null, localInit, localFinally);
1175 }
1176
1178 {
1179 if (source == null)
1180 {
1181 throw new ArgumentNullException("source");
1182 }
1183 if (body == null)
1184 {
1185 throw new ArgumentNullException("body");
1186 }
1187 if (localInit == null)
1188 {
1189 throw new ArgumentNullException("localInit");
1190 }
1191 if (localFinally == null)
1192 {
1193 throw new ArgumentNullException("localFinally");
1194 }
1195 if (parallelOptions == null)
1196 {
1197 throw new ArgumentNullException("parallelOptions");
1198 }
1199 if (!source.KeysNormalized)
1200 {
1202 }
1203 return PartitionerForEachWorker(source, parallelOptions, null, null, null, null, body, localInit, localFinally);
1204 }
1205
1207 {
1209 if (!source.SupportsDynamicPartitions)
1210 {
1212 }
1213 parallelOptions.CancellationToken.ThrowIfCancellationRequested();
1214 int forkJoinContextID = 0;
1215 if (ParallelEtwProvider.Log.IsEnabled())
1216 {
1218 ParallelEtwProvider.Log.ParallelLoopBegin(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID, ParallelEtwProvider.ForkJoinOperationType.ParallelForEach, 0L, 0L);
1219 }
1221 ParallelLoopResult result = default(ParallelLoopResult);
1223 CancellationTokenRegistration cancellationTokenRegistration = ((!parallelOptions.CancellationToken.CanBeCanceled) ? default(CancellationTokenRegistration) : parallelOptions.CancellationToken.UnsafeRegister((Action<object?>)delegate
1224 {
1225 oce = new OperationCanceledException(parallelOptions.CancellationToken);
1226 sharedPStateFlags.Cancel();
1227 }, (object?)null));
1230 if (orderedSource != null)
1231 {
1232 orderablePartitionerSource = orderedSource.GetOrderableDynamicPartitions();
1233 if (orderablePartitionerSource == null)
1234 {
1236 }
1237 }
1238 else
1239 {
1240 partitionerSource = source.GetDynamicPartitions();
1241 if (partitionerSource == null)
1242 {
1244 }
1245 }
1246 try
1247 {
1248 try
1249 {
1251 {
1253 if (ParallelEtwProvider.Log.IsEnabled())
1254 {
1255 ParallelEtwProvider.Log.ParallelFork(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID);
1256 }
1257 TLocal val = default(TLocal);
1258 bool flag = false;
1259 try
1260 {
1262 if (bodyWithState != null || bodyWithStateAndIndex != null)
1263 {
1265 }
1266 else if (bodyWithStateAndLocal != null || bodyWithEverything != null)
1267 {
1269 if (localInit != null)
1270 {
1271 val = localInit();
1272 flag = true;
1273 }
1274 }
1276 if (orderedSource != null)
1277 {
1279 if (enumerator == null)
1280 {
1282 }
1283 if (enumerator == null)
1284 {
1286 }
1287 while (enumerator.MoveNext())
1288 {
1289 KeyValuePair<long, TSource> current = enumerator.Current;
1290 long key = current.Key;
1291 TSource value = current.Value;
1292 if (parallelLoopState != null)
1293 {
1294 parallelLoopState.CurrentIteration = key;
1295 }
1296 if (simpleBody != null)
1297 {
1299 }
1300 else if (bodyWithState != null)
1301 {
1303 }
1304 else if (bodyWithStateAndIndex == null)
1305 {
1307 }
1308 else
1309 {
1311 }
1312 if (sharedPStateFlags.ShouldExitLoop(key))
1313 {
1314 break;
1315 }
1317 {
1319 break;
1320 }
1321 }
1322 }
1323 else
1324 {
1326 if (enumerator2 == null)
1327 {
1329 }
1330 if (enumerator2 == null)
1331 {
1333 }
1334 if (parallelLoopState != null)
1335 {
1336 parallelLoopState.CurrentIteration = 0L;
1337 }
1338 while (enumerator2.MoveNext())
1339 {
1340 TSource current2 = enumerator2.Current;
1341 if (simpleBody != null)
1342 {
1344 }
1345 else if (bodyWithState != null)
1346 {
1348 }
1349 else if (bodyWithStateAndLocal != null)
1350 {
1352 }
1353 if (sharedPStateFlags.LoopStateFlags != 0)
1354 {
1355 break;
1356 }
1358 {
1360 break;
1361 }
1362 }
1363 }
1364 }
1365 catch (Exception source2)
1366 {
1367 sharedPStateFlags.SetExceptional();
1369 }
1370 finally
1371 {
1372 if (localFinally != null && flag)
1373 {
1374 localFinally(val);
1375 }
1377 {
1378 disposable2.Dispose();
1379 }
1380 if (ParallelEtwProvider.Log.IsEnabled())
1381 {
1382 ParallelEtwProvider.Log.ParallelJoin(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID);
1383 }
1384 }
1386 }
1387 finally
1388 {
1389 if (parallelOptions.CancellationToken.CanBeCanceled)
1390 {
1392 }
1393 }
1394 if (oce != null)
1395 {
1396 throw oce;
1397 }
1398 }
1399 catch (AggregateException ex)
1400 {
1401 ThrowSingleCancellationExceptionOrOtherException(ex.InnerExceptions, parallelOptions.CancellationToken, ex);
1402 }
1403 finally
1404 {
1405 int loopStateFlags = sharedPStateFlags.LoopStateFlags;
1406 result._completed = loopStateFlags == 0;
1407 if (((uint)loopStateFlags & 2u) != 0)
1408 {
1409 result._lowestBreakIteration = sharedPStateFlags.LowestBreakIteration;
1410 }
1411 IDisposable disposable = null;
1413 if (ParallelEtwProvider.Log.IsEnabled())
1414 {
1415 ParallelEtwProvider.Log.ParallelLoopEnd(TaskScheduler.Current.Id, Task.CurrentId.GetValueOrDefault(), forkJoinContextID, 0L);
1416 }
1417 }
1418 return result;
1419 }
1420
1422 {
1423 if (exceptions == null || exceptions.Count == 0)
1424 {
1425 return null;
1426 }
1427 if (!cancelToken.IsCancellationRequested)
1428 {
1429 return null;
1430 }
1431 Exception ex = null;
1432 foreach (object exception in exceptions)
1433 {
1435 if (ex == null)
1436 {
1437 ex = ex2;
1438 }
1439 if (!(ex2 is OperationCanceledException ex3) || !cancelToken.Equals(ex3.CancellationToken))
1440 {
1441 return null;
1442 }
1443 }
1445 }
1446
1452
1454 {
1455 if (source == null)
1456 {
1457 throw new ArgumentNullException("source");
1458 }
1459 if (body == null)
1460 {
1461 throw new ArgumentNullException("body");
1462 }
1464 }
1465
1467 {
1468 if (source == null)
1469 {
1470 throw new ArgumentNullException("source");
1471 }
1472 if (body == null)
1473 {
1474 throw new ArgumentNullException("body");
1475 }
1477 }
1478
1480 {
1481 if (source == null)
1482 {
1483 throw new ArgumentNullException("source");
1484 }
1485 if (parallelOptions == null)
1486 {
1487 throw new ArgumentNullException("parallelOptions");
1488 }
1489 if (body == null)
1490 {
1491 throw new ArgumentNullException("body");
1492 }
1493 return ForEachAsync(source, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body);
1494 }
1495
1497 {
1498 if (cancellationToken.IsCancellationRequested)
1499 {
1501 }
1502 if (dop < 0)
1503 {
1505 }
1507 {
1509 bool launchedNext = false;
1510 try
1511 {
1512 while (!state.Cancellation.IsCancellationRequested)
1513 {
1514 TSource current;
1515 lock (state)
1516 {
1517 if (!state.Enumerator.MoveNext())
1518 {
1519 break;
1520 }
1521 current = state.Enumerator.Current;
1522 }
1523 if (!launchedNext)
1524 {
1525 launchedNext = true;
1526 state.QueueWorkerIfDopAvailable();
1527 }
1528 await state.LoopBody(current, state.Cancellation.Token);
1529 }
1530 }
1531 catch (Exception e)
1532 {
1533 state.RecordException(e);
1534 }
1535 finally
1536 {
1537 if (state.SignalWorkerCompletedIterating())
1538 {
1539 try
1540 {
1541 state.Dispose();
1542 }
1543 catch (Exception e2)
1544 {
1545 state.RecordException(e2);
1546 }
1547 state.Complete();
1548 }
1549 }
1550 };
1551 try
1552 {
1554 syncForEachAsyncState.QueueWorkerIfDopAvailable();
1555 return syncForEachAsyncState.Task;
1556 }
1557 catch (Exception exception)
1558 {
1560 }
1561 }
1562
1564 {
1565 if (source == null)
1566 {
1567 throw new ArgumentNullException("source");
1568 }
1569 if (body == null)
1570 {
1571 throw new ArgumentNullException("body");
1572 }
1574 }
1575
1577 {
1578 if (source == null)
1579 {
1580 throw new ArgumentNullException("source");
1581 }
1582 if (body == null)
1583 {
1584 throw new ArgumentNullException("body");
1585 }
1587 }
1588
1590 {
1591 if (source == null)
1592 {
1593 throw new ArgumentNullException("source");
1594 }
1595 if (parallelOptions == null)
1596 {
1597 throw new ArgumentNullException("parallelOptions");
1598 }
1599 if (body == null)
1600 {
1601 throw new ArgumentNullException("body");
1602 }
1603 return ForEachAsync(source, parallelOptions.EffectiveMaxConcurrencyLevel, parallelOptions.EffectiveTaskScheduler, parallelOptions.CancellationToken, body);
1604 }
1605
1607 {
1608 if (cancellationToken.IsCancellationRequested)
1609 {
1611 }
1612 if (dop < 0)
1613 {
1615 }
1617 {
1619 bool launchedNext = false;
1620 try
1621 {
1622 _ = 2;
1623 try
1624 {
1625 while (!state.Cancellation.IsCancellationRequested)
1626 {
1627 await state.Lock.WaitAsync(state.Cancellation.Token);
1628 TSource current;
1629 try
1630 {
1631 if (await state.Enumerator.MoveNextAsync())
1632 {
1633 current = state.Enumerator.Current;
1634 goto IL_016d;
1635 }
1636 }
1637 finally
1638 {
1639 state.Lock.Release();
1640 }
1641 break;
1642 IL_016d:
1643 if (!launchedNext)
1644 {
1645 launchedNext = true;
1646 state.QueueWorkerIfDopAvailable();
1647 }
1648 await state.LoopBody(current, state.Cancellation.Token);
1649 }
1650 }
1651 catch (Exception e)
1652 {
1653 state.RecordException(e);
1654 }
1655 }
1656 finally
1657 {
1658 if (state.SignalWorkerCompletedIterating())
1659 {
1660 try
1661 {
1662 await state.DisposeAsync();
1663 }
1664 catch (Exception e2)
1665 {
1666 state.RecordException(e2);
1667 }
1668 state.Complete();
1669 }
1670 }
1671 };
1672 try
1673 {
1675 asyncForEachAsyncState.QueueWorkerIfDopAvailable();
1676 return asyncForEachAsyncState.Task;
1677 }
1678 catch (Exception exception)
1679 {
1681 }
1682 }
1683}
static OrderablePartitioner< Tuple< long, long > > Create(long fromInclusive, long toExclusive)
void Add(TKey key, TValue value)
static int ProcessorCount
static int TickCount
static string Parallel_Invoke_ActionNull
Definition SR.cs:14
static string Parallel_ForEach_NullEnumerator
Definition SR.cs:22
static string Parallel_ForEach_PartitionerNotDynamic
Definition SR.cs:18
static string Parallel_ForEach_OrderedPartitionerKeysNotNormalized
Definition SR.cs:16
static string Parallel_ForEach_PartitionerReturnedNull
Definition SR.cs:20
Definition SR.cs:7
static ? ExecutionContext Capture()
static void Run(ExecutionContext executionContext, ContextCallback callback, object? state)
static int Decrement(ref int location)
static int Increment(ref int location)
static readonly ParallelEtwProvider Log
AsyncForEachAsyncState(IAsyncEnumerable< TSource > source, Func< object, Task > taskBody, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func< TSource, CancellationToken, ValueTask > body)
Definition Parallel.cs:133
readonly IAsyncEnumerator< TSource > Enumerator
Definition Parallel.cs:131
readonly CancellationToken _externalCancellationToken
Definition Parallel.cs:12
readonly CancellationTokenSource Cancellation
Definition Parallel.cs:30
readonly Func< object, Task > _taskBody
Definition Parallel.cs:16
readonly CancellationTokenRegistration _registration
Definition Parallel.cs:14
readonly ExecutionContext _executionContext
Definition Parallel.cs:20
ForEachAsyncState(Func< object, Task > taskBody, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func< TSource, CancellationToken, ValueTask > body)
Definition Parallel.cs:32
readonly Func< TSource, CancellationToken, ValueTask > LoopBody
Definition Parallel.cs:28
SyncForEachAsyncState(IEnumerable< TSource > source, Func< object, Task > taskBody, int dop, TaskScheduler scheduler, CancellationToken cancellationToken, Func< TSource, CancellationToken, ValueTask > body)
Definition Parallel.cs:114
readonly IEnumerator< TSource > Enumerator
Definition Parallel.cs:112
static ParallelLoopResult PartitionerForEachWorker< TSource, TLocal >(Partitioner< TSource > source, ParallelOptions parallelOptions, Action< TSource > simpleBody, Action< TSource, ParallelLoopState > bodyWithState, Action< TSource, ParallelLoopState, long > bodyWithStateAndIndex, Func< TSource, ParallelLoopState, TLocal, TLocal > bodyWithStateAndLocal, Func< TSource, ParallelLoopState, long, TLocal, TLocal > bodyWithEverything, Func< TLocal > localInit, Action< TLocal > localFinally)
Definition Parallel.cs:1206
static int ComputeTimeoutPoint(int timeoutLength)
Definition Parallel.cs:443
static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action< int > body)
Definition Parallel.cs:283
static readonly ParallelOptions s_defaultParallelOptions
Definition Parallel.cs:148
static ParallelLoopResult For(int fromInclusive, int toExclusive, Action< int > body)
Definition Parallel.cs:265
static void Invoke(params Action[] actions)
Definition Parallel.cs:152
static ParallelLoopResult ForEach< TSource, TLocal >(IEnumerable< TSource > source, Func< TLocal > localInit, Func< TSource, ParallelLoopState, TLocal, TLocal > body, Action< TLocal > localFinally)
Definition Parallel.cs:840
static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action< int, ParallelLoopState > body)
Definition Parallel.cs:327
static ParallelLoopResult ForEachWorker< TSource, TLocal >(IEnumerable< TSource > source, ParallelOptions parallelOptions, Action< TSource > body, Action< TSource, ParallelLoopState > bodyWithState, Action< TSource, ParallelLoopState, long > bodyWithStateAndIndex, Func< TSource, ParallelLoopState, TLocal, TLocal > bodyWithStateAndLocal, Func< TSource, ParallelLoopState, long, TLocal, TLocal > bodyWithEverything, Func< TLocal > localInit, Action< TLocal > localFinally)
Definition Parallel.cs:932
static ParallelLoopResult ForWorker64< TLocal >(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action< long > body, Action< long, ParallelLoopState > bodyWithState, Func< long, ParallelLoopState, TLocal, TLocal > bodyWithLocal, Func< TLocal > localInit, Action< TLocal > localFinally)
Definition Parallel.cs:599
static ParallelLoopResult ForEach< TSource >(IEnumerable< TSource > source, Action< TSource > body)
Definition Parallel.cs:750
static void Invoke(ParallelOptions parallelOptions, params Action[] actions)
Definition Parallel.cs:157
static Task ForEachAsync< TSource >(IEnumerable< TSource > source, Func< TSource, CancellationToken, ValueTask > body)
Definition Parallel.cs:1453
static ParallelLoopResult For(long fromInclusive, long toExclusive, Action< long, ParallelLoopState > body)
Definition Parallel.cs:318
static ParallelLoopResult ForWorker< TLocal >(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action< int > body, Action< int, ParallelLoopState > bodyWithState, Func< int, ParallelLoopState, TLocal, TLocal > bodyWithLocal, Func< TLocal > localInit, Action< TLocal > localFinally)
Definition Parallel.cs:448
static void ThrowSingleCancellationExceptionOrOtherException(ICollection exceptions, CancellationToken cancelToken, Exception otherException)
Definition Parallel.cs:1447
static bool CheckTimeoutReached(int timeoutOccursAt)
Definition Parallel.cs:429
static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action< long, ParallelLoopState > body)
Definition Parallel.cs:340
static ParallelLoopResult For(long fromInclusive, long toExclusive, Action< long > body)
Definition Parallel.cs:274
static ParallelLoopResult For< TLocal >(int fromInclusive, int toExclusive, Func< TLocal > localInit, Func< int, ParallelLoopState, TLocal, TLocal > body, Action< TLocal > localFinally)
Definition Parallel.cs:353
static ParallelLoopResult For(int fromInclusive, int toExclusive, Action< int, ParallelLoopState > body)
Definition Parallel.cs:309
static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action< long > body)
Definition Parallel.cs:296
static OperationCanceledException ReduceToSingleCancellationException(ICollection exceptions, CancellationToken cancelToken)
Definition Parallel.cs:1421
static ? int CurrentId
Definition Task.cs:1009
static new TaskFactory< TResult > Factory
Definition Task.cs:56
static Task FromException(Exception exception)
Definition Task.cs:3341
static Task FromCanceled(CancellationToken cancellationToken)
Definition Task.cs:3363
static void WaitAll(params Task[] tasks)
Definition Task.cs:3040
static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object? state)