Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
BufferedStream.cs
Go to the documentation of this file.
3
4namespace System.IO;
5
6public sealed class BufferedStream : Stream
7{
8 private Stream _stream;
9
10 private byte[] _buffer;
11
12 private readonly int _bufferSize;
13
14 private int _readPos;
15
16 private int _readLen;
17
18 private int _writePos;
19
21
23
24 public int BufferSize => _bufferSize;
25
26 public override bool CanRead
27 {
28 get
29 {
30 if (_stream != null)
31 {
32 return _stream.CanRead;
33 }
34 return false;
35 }
36 }
37
38 public override bool CanWrite
39 {
40 get
41 {
42 if (_stream != null)
43 {
44 return _stream.CanWrite;
45 }
46 return false;
47 }
48 }
49
50 public override bool CanSeek
51 {
52 get
53 {
54 if (_stream != null)
55 {
56 return _stream.CanSeek;
57 }
58 return false;
59 }
60 }
61
62 public override long Length
63 {
64 get
65 {
67 if (_writePos > 0)
68 {
69 FlushWrite();
70 }
71 return _stream.Length;
72 }
73 }
74
75 public override long Position
76 {
77 get
78 {
81 return _stream.Position + (_readPos - _readLen + _writePos);
82 }
83 set
84 {
85 if (value < 0)
86 {
88 }
89 Seek(value, SeekOrigin.Begin);
90 }
91 }
92
94 : this(stream, 4096)
95 {
96 }
97
98 public BufferedStream(Stream stream, int bufferSize)
99 {
100 if (stream == null)
101 {
103 }
104 if (bufferSize <= 0)
105 {
106 throw new ArgumentOutOfRangeException("bufferSize", SR.Format(SR.ArgumentOutOfRange_MustBePositive, "bufferSize"));
107 }
108 _stream = stream;
109 _bufferSize = bufferSize;
111 {
113 }
114 }
115
116 private void EnsureNotClosed()
117 {
118 if (_stream == null)
119 {
121 }
122 }
123
124 private void EnsureCanSeek()
125 {
126 if (!_stream.CanSeek)
127 {
129 }
130 }
131
132 private void EnsureCanRead()
133 {
134 if (!_stream.CanRead)
135 {
137 }
138 }
139
140 private void EnsureCanWrite()
141 {
142 if (!_stream.CanWrite)
143 {
145 }
146 }
147
149 {
150 if (_buffer.Length == _bufferSize && _bufferSize < 81920)
151 {
152 byte[] array = new byte[Math.Min(_bufferSize + _bufferSize, 81920)];
154 _buffer = array;
155 }
156 }
157
159 {
160 if (_buffer == null)
161 {
162 _buffer = new byte[_bufferSize];
163 }
164 }
165
166 protected override void Dispose(bool disposing)
167 {
168 try
169 {
170 if (disposing && _stream != null)
171 {
172 try
173 {
174 Flush();
175 return;
176 }
177 finally
178 {
180 }
181 }
182 }
183 finally
184 {
185 _stream = null;
186 _buffer = null;
187 _writePos = 0;
188 base.Dispose(disposing);
189 }
190 }
191
192 public override async ValueTask DisposeAsync()
193 {
194 _ = 1;
195 try
196 {
197 if (_stream != null)
198 {
199 try
200 {
201 await FlushAsync().ConfigureAwait(continueOnCapturedContext: false);
202 }
203 finally
204 {
205 await _stream.DisposeAsync().ConfigureAwait(continueOnCapturedContext: false);
206 }
207 }
208 }
209 finally
210 {
211 _stream = null;
212 _buffer = null;
213 _writePos = 0;
214 }
215 }
216
217 public override void Flush()
218 {
220 if (_writePos > 0)
221 {
222 FlushWrite();
223 }
224 else if (_readPos < _readLen)
225 {
226 if (_stream.CanSeek)
227 {
228 FlushRead();
229 }
230 if (_stream.CanWrite)
231 {
232 _stream.Flush();
233 }
234 }
235 else
236 {
237 if (_stream.CanWrite)
238 {
239 _stream.Flush();
240 }
241 _writePos = (_readPos = (_readLen = 0));
242 }
243 }
244
246 {
247 if (cancellationToken.IsCancellationRequested)
248 {
250 }
253 }
254
256 {
257 await EnsureAsyncActiveSemaphoreInitialized().WaitAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
258 try
259 {
260 if (_writePos > 0)
261 {
262 await FlushWriteAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
263 }
264 else if (_readPos < _readLen)
265 {
266 if (_stream.CanSeek)
267 {
268 FlushRead();
269 }
270 if (_stream.CanWrite)
271 {
272 await _stream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
273 }
274 }
275 else if (_stream.CanWrite)
276 {
277 await _stream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
278 }
279 }
280 finally
281 {
283 }
284 }
285
286 private void FlushRead()
287 {
288 if (_readPos - _readLen != 0)
289 {
291 }
292 _readPos = 0;
293 _readLen = 0;
294 }
295
297 {
298 if (_readPos == _readLen)
299 {
300 _readPos = (_readLen = 0);
301 return;
302 }
303 if (!_stream.CanSeek)
304 {
306 }
307 FlushRead();
308 }
309
310 private void FlushWrite()
311 {
313 _writePos = 0;
314 _stream.Flush();
315 }
316
318 {
319 await _stream.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
320 _writePos = 0;
321 await _stream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
322 }
323
324 private int ReadFromBuffer(byte[] buffer, int offset, int count)
325 {
326 int num = _readLen - _readPos;
327 if (num == 0)
328 {
329 return 0;
330 }
331 if (num > count)
332 {
333 num = count;
334 }
336 _readPos += num;
337 return num;
338 }
339
341 {
342 int num = Math.Min(_readLen - _readPos, destination.Length);
343 if (num > 0)
344 {
346 _readPos += num;
347 }
348 return num;
349 }
350
351 private int ReadFromBuffer(byte[] buffer, int offset, int count, out Exception error)
352 {
353 try
354 {
355 error = null;
357 }
358 catch (Exception ex)
359 {
360 error = ex;
361 return 0;
362 }
363 }
364
365 public override int Read(byte[] buffer, int offset, int count)
366 {
370 int num = ReadFromBuffer(buffer, offset, count);
371 if (num == count)
372 {
373 return num;
374 }
375 int num2 = num;
376 if (num > 0)
377 {
378 count -= num;
379 offset += num;
380 }
381 _readPos = (_readLen = 0);
382 if (_writePos > 0)
383 {
384 FlushWrite();
385 }
386 if (count >= _bufferSize)
387 {
388 return _stream.Read(buffer, offset, count) + num2;
389 }
393 return num + num2;
394 }
395
396 public override int Read(Span<byte> destination)
397 {
400 int num = ReadFromBuffer(destination);
401 if (num == destination.Length)
402 {
403 return num;
404 }
405 if (num > 0)
406 {
407 destination = destination.Slice(num);
408 }
409 _readPos = (_readLen = 0);
410 if (_writePos > 0)
411 {
412 FlushWrite();
413 }
414 if (destination.Length >= _bufferSize)
415 {
416 return _stream.Read(destination) + num;
417 }
420 return ReadFromBuffer(destination) + num;
421 }
422
424 {
425 Task<int> lastSyncCompletedReadTask = _lastSyncCompletedReadTask;
426 if (lastSyncCompletedReadTask != null && lastSyncCompletedReadTask.Result == val)
427 {
428 return lastSyncCompletedReadTask;
429 }
430 return _lastSyncCompletedReadTask = Task.FromResult(val);
431 }
432
434 {
436 if (cancellationToken.IsCancellationRequested)
437 {
439 }
442 int num = 0;
444 Task task = semaphoreSlim.WaitAsync(cancellationToken);
445 if (task.IsCompletedSuccessfully)
446 {
447 bool flag = true;
448 try
449 {
450 num = ReadFromBuffer(buffer, offset, count, out var error);
451 flag = num == count || error != null;
452 if (flag)
453 {
454 return (error == null) ? LastSyncCompletedReadTask(num) : Task.FromException<int>(error);
455 }
456 }
457 finally
458 {
459 if (flag)
460 {
461 semaphoreSlim.Release();
462 }
463 }
464 }
466 }
467
469 {
470 if (cancellationToken.IsCancellationRequested)
471 {
473 }
476 int num = 0;
478 Task task = semaphoreSlim.WaitAsync(cancellationToken);
479 if (task.IsCompletedSuccessfully)
480 {
481 bool flag = true;
482 try
483 {
484 num = ReadFromBuffer(buffer.Span);
485 flag = num == buffer.Length;
486 if (flag)
487 {
488 return new ValueTask<int>(num);
489 }
490 }
491 finally
492 {
493 if (flag)
494 {
495 semaphoreSlim.Release();
496 }
497 }
498 }
500 }
501
502 private async ValueTask<int> ReadFromUnderlyingStreamAsync(Memory<byte> buffer, CancellationToken cancellationToken, int bytesAlreadySatisfied, Task semaphoreLockTask)
503 {
504 await semaphoreLockTask.ConfigureAwait(continueOnCapturedContext: false);
505 try
506 {
507 int num = ReadFromBuffer(buffer.Span);
508 if (num == buffer.Length)
509 {
510 return bytesAlreadySatisfied + num;
511 }
512 if (num > 0)
513 {
514 buffer = buffer.Slice(num);
515 bytesAlreadySatisfied += num;
516 }
517 _readPos = (_readLen = 0);
518 if (_writePos > 0)
519 {
520 await FlushWriteAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
521 }
522 if (buffer.Length >= _bufferSize)
523 {
524 return await _stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(continueOnCapturedContext: false) + bytesAlreadySatisfied;
525 }
527 _readLen = await _stream.ReadAsync(new Memory<byte>(_buffer, 0, _bufferSize), cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
528 num = ReadFromBuffer(buffer.Span);
529 return bytesAlreadySatisfied + num;
530 }
531 finally
532 {
534 }
535 }
536
537 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
538 {
540 }
541
542 public override int EndRead(IAsyncResult asyncResult)
543 {
544 return TaskToApm.End<int>(asyncResult);
545 }
546
547 public override int ReadByte()
548 {
549 if (_readPos == _readLen)
550 {
551 return ReadByteSlow();
552 }
553 return _buffer[_readPos++];
554 }
555
556 private int ReadByteSlow()
557 {
560 if (_writePos > 0)
561 {
562 FlushWrite();
563 }
566 _readPos = 0;
567 if (_readLen == 0)
568 {
569 return -1;
570 }
571 return _buffer[_readPos++];
572 }
573
574 private void WriteToBuffer(byte[] buffer, ref int offset, ref int count)
575 {
576 int num = Math.Min(_bufferSize - _writePos, count);
577 if (num > 0)
578 {
581 _writePos += num;
582 count -= num;
583 offset += num;
584 }
585 }
586
588 {
589 int num = Math.Min(_bufferSize - _writePos, buffer.Length);
590 if (num > 0)
591 {
593 buffer.Slice(0, num).CopyTo(new Span<byte>(_buffer, _writePos, num));
594 _writePos += num;
595 }
596 return num;
597 }
598
599 public override void Write(byte[] buffer, int offset, int count)
600 {
604 if (_writePos == 0)
605 {
607 }
608 int num;
609 checked
610 {
611 num = _writePos + count;
612 if ((uint)num + count < _bufferSize + _bufferSize)
613 {
614 WriteToBuffer(buffer, ref offset, ref count);
615 if (_writePos >= _bufferSize)
616 {
618 _writePos = 0;
619 WriteToBuffer(buffer, ref offset, ref count);
620 }
621 return;
622 }
623 }
624 if (_writePos > 0)
625 {
626 if (num <= _bufferSize + _bufferSize && num <= 81920)
627 {
630 _stream.Write(_buffer, 0, num);
631 _writePos = 0;
632 return;
633 }
635 _writePos = 0;
636 }
638 }
639
640 public override void Write(ReadOnlySpan<byte> buffer)
641 {
644 if (_writePos == 0)
645 {
647 }
648 int num;
649 checked
650 {
651 num = _writePos + buffer.Length;
652 if ((uint)num + buffer.Length < _bufferSize + _bufferSize)
653 {
655 if (_writePos >= _bufferSize)
656 {
657 buffer = buffer.Slice(start);
659 _writePos = 0;
661 }
662 return;
663 }
664 }
665 if (_writePos > 0)
666 {
667 if (num <= _bufferSize + _bufferSize && num <= 81920)
668 {
670 buffer.CopyTo(new Span<byte>(_buffer, _writePos, buffer.Length));
671 _stream.Write(_buffer, 0, num);
672 _writePos = 0;
673 return;
674 }
676 _writePos = 0;
677 }
679 }
680
686
688 {
689 if (cancellationToken.IsCancellationRequested)
690 {
692 }
696 Task task = semaphoreSlim.WaitAsync(cancellationToken);
697 if (task.IsCompletedSuccessfully)
698 {
699 bool flag = true;
700 try
701 {
702 if (_writePos == 0)
703 {
705 }
706 flag = buffer.Length < _bufferSize - _writePos;
707 if (flag)
708 {
709 int num = WriteToBuffer(buffer.Span);
710 return default(ValueTask);
711 }
712 }
713 finally
714 {
715 if (flag)
716 {
717 semaphoreSlim.Release();
718 }
719 }
720 }
722 }
723
725 {
726 await semaphoreLockTask.ConfigureAwait(continueOnCapturedContext: false);
727 try
728 {
729 if (_writePos == 0)
730 {
732 }
733 int num;
734 checked
735 {
736 num = _writePos + buffer.Length;
737 if (num + buffer.Length < _bufferSize + _bufferSize)
738 {
739 buffer = buffer.Slice(WriteToBuffer(buffer.Span));
740 if (_writePos >= _bufferSize)
741 {
742 await _stream.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
743 _writePos = 0;
744 WriteToBuffer(buffer.Span);
745 }
746 return;
747 }
748 }
749 if (_writePos > 0)
750 {
751 if (num <= _bufferSize + _bufferSize && num <= 81920)
752 {
754 buffer.Span.CopyTo(new Span<byte>(_buffer, _writePos, buffer.Length));
755 await _stream.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, num), cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
756 _writePos = 0;
757 return;
758 }
759 await _stream.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
760 _writePos = 0;
761 }
762 await _stream.WriteAsync(buffer, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
763 }
764 finally
765 {
767 }
768 }
769
770 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
771 {
773 }
774
775 public override void EndWrite(IAsyncResult asyncResult)
776 {
778 }
779
780 public override void WriteByte(byte value)
781 {
782 if (_writePos > 0 && _writePos < _bufferSize - 1)
783 {
785 }
786 else
787 {
789 }
790 }
791
792 private void WriteByteSlow(byte value)
793 {
795 if (_writePos == 0)
796 {
800 }
801 if (_writePos >= _bufferSize - 1)
802 {
803 FlushWrite();
804 }
806 }
807
808 public override long Seek(long offset, SeekOrigin origin)
809 {
812 if (_writePos > 0)
813 {
814 FlushWrite();
815 return _stream.Seek(offset, origin);
816 }
817 if (_readLen - _readPos > 0 && origin == SeekOrigin.Current)
818 {
820 }
821 long position = Position;
822 long num = _stream.Seek(offset, origin);
823 long num2 = num - (position - _readPos);
824 if (0 <= num2 && num2 < _readLen)
825 {
826 _readPos = (int)num2;
828 }
829 else
830 {
831 _readPos = (_readLen = 0);
832 }
833 return num;
834 }
835
836 public override void SetLength(long value)
837 {
838 if (value < 0)
839 {
841 }
845 Flush();
847 }
848
849 public override void CopyTo(Stream destination, int bufferSize)
850 {
854 int num = _readLen - _readPos;
855 if (num > 0)
856 {
857 destination.Write(_buffer, _readPos, num);
858 _readPos = (_readLen = 0);
859 }
860 else if (_writePos > 0)
861 {
862 FlushWrite();
863 }
864 _stream.CopyTo(destination, bufferSize);
865 }
866
868 {
872 if (!cancellationToken.IsCancellationRequested)
873 {
874 return CopyToAsyncCore(destination, bufferSize, cancellationToken);
875 }
877 }
878
880 {
881 await EnsureAsyncActiveSemaphoreInitialized().WaitAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
882 try
883 {
884 int num = _readLen - _readPos;
885 if (num > 0)
886 {
887 await destination.WriteAsync(new ReadOnlyMemory<byte>(_buffer, _readPos, num), cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
888 _readPos = (_readLen = 0);
889 }
890 else if (_writePos > 0)
891 {
892 await FlushWriteAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
893 }
894 await _stream.CopyToAsync(destination, bufferSize, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
895 }
896 finally
897 {
899 }
900 }
901}
static void BlockCopy(Array src, int srcOffset, Array dst, int dstOffset, int count)
Definition Buffer.cs:102
override Task< int > ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
int ReadFromBuffer(byte[] buffer, int offset, int count, out Exception error)
BufferedStream(Stream stream)
override void Dispose(bool disposing)
override int EndRead(IAsyncResult asyncResult)
override int Read(Span< byte > destination)
override async ValueTask DisposeAsync()
Task< int > _lastSyncCompletedReadTask
Task< int > LastSyncCompletedReadTask(int val)
async Task CopyToAsyncCore(Stream destination, int bufferSize, CancellationToken cancellationToken)
BufferedStream(Stream stream, int bufferSize)
override void Write(ReadOnlySpan< byte > buffer)
override void EndWrite(IAsyncResult asyncResult)
override ValueTask WriteAsync(ReadOnlyMemory< byte > buffer, CancellationToken cancellationToken=default(CancellationToken))
override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
override Task FlushAsync(CancellationToken cancellationToken)
void WriteByteSlow(byte value)
int WriteToBuffer(ReadOnlySpan< byte > buffer)
override void Write(byte[] buffer, int offset, int count)
override void SetLength(long value)
void WriteToBuffer(byte[] buffer, ref int offset, ref int count)
override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
override void CopyTo(Stream destination, int bufferSize)
override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
override ValueTask< int > ReadAsync(Memory< byte > buffer, CancellationToken cancellationToken=default(CancellationToken))
async Task FlushAsyncInternal(CancellationToken cancellationToken)
override long Seek(long offset, SeekOrigin origin)
async ValueTask FlushWriteAsync(CancellationToken cancellationToken)
override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
override void WriteByte(byte value)
async ValueTask WriteToUnderlyingStreamAsync(ReadOnlyMemory< byte > buffer, CancellationToken cancellationToken, Task semaphoreLockTask)
int ReadFromBuffer(Span< byte > destination)
int ReadFromBuffer(byte[] buffer, int offset, int count)
override int Read(byte[] buffer, int offset, int count)
async ValueTask< int > ReadFromUnderlyingStreamAsync(Memory< byte > buffer, CancellationToken cancellationToken, int bytesAlreadySatisfied, Task semaphoreLockTask)
Task FlushAsync()
Definition Stream.cs:669
static void ValidateBufferArguments(byte[] buffer, int offset, int count)
Definition Stream.cs:1044
void SetLength(long value)
long Seek(long offset, SeekOrigin origin)
static void ValidateCopyToArguments(Stream destination, int bufferSize)
Definition Stream.cs:1060
Task WriteAsync(byte[] buffer, int offset, int count)
Definition Stream.cs:914
int Read(byte[] buffer, int offset, int count)
void CopyTo(Stream destination)
Definition Stream.cs:540
void Dispose()
Definition Stream.cs:639
Task< int > ReadAsync(byte[] buffer, int offset, int count)
Definition Stream.cs:762
SemaphoreSlim _asyncActiveSemaphore
Definition Stream.cs:490
void Write(byte[] buffer, int offset, int count)
Task CopyToAsync(Stream destination)
Definition Stream.cs:571
SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
Definition Stream.cs:535
virtual ValueTask DisposeAsync()
Definition Stream.cs:654
static byte Min(byte val1, byte val2)
Definition Math.cs:912
static string Format(string resourceFormat, object p1)
Definition SR.cs:118
static string ArgumentOutOfRange_MustBePositive
Definition SR.cs:1068
static string NotSupported_CannotWriteToBufferedStreamIfReadBufferCannotBeFlushed
Definition SR.cs:2136
Definition SR.cs:7
static IAsyncResult Begin(Task task, AsyncCallback callback, object state)
Definition TaskToApm.cs:43
static void End(IAsyncResult asyncResult)
Definition TaskToApm.cs:48
new ConfiguredTaskAwaitable< TResult > ConfigureAwait(bool continueOnCapturedContext)
Definition Task.cs:226
static Task FromException(Exception exception)
Definition Task.cs:3341
static Task FromCanceled(CancellationToken cancellationToken)
Definition Task.cs:3363
static void ThrowNotSupportedException_UnwritableStream()
static void ThrowArgumentOutOfRangeException(System.ExceptionArgument argument)
static void ThrowNotSupportedException_UnseekableStream()
static void ThrowArgumentNullException(string name)
static void ThrowObjectDisposedException_StreamClosed(string objectName)
static void ThrowNotSupportedException_UnreadableStream()
void CopyTo(Span< T > destination)
static ValueTask FromCanceled(CancellationToken cancellationToken)
Definition ValueTask.cs:180
ConfiguredValueTaskAwaitable ConfigureAwait(bool continueOnCapturedContext)
Definition ValueTask.cs:312