Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
JoinBlockTargetSharedResources.cs
Go to the documentation of this file.
2using System.Linq;
3
5
6[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
7internal sealed class JoinBlockTargetSharedResources
8{
// Shared state for all targets of one join block.
// NOTE(review): this listing is missing at least two field declarations
// (_dataflowBlockOptions and _taskForInputProcessing are used by other members
// but never declared in the visible lines) - confirm against the full source.

// The block that owns these shared resources; assigned by the constructor and
// used when formatting the debugger display string and when logging task launches.
9 internal readonly IDataflowBlock _ownerJoin;
10
// The targets participating in the join. The same array instance is also exposed
// as the lock object through IncomingLock.
11 internal readonly JoinBlockTargetBase[] _targets;
12
// Callback supplied to the constructor; its call sites are not visible in this listing.
13 internal readonly Action<Exception> _exceptionAction;
14
// Callback supplied to the constructor; its call sites are not visible in this listing.
15 internal readonly Action _joinFilledAction;
16
18
// Created by the constructor only when BoundedCapacity > 0; otherwise left null.
19 internal readonly BoundingState _boundingState;
20
// Read by the availability checks: when true, postponed messages are not counted
// as obtainable for any target.
21 internal bool _decliningPermanently;
22
24
// Returned by CanceledOrFaulted on one of its branches.
25 internal bool _hasExceptions;
26
// Not read or written by any line visible in this listing.
27 internal long _joinsCreated;
28
// Set to true right before the completion task is started.
29 private bool _completionReserved;
30
// Lock object used by the lock statements in this class; it is the _targets array itself.
31 internal object IncomingLock => _targets;
32
// NOTE(review): the property declaration line is missing from this listing.
// Getter: true only when every target in _targets reports
// HasAtLeastOneMessageAvailable; false as soon as one target does not.
// An empty _targets array yields true.
34 {
35 get
36 {
37 JoinBlockTargetBase[] targets = _targets;
38 foreach (JoinBlockTargetBase joinBlockTargetBase in targets)
39 {
40 if (!joinBlockTargetBase.HasAtLeastOneMessageAvailable)
41 {
42 return false;
43 }
44 }
45 return true;
46 }
47 }
48
// NOTE(review): the property declaration line is missing from this listing.
// Getter: decides whether the targets collectively have messages that are
// either already available or still postponed and obtainable.
50 {
51 get
52 {
// Unbounded case (no BoundingState was created).
53 if (_boundingState == null)
54 {
55 JoinBlockTargetBase[] targets = _targets;
56 foreach (JoinBlockTargetBase joinBlockTargetBase in targets)
57 {
// A target with nothing available blocks the join unless it can still pull a
// postponed message, i.e. nobody is declining permanently and one is postponed.
58 if (!joinBlockTargetBase.HasAtLeastOneMessageAvailable && (_decliningPermanently || joinBlockTargetBase.IsDecliningPermanently || !joinBlockTargetBase.HasAtLeastOnePostponedMessage))
59 {
60 return false;
61 }
62 }
63 return true;
64 }
// Bounded case.
65 bool countIsLessThanBound = _boundingState.CountIsLessThanBound;
// flag: every target seen so far has an available or obtainable postponed message.
66 bool flag = true;
// flag2: at least one target already has an available message.
67 bool flag2 = false;
68 JoinBlockTargetBase[] targets2 = _targets;
69 foreach (JoinBlockTargetBase joinBlockTargetBase2 in targets2)
70 {
// flag3: this target has a postponed message and nothing is declining permanently.
71 bool flag3 = !_decliningPermanently && !joinBlockTargetBase2.IsDecliningPermanently && joinBlockTargetBase2.HasAtLeastOnePostponedMessage;
// Greedy short-circuit: an obtainable postponed message is enough when there is
// room under the bound, or when this target is not the one holding the most messages.
72 if (_dataflowBlockOptions.Greedy && flag3 && (countIsLessThanBound || !joinBlockTargetBase2.HasTheHighestNumberOfMessagesAvailable))
73 {
74 return true;
75 }
76 bool hasAtLeastOneMessageAvailable = joinBlockTargetBase2.HasAtLeastOneMessageAvailable;
77 flag = flag && (hasAtLeastOneMessageAvailable || flag3);
78 if (hasAtLeastOneMessageAvailable)
79 {
80 flag2 = true;
81 }
82 }
// All targets must be satisfiable, and either some message is already available
// or the current count is below the bound.
83 if (flag)
84 {
85 return flag2 || countIsLessThanBound;
86 }
87 return false;
88 }
89 }
90
// Getter with two outcomes: it returns _hasExceptions when a condition holds,
// and true otherwise.
// NOTE(review): the `if` condition guarding the first branch is missing from this
// listing; judging by the property name it is presumably a cancellation check - confirm.
91 private bool CanceledOrFaulted
92 {
93 get
94 {
96 {
97 return _hasExceptions;
98 }
99 return true;
100 }
101 }
102
// NOTE(review): the property declaration, the condition, and the body of the
// conditional branch are all missing from this listing. The only visible
// behaviour is the fall-through result of false.
104 {
105 get
106 {
108 {
110 }
111 return false;
112 }
113 }
114
// NOTE(review): the property declaration line is missing from this listing;
// presumably this is the DebuggerDisplayContent member named by the class's
// DebuggerDisplay attribute - confirm.
// Getter: builds the string Block="..." using the owner's IDebuggerDisplay.Content
// when the owner implements IDebuggerDisplay, and the owner object itself otherwise.
116 {
117 get
118 {
119 IDebuggerDisplay debuggerDisplay = _ownerJoin as IDebuggerDisplay;
120 return $"Block=\"{((debuggerDisplay != null) ? debuggerDisplay.Content : _ownerJoin)}\"";
121 }
122 }
123
/// <summary>
/// Captures the owning block, its targets, the callbacks and the options that
/// all targets of a join share, and sets up bounding when a capacity is configured.
/// </summary>
/// <param name="ownerJoin">The block these shared resources belong to.</param>
/// <param name="targets">The join's targets; the array also serves as the incoming lock object.</param>
/// <param name="joinFilledAction">Callback stored in <c>_joinFilledAction</c>.</param>
/// <param name="exceptionAction">Callback stored in <c>_exceptionAction</c>.</param>
/// <param name="dataflowBlockOptions">Options for the block; a positive BoundedCapacity enables bounding.</param>
internal JoinBlockTargetSharedResources(IDataflowBlock ownerJoin, JoinBlockTargetBase[] targets, Action joinFilledAction, Action<Exception> exceptionAction, GroupingDataflowBlockOptions dataflowBlockOptions)
{
    _ownerJoin = ownerJoin;
    _targets = targets;
    _exceptionAction = exceptionAction;
    _joinFilledAction = joinFilledAction;
    _dataflowBlockOptions = dataflowBlockOptions;

    // Bounding state exists only for a positive capacity; otherwise _boundingState stays null.
    var boundedCapacity = dataflowBlockOptions.BoundedCapacity;
    if (boundedCapacity <= 0)
    {
        return;
    }

    _boundingState = new BoundingState(boundedCapacity);
}
136
/// <summary>
/// Calls CompleteCore on every target in order, passing no exception,
/// asking for pending messages to be dropped, and not releasing reserved messages.
/// </summary>
internal void CompleteEachTarget()
{
    JoinBlockTargetBase[] allTargets = _targets;
    for (int index = 0; index < allTargets.Length; index++)
    {
        allTargets[index].CompleteCore(exception: null, dropPendingMessages: true, releaseReservedMessages: false);
    }
}
145
// NOTE(review): the method declaration line and the condition checked inside the
// lock are missing from this listing.
// Visible behaviour: reserve one message on every target, then consume every
// reservation; if any step fails, release the reservations on all targets.
// Returns true only when every reserve and every consume succeeded.
147 {
// Early exit under the incoming lock (condition not visible).
148 lock (IncomingLock)
149 {
151 {
152 return false;
153 }
154 }
155 bool flag = true;
156 JoinBlockTargetBase[] targets = _targets;
// Phase 1: reserve; stop at the first target that cannot reserve.
157 foreach (JoinBlockTargetBase joinBlockTargetBase in targets)
158 {
159 if (!joinBlockTargetBase.ReserveOneMessage())
160 {
161 flag = false;
162 break;
163 }
164 }
// Phase 2: consume the reservations, only when every reservation succeeded.
165 if (flag)
166 {
167 JoinBlockTargetBase[] targets2 = _targets;
168 foreach (JoinBlockTargetBase joinBlockTargetBase2 in targets2)
169 {
170 if (!joinBlockTargetBase2.ConsumeReservedMessage())
171 {
172 flag = false;
173 break;
174 }
175 }
176 }
// Failure path: ReleaseReservedMessage is called on every target, including
// those that never reserved or that already consumed.
177 if (!flag)
178 {
179 JoinBlockTargetBase[] targets3 = _targets;
180 foreach (JoinBlockTargetBase joinBlockTargetBase3 in targets3)
181 {
182 joinBlockTargetBase3.ReleaseReservedMessage();
183 }
184 }
185 return flag;
186 }
187
// NOTE(review): the method declaration line is missing from this listing.
// Calls ConsumeOnePostponedMessage on every target (no short-circuit: `|=` always
// evaluates the call) and returns true when at least one call returned true.
189 {
190 bool flag = false;
191 JoinBlockTargetBase[] targets = _targets;
192 foreach (JoinBlockTargetBase joinBlockTargetBase in targets)
193 {
194 flag |= joinBlockTargetBase.ConsumeOnePostponedMessage();
195 }
196 return flag;
197 }
198
// Delegates to ProcessAsyncIfNecessary_Slow, forwarding isReplacementReplica,
// when a guard condition holds.
// NOTE(review): the guard condition line is missing from this listing.
199 internal void ProcessAsyncIfNecessary(bool isReplacementReplica = false)
200 {
202 {
203 ProcessAsyncIfNecessary_Slow(isReplacementReplica);
204 }
205 }
206
// Creates the input-processing task and records it in _taskForInputProcessing.
// NOTE(review): several lines are missing from this listing: the declaration of
// `log`, the statement that starts the task and assigns `ex`, and the body of the
// `ex != null` branch. Do not rely on this listing for the failure handling.
207 private void ProcessAsyncIfNecessary_Slow(bool isReplacementReplica)
208 {
// The task body runs ProcessMessagesLoopCore on this instance, which is passed in
// as the task state object; creation options are derived from isReplacementReplica.
209 _taskForInputProcessing = new Task(delegate(object thisSharedResources)
210 {
211 ((JoinBlockTargetSharedResources)thisSharedResources).ProcessMessagesLoopCore();
212 }, this, Common.GetCreationOptionsForTask(isReplacementReplica));
// When logging is enabled, report the launch together with the largest
// NumberOfMessagesAvailableOrPostponed across all targets.
214 if (log.IsEnabled())
215 {
216 log.TaskLaunchedForMessageHandling(_ownerJoin, _taskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages, _targets.Max((JoinBlockTargetBase t) => t.NumberOfMessagesAvailableOrPostponed));
217 }
219 if (ex != null)
220 {
224 }
225 }
226
// NOTE(review): the method declaration, the first guard condition, the
// initialisation of `flag`, and parts of the task delegate (including the
// declaration of joinBlockTargetSharedResources and the closing of the StartNew
// call) are missing from this listing.
// Visible behaviour: decide whether the block can complete and, if so, mark
// completion as reserved and start a task that calls CompleteOncePossible on
// every target.
228 {
230 {
231 return;
232 }
234 if (!flag)
235 {
236 JoinBlockTargetBase[] targets = _targets;
237 foreach (JoinBlockTargetBase joinBlockTargetBase in targets)
238 {
// A target that is declining permanently and has no message available sets flag.
239 if (joinBlockTargetBase.IsDecliningPermanently && !joinBlockTargetBase.HasAtLeastOneMessageAvailable)
240 {
241 flag = true;
242 break;
243 }
244 }
245 }
// Do nothing while a processing task exists, or when flag is false and the
// block is not canceled or faulted.
246 if (_taskForInputProcessing != null || (!flag && !CanceledOrFaulted))
247 {
248 return;
249 }
250 _completionReserved = true;
252 Task.Factory.StartNew(delegate(object state)
253 {
255 JoinBlockTargetBase[] targets2 = joinBlockTargetSharedResources._targets;
256 foreach (JoinBlockTargetBase joinBlockTargetBase2 in targets2)
257 {
258 joinBlockTargetBase2.CompleteOncePossible();
259 }
261 }
262
// NOTE(review): the method declaration is missing from this listing; presumably
// this is the ProcessMessagesLoopCore method invoked by the input-processing
// task - confirm. The assignment of `flag`, the statements inside the inner
// lock, and parts of the finally block are also missing.
264 {
265 try
266 {
// num counts loop iterations; the loop stops once flag is false or
// ActualMaxMessagesPerTask iterations have run.
267 int num = 0;
268 int actualMaxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
269 bool flag;
270 do
271 {
273 if (flag)
274 {
275 lock (IncomingLock)
276 {
278 {
282 {
284 }
285 }
286 }
287 }
288 num++;
289 }
290 while (flag && num < actualMaxMessagesPerTask);
291 }
292 catch (Exception exception)
293 {
// Any exception from the loop is passed to the first target's CompleteCore,
// which is asked to drop pending messages and release reserved ones.
294 _targets[0].CompleteCore(exception, dropPendingMessages: true, releaseReservedMessages: true);
295 }
296 finally
297 {
// Under the incoming lock, ask for a replacement processing task.
298 lock (IncomingLock)
299 {
301 ProcessAsyncIfNecessary(isReplacementReplica: true);
303 }
304 }
305 }
306
// When bounding is enabled (_boundingState is non-null), subtracts
// numItemsRemoved from the bounding state's CurrentCount under the incoming lock.
// Does nothing when bounding is disabled.
// NOTE(review): two lines following the decrement are missing from this listing,
// so any follow-up work done under the lock is not visible here.
307 internal void OnItemsRemoved(int numItemsRemoved)
308 {
309 if (_boundingState != null)
310 {
311 lock (IncomingLock)
312 {
313 _boundingState.CurrentCount -= numItemsRemoved;
316 }
317 }
318 }
319}
static Exception StartTaskSafe(Task task, TaskScheduler scheduler)
Definition Common.cs:277
static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica=false)
Definition Common.cs:267
void CompleteCore(Exception exception, bool dropPendingMessages, bool releaseReservedMessages)
JoinBlockTargetSharedResources(IDataflowBlock ownerJoin, JoinBlockTargetBase[] targets, Action joinFilledAction, Action< Exception > exceptionAction, GroupingDataflowBlockOptions dataflowBlockOptions)
static new TaskFactory< TResult > Factory
Definition Task.cs:56