Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
OrderPreservingPipeliningMergeHelper.cs
Go to the documentation of this file.
5
7
8internal sealed class OrderPreservingPipeliningMergeHelper<TOutput, TKey> : IMergeHelper<TOutput>
9{
// Comparer over Producer<TKey> values that delegates to a key comparer supplied at construction.
// NOTE(review): listing line 19 (the comparison method's signature) is missing from this extract;
// presumably it is Compare(Producer<TKey> x, Producer<TKey> y) - confirm against the full source.
10 private sealed class ProducerComparer : IComparer<Producer<TKey>>
11 {
// Comparer applied to the MaxKey of each producer.
12 private readonly IComparer<TKey> _keyComparer;
13
// Stores the key comparer as-is; no null check is performed here.
14 internal ProducerComparer(IComparer<TKey> keyComparer)
15 {
16 _keyComparer = keyComparer;
17 }
18
20 {
// The keys are handed to the key comparer as (y, x) rather than (x, y), so the result is the
// key comparer's ordering with the two operands swapped.
21 return _keyComparer.Compare(y.MaxKey, x.MaxKey);
22 }
23 }
24
25 private sealed class OrderedPipeliningMergeEnumerator : MergeEnumerator<TOutput>
26 {
28
30
// Per-producer slots indexed by producer index; MoveNext stores element.Second for producer i at index i.
31 private readonly TOutput[] _producerNextElement;
32
34
// Set to true by the first MoveNext call, which performs the initial fill of _producerHeap.
35 private bool _initialized;
36
// Current element of the merged sequence.
// The getter looks up the index of the producer at the top of _producerHeap (MaxValue.ProducerIndex).
// NOTE(review): listing line 42 (the return statement) is missing from this extract; presumably it
// returns _producerNextElement[producerIndex] - confirm against the full source.
37 public override TOutput Current
38 {
39 get
40 {
41 int producerIndex = _producerHeap.MaxValue.ProducerIndex;
43 }
44 }
45
55
// Advances the merge by one element.
// Returns true while _producerHeap still holds at least one producer after the step, false otherwise.
// NOTE(review): listing lines 63, 71, 82-83, 86 and 90 are missing from this extract (the declarations
// of `element` / `element2`, the condition guarding ReplaceMax, and some branch bodies).
56 public override bool MoveNext()
57 {
58 if (!_initialized)
59 {
// First call: try to obtain one element from every partition and seed the heap with it.
60 _initialized = true;
61 for (int i = 0; i < _mergeHelper._partitions.PartitionCount; i++)
62 {
64 if (TryWaitForElement(i, ref element))
65 {
// element.First is used as the producer's key in the heap; element.Second is kept as the
// pending output value for producer i.
66 _producerHeap.Insert(new Producer<TKey>(element.First, i));
67 _producerNextElement[i] = element.Second;
68 }
69 else
70 {
// NOTE(review): the body of this branch (listing line 71) is not visible in this extract.
72 }
73 }
74 }
75 else
76 {
// Subsequent calls: nothing left to merge once the heap is empty.
77 if (_producerHeap.Count == 0)
78 {
79 return false;
80 }
// Index of the producer currently at the top of the heap.
81 int producerIndex = _producerHeap.MaxValue.ProducerIndex;
// NOTE(review): the condition for this branch (listing lines 82-83) is missing; presumably it is a
// TryWaitForElement(producerIndex, ref element2) call - confirm against the full source.
84 {
// Replace the top heap entry with the same producer's newly obtained key.
85 _producerHeap.ReplaceMax(new Producer<TKey>(element2.First, producerIndex));
87 }
88 else
89 {
// No further element was obtained for this producer: drop it from the heap.
91 _producerHeap.RemoveMax();
92 }
93 }
94 return _producerHeap.Count > 0;
95 }
96
// Does nothing unless the merged cancellation token has had cancellation requested.
// When it has, every entry of _mergeHelper._bufferLocks is locked in turn inside a try/finally.
// NOTE(review): listing lines 110, 113 and 117 (the lock body, the statement after the loop and the
// finally body) are missing from this extract, so what is done under each lock and what is thrown
// cannot be confirmed from here.
97 private void ThrowIfInTearDown()
98 {
99 if (!_mergeHelper._taskGroupState.CancellationState.MergedCancellationToken.IsCancellationRequested)
100 {
// Not cancelled: nothing to do.
101 return;
102 }
103 try
104 {
105 object[] bufferLocks = _mergeHelper._bufferLocks;
106 for (int i = 0; i < bufferLocks.Length; i++)
107 {
108 lock (bufferLocks[i])
109 {
111 }
112 }
114 }
115 finally
116 {
118 }
119 }
120
// NOTE(review): the signature (listing line 121) is missing from this extract. Based on the call
// TryWaitForElement(i, ref element) in MoveNext and the TryGetPrivateElement call below, this body is
// presumably TryWaitForElement(int producer, ref Pair<TKey, TOutput> element) - confirm.
// Returns true with `element` assigned when an element was obtained for `producer`; returns false
// with `element` set to default when the queue is empty (either the producer is done, or the queue is
// still empty on the second check).
122 {
// NOTE(review): `queue` is declared on the missing listing line 123; presumably it is
// _mergeHelper._buffers[producer] - confirm.
124 object obj = _mergeHelper._bufferLocks[producer];
125 lock (obj)
126 {
127 if (queue.Count == 0)
128 {
// Empty queue and the producer has been flagged as done: report that nothing was obtained.
129 if (_mergeHelper._producerDone[producer])
130 {
131 element = default(Pair<TKey, TOutput>);
132 return false;
133 }
// Flag that the consumer is waiting on this producer.
// NOTE(review): listing line 135 is missing; presumably a Monitor.Wait on `obj` (Monitor.Wait
// appears in this page's reference list) - confirm.
134 _mergeHelper._consumerWaiting[producer] = true;
// Re-check after the elided statement; still empty means nothing was obtained.
136 if (queue.Count == 0)
137 {
138 element = default(Pair<TKey, TOutput>);
139 return false;
140 }
141 }
// Clear the producer-waiting flag if it was set.
// NOTE(review): listing line 144 is missing; presumably it pulses `obj` to wake the producer - confirm.
142 if (_mergeHelper._producerWaiting[producer])
143 {
145 _mergeHelper._producerWaiting[producer] = false;
146 }
// Fewer than 1024 queued items: take a single item while still holding the lock.
147 if (queue.Count < 1024)
148 {
149 element = queue.Dequeue();
150 return true;
151 }
// 1024 or more queued items: install a fresh queue (initial capacity 128) as this producer's buffer.
// NOTE(review): listing line 152 is missing; presumably it hands the full `queue` over to
// _privateBuffer[producer] so it can be drained outside the lock - confirm.
153 _mergeHelper._buffers[producer] = new Queue<Pair<TKey, TOutput>>(128);
154 }
// Outside the lock: read from the private buffer. The result is stored in `flag` but never
// inspected; the method returns true regardless.
155 bool flag = TryGetPrivateElement(producer, ref element);
156 return true;
157 }
158
// NOTE(review): the signature (listing line 159) is missing from this extract. Based on the call
// TryGetPrivateElement(producer, ref element) elsewhere in this class, this body is presumably
// TryGetPrivateElement(int producer, ref Pair<TKey, TOutput> element) - confirm.
// Returns true and assigns `element` when the queue is non-null and non-empty; otherwise returns false.
160 {
// NOTE(review): `queue` is declared on the missing listing line 161; presumably it is
// _privateBuffer[producer] - confirm.
162 if (queue != null)
163 {
164 if (queue.Count > 0)
165 {
166 element = queue.Dequeue();
167 return true;
168 }
// The queue exists but is empty: clear this producer's private-buffer slot.
169 _privateBuffer[producer] = null;
170 }
171 return false;
172 }
173
// Visits every buffer index, taking that buffer's lock and checking its producer-waiting flag,
// then calls base.Dispose().
// NOTE(review): listing line 184 (the action taken when a producer is waiting) is missing from this
// extract; presumably it pulses the lock to wake the producer (Monitor.Pulse appears in this page's
// reference list) - confirm.
174 public override void Dispose()
175 {
// The iteration count is the length of the buffers array.
176 int num = _mergeHelper._buffers.Length;
177 for (int i = 0; i < num; i++)
178 {
179 object obj = _mergeHelper._bufferLocks[i];
180 lock (obj)
181 {
182 if (_mergeHelper._producerWaiting[i])
183 {
185 }
186 }
187 }
188 base.Dispose();
189 }
190 }
191
193
195
197
// Not read by any code visible in this extract. NOTE(review): presumably holds the `autoBuffered`
// constructor argument - confirm.
198 private readonly bool _autoBuffered;
199
201
// Indexed by producer; checked by the enumerator when a producer's queue is empty, to decide whether
// to report that no element is available.
202 private readonly bool[] _producerDone;
203
// Indexed by producer; read under the matching buffer lock and reset to false by the enumerator.
204 private readonly bool[] _producerWaiting;
205
// Indexed by producer; set to true by the enumerator under the matching buffer lock when the queue is
// empty and the producer is not done.
206 private readonly bool[] _consumerWaiting;
207
// One lock object per producer; the enumerator locks entry [i] before touching producer i's shared state.
208 private readonly object[] _bufferLocks;
209
211
233
238
243
// Not supported by this merge helper: always throws InvalidOperationException.
// Excluded from code coverage with the justification given in the attribute below.
244 [ExcludeFromCodeCoverage(Justification = "An ordered pipelining merge is not intended to be used this way")]
245 public TOutput[] GetResultsAsArray()
246 {
247 throw new InvalidOperationException();
248 }
249}
OrderedPipeliningMergeEnumerator(OrderPreservingPipeliningMergeHelper< TOutput, TKey > mergeHelper, IComparer< Producer< TKey > > producerComparer)
OrderPreservingPipeliningMergeHelper(PartitionedStream< TOutput, TKey > partitions, TaskScheduler taskScheduler, CancellationState cancellationState, bool autoBuffered, int queryId, IComparer< TKey > keyComparer)
static readonly ProducerComparerInt Instance
void QueryEnd(bool userInitiatedDispose)
static bool Wait(object obj, int millisecondsTimeout)
Definition Monitor.cs:87
static void Pulse(object obj)
Definition Monitor.cs:103