Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
SpscTargetCore.cs
Go to the documentation of this file.
3using System.Linq;
4
6
7[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
8internal sealed class SpscTargetCore<TInput>
9{
10 internal sealed class DebuggingInformation
11 {
13
14 internal IEnumerable<TInput> InputQueue => _target._messages.ToList();
15
17 {
18 get
19 {
20 if (_target._activeConsumer == null || _target.Completion.IsCompleted)
21 {
22 return 0;
23 }
24 return 1;
25 }
26 }
27
29
30 internal bool IsDecliningPermanently => _target._decliningPermanently;
31
32 internal bool IsCompleted => _target.Completion.IsCompleted;
33
35 {
36 _target = target;
37 }
38 }
39
41
43
45
46 private readonly Action<TInput> _action;
47
48 private volatile List<Exception> _exceptions;
49
50 private volatile bool _decliningPermanently;
51
52 private volatile bool _completionReserved;
53
54 private volatile Task _activeConsumer;
55
57
58 internal int InputCount => _messages.Count;
59
61
63
65
67 {
68 get
69 {
71 return $"Block=\"{((debuggerDisplay != null) ? debuggerDisplay.Content : _owningTarget)}\"";
72 }
73 }
74
81
82 internal bool Post(TInput messageValue)
83 {
85 {
86 return false;
87 }
88 _messages.Enqueue(messageValue);
90 if (_activeConsumer == null)
91 {
93 }
94 return true;
95 }
96
105
107 {
109 {
110 return DataflowMessageStatus.DecliningPermanently;
111 }
112 if (!messageHeader.IsValid)
113 {
114 throw new ArgumentException(System.SR.Argument_InvalidMessageHeader, "messageHeader");
115 }
116 if (consumeToAccept)
117 {
118 if (source == null)
119 {
121 }
123 if (!messageConsumed)
124 {
125 return DataflowMessageStatus.NotAvailable;
126 }
127 }
128 _messages.Enqueue(messageValue);
130 if (_activeConsumer == null)
131 {
133 }
134 return DataflowMessageStatus.Accepted;
135 }
136
138 {
139 if (_activeConsumer != null)
140 {
141 return;
142 }
143 Task task = new Task(delegate(object state)
144 {
145 ((SpscTargetCore<TInput>)state).ProcessMessagesLoopCore();
148 {
150 if (log.IsEnabled())
151 {
152 log.TaskLaunchedForMessageHandling(_owningTarget, task, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages, _messages.Count);
153 }
155 }
156 }
157
159 {
160 int num = 0;
162 bool flag = true;
163 while (flag)
164 {
165 flag = false;
166 TInput result = default(TInput);
167 try
168 {
169 while (_exceptions == null && num < actualMaxMessagesPerTask && _messages.TryDequeue(out result))
170 {
171 num++;
172 _action(result);
173 }
174 }
175 catch (Exception ex)
176 {
178 {
180 Common.StoreDataflowMessageValueIntoExceptionData(ex, result);
182 }
183 }
184 finally
185 {
186 if (!_messages.IsEmpty && _exceptions == null && num < actualMaxMessagesPerTask)
187 {
188 flag = true;
189 }
190 else
191 {
193 if ((decliningPermanently && _messages.IsEmpty) || _exceptions != null)
194 {
196 {
197 _completionReserved = true;
199 }
200 }
201 else
202 {
204 if (!_messages.IsEmpty || (!decliningPermanently && _decliningPermanently) || _exceptions != null)
205 {
207 }
208 }
209 }
210 }
211 }
212 }
213
215 {
217 {
218 if (exception != null)
219 {
221 }
224 }
225 }
226
228 {
229 lock (LazyInitializer.EnsureInitialized(ref _exceptions, () => new List<Exception>()))
230 {
232 }
233 }
234
236 {
237 TInput result;
238 while (_messages.TryDequeue(out result))
239 {
240 }
241 if (_exceptions != null)
242 {
245 {
246 exceptions = _exceptions.ToArray();
247 }
248 bool flag = CompletionSource.TrySetException(exceptions);
249 }
250 else
251 {
252 bool flag = CompletionSource.TrySetResult(default(VoidResult));
253 }
255 if (log.IsEnabled())
256 {
257 log.DataflowBlockCompleted(_owningTarget);
258 }
259 }
260
262 {
263 return new DebuggingInformation(this);
264 }
265}
void Add(TKey key, TValue value)
static string Argument_InvalidMessageHeader
Definition SR.cs:24
static string Argument_CantConsumeFromANullSource
Definition SR.cs:22
Definition SR.cs:7
static int CompareExchange(ref int location1, int value, int comparand)
static int Exchange(ref int location1, int value)
static bool IsCooperativeCancellation(Exception exception)
Definition Common.cs:88
static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica=false)
Definition Common.cs:267
SpscTargetCore(ITargetBlock< TInput > owningTarget, Action< TInput > action, ExecutionDataflowBlockOptions dataflowBlockOptions)
DataflowMessageStatus OfferMessage_Slow(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock< TInput > source, bool consumeToAccept)
TaskCompletionSource< VoidResult > CompletionSource
readonly ExecutionDataflowBlockOptions _dataflowBlockOptions
TaskCompletionSource< VoidResult > _completionTask
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock< TInput > source, bool consumeToAccept)
readonly System.Threading.Tasks.SingleProducerSingleConsumerQueue< TInput > _messages