9[DebuggerDisplay(
"{DebuggerDisplayContent,nq}")]
55 [DebuggerDisplay(
"{DebuggerDisplayContent,nq}")]
223 return $"Block=\"{((owningBatch != null) ? owningBatch.Content : _owningBatch)}\"";
233 bool flag = dataflowBlockOptions.BoundedCapacity > 0;
256 _nonGreedyState.AcceptFewerThanBatchSize =
true;
345 _nonGreedyState.TaskForInputProcessing =
null;
358 bool flag =
_nonGreedyState !=
null && _nonGreedyState.TaskForInputProcessing !=
null;
439 _nonGreedyState.AcceptFewerThanBatchSize =
false;
454 _nonGreedyState.TaskForInputProcessing =
null;
467 for (
int i = 0; i <
array.Length; i++)
498 for (
int i = 0; i < num; i++)
574 for (
int i = 0; i <
num2; i++)
629 for (
int j = 0;
j < i;
j++)
674 _boundingState.CurrentCount += num;
678 if (
item.Key !=
null)
696 if (
key ==
null || !
value.Key.IsValid)
768 private object DebuggerDisplayContent =>
$"{Common.GetNameForDebugger(this, _source.DataflowBlockOptions)}, BatchSize={BatchSize}, OutputCount={OutputCountForDebugger}";
static unsafe void Clear(Array array)
void Add(TKey key, TValue value)
static string InvalidOperation_FailedToConsumeReservedMessage
static string Argument_InvalidMessageHeader
static string Argument_CantConsumeFromANullSource
static string ArgumentOutOfRange_GenericPositive
static string ArgumentOutOfRange_BatchSizeMustBeNoGreaterThanBoundedCapacity
readonly List< KeyValuePair< ISourceBlock< T >, KeyValuePair< DataflowMessageHeader, T > > > ReservedSourcesTemp
bool AcceptFewerThanBatchSize
NonGreedyState(int batchSize)
Task TaskForInputProcessing
readonly KeyValuePair< ISourceBlock< T >, DataflowMessageHeader >[] PostponedMessagesTemp
readonly QueuedMap< ISourceBlock< T >, DataflowMessageHeader > PostponedMessages
void ConsumeReservedMessagesNonGreedy()
void CompleteBlockIfPossible()
bool BatchesNeedProcessing
readonly BatchBlock< T > _owningBatch
object DebuggerDisplayContent
readonly Queue< T > _messages
void ProcessMessagesLoopCore()
DebuggingInformation GetDebuggingInformation()
readonly Action< T[]> _batchCompletedAction
readonly NonGreedyState _nonGreedyState
void OnItemsRemoved(int numItemsRemoved)
void Complete(Exception exception, bool dropPendingMessages, bool releaseReservedMessages, bool revertProcessingState=false)
int BoundedCapacityAvailable
readonly BoundingState _boundingState
bool _decliningPermanently
void ProcessAsyncIfNecessary_Slow(bool isReplacementReplica)
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock< T > source, bool consumeToAccept)
BatchBlockTargetCore(BatchBlock< T > owningBatch, int batchSize, Action< T[]> batchCompletedAction, GroupingDataflowBlockOptions dataflowBlockOptions)
void ReleaseReservedMessages(bool throwOnFirstException)
void ConsumeReservedMessagesGreedyBounded()
bool MakeBatchIfPossible(bool evenIfFewerThanBatchSize)
readonly GroupingDataflowBlockOptions _dataflowBlockOptions
readonly TaskCompletionSource< VoidResult > _completionTask
void RetrievePostponedItemsGreedyBounded(bool allowFewerThanBatchSize)
void RetrievePostponedItemsNonGreedy(bool allowFewerThanBatchSize)
void ProcessAsyncIfNecessary(bool isReplacementReplica=false)
static int CountItems(T[] singleOutputItem, IList< T[]> multipleOutputItems)
IEnumerable< T > InputQueue
readonly BatchBlock< T > _batchBlock
QueuedMap< ISourceBlock< T >, DataflowMessageHeader > PostponedMessages
Task TaskForInputProcessing
DebugView(BatchBlock< T > batchBlock)
IEnumerable< T[]> OutputQueue
ITargetBlock< T[]> NextMessageReservedFor
TargetRegistry< T[]> LinkedTargets
readonly BatchBlockTargetCore.DebuggingInformation _targetDebuggingInformation
bool IsDecliningPermanently
Task TaskForOutputProcessing
readonly SourceCore< T[]>.DebuggingInformation _sourceDebuggingInformation
bool TryReceiveAll([NotNullWhen(true)] out IList< T[]>? items)
override string ToString()
BatchBlock(int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
readonly SourceCore< T[]> _source
readonly BatchBlockTargetCore _target
BatchBlock(int batchSize)
object DebuggerDisplayContent
int OutputCountForDebugger
bool TryReceive(Predicate< T[]>? filter, [NotNullWhen(true)] out T[]? item)
IDisposable LinkTo(ITargetBlock< T[]> target, DataflowLinkOptions linkOptions)
TaskScheduler TaskScheduler
int ActualMaxMessagesPerTask
CancellationToken CancellationToken
long ActualMaxNumberOfGroups
bool CountIsLessThanBound
static Exception StartTaskSafe(Task task, TaskScheduler scheduler)
static TaskContinuationOptions GetContinuationOptions(TaskContinuationOptions toInclude=TaskContinuationOptions.None)
static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica=false)
static string GetNameForDebugger(IDataflowBlock block, DataflowBlockOptions options=null)
static void WireCancellationToComplete(CancellationToken cancellationToken, Task completionTask, Action< object > completeAction, object completeState)
static int GetBlockId(IDataflowBlock block)
static readonly DataflowEtwProvider Log
TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target, out bool messageConsumed)
DataflowBlockOptions DataflowBlockOptions
bool TryReceiveAll([NotNullWhen(true)] out IList< TOutput > items)
IDisposable LinkTo(ITargetBlock< TOutput > target, DataflowLinkOptions linkOptions)
bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock< TOutput > target)
bool TryReceive(Predicate< TOutput > filter, [MaybeNullWhen(false)] out TOutput item)
DebuggingInformation GetDebuggingInformation()
void AddMessage(TOutput item)
static TaskScheduler Default
Task ContinueWith(Action< Task< TResult > > continuationAction)
static new TaskFactory< TResult > Factory
static bool Read(ref bool location)
void Fault(Exception exception)
bool IsCancellationRequested
static CancellationToken None