Terraria v1.4.4.9
Terraria source code documentation
Loading...
Searching...
No Matches
MsQuicStream.cs
Go to the documentation of this file.
2using System.IO;
9
11
12internal sealed class MsQuicStream : QuicStreamProvider
13{
14 private sealed class State
15 {
17
19
21
23
24 public string TraceId;
25
27
28 public long ReadErrorCode = -1L;
29
31
33
35
36 public bool ReceiveIsFinal;
37
39
41
43
45
46 public long SendErrorCode = -1L;
47
49
51
53
54 public int SendBufferCount;
55
57
59
61
63
64 public int ShutdownDone;
65
66 public readonly TaskCompletionSource ShutdownCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
67
68 public void Cleanup()
69 {
70 if (System.Net.NetEventSource.Log.IsEnabled())
71 {
72 System.Net.NetEventSource.Info(this, $"{TraceId} releasing handles.", "Cleanup");
73 }
75 CleanupSendState(this);
76 Handle?.Dispose();
80 {
82 }
83 ConnectionState?.RemoveStream(null);
84 }
85 }
86
97
98 private enum ShutdownWriteState
99 {
100 None,
101 Canceled,
102 Finished,
104 }
105
106 private enum ShutdownState
107 {
108 None,
109 Canceled,
110 Pending,
111 Finished,
113 }
114
115 private enum SendState
116 {
117 None,
118 Pending,
119 Finished,
120 Aborted,
122 Closed
123 }
124
125 internal static readonly MsQuicNativeMethods.StreamCallbackDelegate s_streamDelegate = NativeCallbackHandler;
126
127 private readonly State _state = new State();
128
129 private readonly bool _canRead;
130
131 private readonly bool _canWrite;
132
133 private long _streamId = -1L;
134
135 private int _disposed;
136
137 private int _readTimeout = -1;
138
139 private int _writeTimeout = -1;
140
141 internal override bool CanRead
142 {
143 get
144 {
145 if (_disposed == 0)
146 {
147 return _canRead;
148 }
149 return false;
150 }
151 }
152
153 internal override bool CanWrite
154 {
155 get
156 {
157 if (_disposed == 0)
158 {
159 return _canWrite;
160 }
161 return false;
162 }
163 }
164
165 internal override bool ReadsCompleted => _state.ReadState == ReadState.ReadsCompleted;
166
167 internal override bool CanTimeout => true;
168
169 internal override int ReadTimeout
170 {
171 get
172 {
174 return _readTimeout;
175 }
176 set
177 {
179 if (value <= 0 && value != -1)
180 {
182 }
184 }
185 }
186
187 internal override int WriteTimeout
188 {
189 get
190 {
192 return _writeTimeout;
193 }
194 set
195 {
197 if (value <= 0 && value != -1)
198 {
200 }
202 }
203 }
204
205 internal override long StreamId
206 {
207 get
208 {
210 if (_streamId == -1)
211 {
213 }
214 return _streamId;
215 }
216 }
217
218 internal string TraceId()
219 {
220 return _state.TraceId;
221 }
222
224 {
225 if (!connectionState.TryAddStream(this))
226 {
227 throw new ObjectDisposedException("QuicConnection");
228 }
229 _state.ConnectionState = connectionState;
230 _state.Handle = streamHandle;
231 _canRead = true;
232 _canWrite = !flags.HasFlag(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
233 if (!_canWrite)
234 {
235 _state.SendState = SendState.Closed;
236 }
237 _state.StateGCHandle = GCHandle.Alloc(_state);
238 try
239 {
241 }
242 catch
243 {
245 throw;
246 }
247 _state.TraceId = MsQuicTraceHelper.GetTraceId(_state.Handle);
248 if (System.Net.NetEventSource.Log.IsEnabled())
249 {
250 System.Net.NetEventSource.Info(_state, $"{TraceId()} Inbound {(flags.HasFlag(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL) ? "uni" : "bi")}directional stream created in connection {_state.ConnectionState.TraceId}.", ".ctor");
251 }
252 }
253
255 {
256 if (!connectionState.TryAddStream(this))
257 {
258 throw new ObjectDisposedException("QuicConnection");
259 }
260 _state.ConnectionState = connectionState;
261 _canRead = !flags.HasFlag(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
262 _canWrite = true;
263 _state.StateGCHandle = GCHandle.Alloc(_state);
264 if (!_canRead)
265 {
266 _state.ReadState = ReadState.Closed;
267 }
268 try
269 {
270 uint status = MsQuicApi.Api.StreamOpenDelegate(connectionState.Handle, flags, s_streamDelegate, GCHandle.ToIntPtr(_state.StateGCHandle), out _state.Handle);
271 QuicExceptionHelpers.ThrowIfFailed(status, "Failed to open stream to peer.");
272 status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.FAIL_BLOCKED);
273 QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");
274 }
275 catch
276 {
279 throw;
280 }
281 _state.TraceId = MsQuicTraceHelper.GetTraceId(_state.Handle);
282 if (System.Net.NetEventSource.Log.IsEnabled())
283 {
284 System.Net.NetEventSource.Info(_state, $"{_state.TraceId} Outbound {(flags.HasFlag(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL) ? "uni" : "bi")}directional stream created in connection {_state.ConnectionState.TraceId}.", ".ctor");
285 }
286 }
287
292
297
299 {
302 {
303 await SendReadOnlySequenceAsync(buffers, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(continueOnCapturedContext: false);
305 }
306 }
307
312
314 {
317 {
318 await SendReadOnlyMemoryListAsync(buffers, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(continueOnCapturedContext: false);
320 }
321 }
322
324 {
327 {
328 await SendReadOnlyMemoryAsync(buffer, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(continueOnCapturedContext: false);
330 }
331 }
332
334 {
335 if (_state.SendState == SendState.Closed)
336 {
338 }
339 if (_state.SendState == SendState.Aborted)
340 {
341 if (_state.SendErrorCode != -1)
342 {
344 }
346 }
347 if (cancellationToken.IsCancellationRequested)
348 {
349 lock (_state)
350 {
351 if (_state.SendState == SendState.None || _state.SendState == SendState.Pending)
352 {
353 _state.SendState = SendState.Aborted;
354 }
355 }
357 }
358 CancellationTokenRegistration result = cancellationToken.UnsafeRegister(delegate(object s, CancellationToken token)
359 {
360 State state = (State)s;
361 bool flag = false;
362 lock (state)
363 {
364 if (state.SendState == SendState.None || state.SendState == SendState.Pending)
365 {
366 state.SendState = SendState.Aborted;
367 flag = true;
368 }
369 }
370 if (flag)
371 {
372 state.SendResettableCompletionSource.CompleteException(ExceptionDispatchInfo.SetCurrentStackTrace(new OperationCanceledException("Write was canceled", token)));
373 }
374 }, _state);
375 lock (_state)
376 {
377 if (_state.SendState == SendState.Aborted)
378 {
379 cancellationToken.ThrowIfCancellationRequested();
380 if (_state.SendErrorCode != -1)
381 {
383 }
385 }
386 if (_state.SendState == SendState.ConnectionClosed)
387 {
389 }
390 _state.SendState = ((!emptyBuffer) ? SendState.Pending : SendState.Finished);
391 return result;
392 }
393 }
394
396 {
397 lock (_state)
398 {
399 if (_state.SendState == SendState.Finished)
400 {
401 _state.SendState = SendState.None;
402 }
403 }
404 }
405
407 {
408 lock (_state)
409 {
410 if (_state.SendState == SendState.Pending)
411 {
412 _state.SendState = SendState.Finished;
413 }
414 }
415 }
416
418 {
420 if (_state.ReadState == ReadState.Closed)
421 {
423 }
424 if (System.Net.NetEventSource.Log.IsEnabled())
425 {
426 System.Net.NetEventSource.Info(_state, $"{TraceId()} Stream reading into Memory of '{destination.Length}' bytes.", "ReadAsync");
427 }
428 bool flag = false;
429 ReadState readState;
430 long readErrorCode;
431 lock (_state)
432 {
433 readState = _state.ReadState;
434 readErrorCode = _state.ReadErrorCode;
435 if (readState != ReadState.PendingRead && cancellationToken.IsCancellationRequested)
436 {
437 readState = ReadState.Aborted;
439 flag = true;
440 }
441 switch (readState)
442 {
443 case ReadState.ReadsCompleted:
444 return new ValueTask<int>(0);
445 case ReadState.None:
446 _state.ReceiveUserBuffer = destination;
447 _state.Stream = this;
448 _state.ReadState = ReadState.PendingRead;
449 if (cancellationToken.CanBeCanceled)
450 {
451 _state.ReceiveCancellationRegistration = cancellationToken.UnsafeRegister(delegate(object obj, CancellationToken token)
452 {
453 State state = (State)obj;
454 bool flag2;
455 lock (state)
456 {
458 }
459 if (flag2)
460 {
461 state.ReceiveResettableCompletionSource.CompleteException(ExceptionDispatchInfo.SetCurrentStackTrace(new OperationCanceledException(token)));
462 }
463 }, _state);
464 }
465 else
466 {
467 _state.ReceiveCancellationRegistration = default(CancellationTokenRegistration);
468 }
470 case ReadState.IndividualReadComplete:
471 {
472 _state.ReadState = ReadState.None;
474 ReceiveComplete(num);
476 {
478 }
479 else if (_state.ReceiveIsFinal)
480 {
481 _state.ReadState = ReadState.ReadsCompleted;
482 }
483 return new ValueTask<int>(num);
484 }
485 }
486 }
487 Exception ex = null;
489 {
490 ReadState.PendingRead => new InvalidOperationException("Only one read is supported at a time."),
491 ReadState.Aborted => flag ? new OperationCanceledException(cancellationToken) : ThrowHelper.GetStreamAbortedException(readErrorCode),
493 }));
494 }
495
496 private unsafe static int CopyMsQuicBuffersToUserBuffer(ReadOnlySpan<MsQuicNativeMethods.QuicBuffer> sourceBuffers, Span<byte> destinationBuffer)
497 {
498 if (sourceBuffers.Length == 0)
499 {
500 return 0;
501 }
502 int length = destinationBuffer.Length;
503 int num = 0;
504 int num2 = 0;
505 do
506 {
507 MsQuicNativeMethods.QuicBuffer quicBuffer = sourceBuffers[num2];
508 num = Math.Min((int)quicBuffer.Length, destinationBuffer.Length);
509 new Span<byte>(quicBuffer.Buffer, num).CopyTo(destinationBuffer);
510 destinationBuffer = destinationBuffer.Slice(num);
511 }
512 while (destinationBuffer.Length != 0 && ++num2 < sourceBuffers.Length);
513 return length - destinationBuffer.Length;
514 }
515
516 internal override void AbortRead(long errorCode)
517 {
518 if (_disposed != 1)
519 {
520 bool flag = false;
521 lock (_state)
522 {
524 }
525 if (flag)
526 {
528 }
529 StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE, errorCode);
530 }
531 }
532
533 internal override void AbortWrite(long errorCode)
534 {
535 if (_disposed == 1)
536 {
537 return;
538 }
539 bool flag = false;
540 lock (_state)
541 {
542 if (_state.SendState < SendState.Aborted)
543 {
544 _state.SendState = SendState.Aborted;
545 }
547 {
548 _state.ShutdownWriteState = ShutdownWriteState.Canceled;
549 flag = true;
550 }
551 }
552 if (flag)
553 {
555 }
556 StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_SEND, errorCode);
557 }
558
559 private void StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS flags, long errorCode)
560 {
561 uint status = MsQuicApi.Api.StreamShutdownDelegate(_state.Handle, flags, errorCode);
562 QuicExceptionHelpers.ThrowIfFailed(status, "StreamShutdown failed.");
563 }
564
566 {
568 lock (_state)
569 {
570 if (_state.ShutdownState == ShutdownState.ConnectionClosed)
571 {
573 }
574 }
575 using (cancellationToken.UnsafeRegister(delegate(object s, CancellationToken token)
576 {
577 State state = (State)s;
578 bool flag = false;
579 lock (state)
580 {
581 if (state.ShutdownState == ShutdownState.None)
582 {
583 state.ShutdownState = ShutdownState.Canceled;
584 flag = true;
585 }
586 }
587 if (flag)
588 {
589 state.ShutdownCompletionSource.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new OperationCanceledException("Wait for shutdown was canceled", token)));
590 }
591 }, _state))
592 {
593 await _state.ShutdownCompletionSource.Task.ConfigureAwait(continueOnCapturedContext: false);
594 }
595 }
596
609
610 internal override void Shutdown()
611 {
613 lock (_state)
614 {
615 if (_state.SendState < SendState.Finished)
616 {
617 _state.SendState = SendState.Finished;
618 }
619 }
621 }
622
623 internal override int Read(Span<byte> buffer)
624 {
626 byte[] array = ArrayPool<byte>.Shared.Rent(buffer.Length);
627 CancellationTokenSource cancellationTokenSource = null;
628 try
629 {
630 if (_readTimeout > 0)
631 {
632 cancellationTokenSource = new CancellationTokenSource(_readTimeout);
633 }
634 int result = ReadAsync(new Memory<byte>(array, 0, buffer.Length), cancellationTokenSource?.Token ?? default(CancellationToken)).AsTask().GetAwaiter().GetResult();
635 array.AsSpan(0, result).CopyTo(buffer);
636 return result;
637 }
638 catch (OperationCanceledException) when (cancellationTokenSource?.IsCancellationRequested ?? false)
639 {
641 }
642 finally
643 {
645 cancellationTokenSource?.Dispose();
646 }
647 }
648
649 internal override void Write(ReadOnlySpan<byte> buffer)
650 {
652 CancellationTokenSource cancellationTokenSource = null;
653 if (_writeTimeout > 0)
654 {
655 cancellationTokenSource = new CancellationTokenSource(_writeTimeout);
656 }
657 try
658 {
659 WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
660 }
661 catch (OperationCanceledException) when (cancellationTokenSource?.IsCancellationRequested ?? false)
662 {
664 }
665 finally
666 {
667 cancellationTokenSource?.Dispose();
668 }
669 }
670
671 internal override void Flush()
672 {
674 }
675
677 {
679 return Task.CompletedTask;
680 }
681
682 public override ValueTask DisposeAsync()
683 {
684 Dispose(disposing: true);
685 return default(ValueTask);
686 }
687
688 public override void Dispose()
689 {
690 Dispose(disposing: true);
691 GC.SuppressFinalize(this);
692 }
693
695 {
696 Dispose(disposing: false);
697 }
698
699 private void Dispose(bool disposing)
700 {
701 if (Interlocked.Exchange(ref _disposed, 1) != 0)
702 {
703 return;
704 }
705 if (System.Net.NetEventSource.Log.IsEnabled())
706 {
707 System.Net.NetEventSource.Info(_state, $"{TraceId()} Stream disposing {disposing}", "Dispose");
708 }
709 bool flag = false;
710 bool flag2 = false;
711 bool flag3 = false;
712 bool flag4 = false;
713 lock (_state)
714 {
715 if (_state.SendState < SendState.Aborted)
716 {
717 flag = true;
718 }
719 if (_state.ReadState < ReadState.ReadsCompleted || _state.ReadState == ReadState.Aborted)
720 {
721 flag2 = true;
723 }
725 {
726 _state.ShutdownState = ShutdownState.Pending;
727 }
728 flag4 = Interlocked.Exchange(ref _state.ShutdownDone, 1) == 2;
729 if (flag4)
730 {
731 _state.ShutdownState = ShutdownState.Finished;
732 }
733 }
734 if (flag)
735 {
736 try
737 {
739 }
741 {
742 }
743 }
744 if (flag2)
745 {
746 try
747 {
748 StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE, 4294967295L);
749 }
751 {
752 }
753 }
754 if (flag3)
755 {
757 }
758 if (flag4)
759 {
760 _state.Cleanup();
761 }
762 if (System.Net.NetEventSource.Log.IsEnabled())
763 {
764 System.Net.NetEventSource.Info(_state, $"{TraceId()} Stream disposed", "Dispose");
765 }
766 }
767
768 private void EnableReceive()
769 {
770 uint status = MsQuicApi.Api.StreamReceiveSetEnabledDelegate(_state.Handle, enabled: true);
771 QuicExceptionHelpers.ThrowIfFailed(status, "StreamReceiveSetEnabled failed.");
772 }
773
774 private static uint NativeCallbackHandler(IntPtr stream, IntPtr context, ref MsQuicNativeMethods.StreamEvent streamEvent)
775 {
776 State state = (State)GCHandle.FromIntPtr(context).Target;
777 return HandleEvent(state, ref streamEvent);
778 }
779
781 {
782 if (System.Net.NetEventSource.Log.IsEnabled())
783 {
784 System.Net.NetEventSource.Info(state, $"{state.TraceId} Stream received event {evt.Type}", "HandleEvent");
785 }
786 try
787 {
788 return evt.Type switch
789 {
790 QUIC_STREAM_EVENT_TYPE.START_COMPLETE => HandleEventStartComplete(state, ref evt),
791 QUIC_STREAM_EVENT_TYPE.RECEIVE => HandleEventRecv(state, ref evt),
792 QUIC_STREAM_EVENT_TYPE.SEND_COMPLETE => HandleEventSendComplete(state, ref evt),
793 QUIC_STREAM_EVENT_TYPE.PEER_SEND_SHUTDOWN => HandleEventPeerSendShutdown(state),
794 QUIC_STREAM_EVENT_TYPE.PEER_SEND_ABORTED => HandleEventPeerSendAborted(state, ref evt),
795 QUIC_STREAM_EVENT_TYPE.PEER_RECEIVE_ABORTED => HandleEventPeerRecvAborted(state, ref evt),
796 QUIC_STREAM_EVENT_TYPE.SEND_SHUTDOWN_COMPLETE => HandleEventSendShutdownComplete(state, ref evt),
797 QUIC_STREAM_EVENT_TYPE.SHUTDOWN_COMPLETE => HandleEventShutdownComplete(state, ref evt),
798 _ => 0u,
799 };
800 }
801 catch (Exception ex)
802 {
803 if (System.Net.NetEventSource.Log.IsEnabled())
804 {
805 System.Net.NetEventSource.Error(state, $"{state.TraceId} Exception occurred during handling Stream {evt.Type} event: {ex}", "HandleEvent");
806 }
807 return 2151743491u;
808 }
809 }
810
811 private unsafe static uint HandleEventRecv(State state, ref MsQuicNativeMethods.StreamEvent evt)
812 {
813 ref MsQuicNativeMethods.StreamEventDataReceive receive = ref evt.Data.Receive;
814 if (System.Net.NetEventSource.Log.IsEnabled())
815 {
816 System.Net.NetEventSource.Info(state, FormattableStringFactory.Create("{0} Stream received {1} bytes{2}", state.TraceId, receive.TotalBufferLength, receive.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN) ? " with FIN flag" : ""), "HandleEventRecv");
817 }
818 bool flag = false;
819 int num;
820 lock (state)
821 {
822 switch (state.ReadState)
823 {
824 case ReadState.None:
825 {
826 if ((uint)state.ReceiveQuicBuffers.Length < receive.BufferCount)
827 {
828 MsQuicNativeMethods.QuicBuffer[] receiveQuicBuffers = state.ReceiveQuicBuffers;
829 state.ReceiveQuicBuffers = ArrayPool<MsQuicNativeMethods.QuicBuffer>.Shared.Rent((int)receive.BufferCount);
830 if (receiveQuicBuffers.Length != 0)
831 {
832 ArrayPool<MsQuicNativeMethods.QuicBuffer>.Shared.Return(receiveQuicBuffers);
833 }
834 }
835 for (uint num2 = 0u; num2 < receive.BufferCount; num2++)
836 {
837 state.ReceiveQuicBuffers[num2] = receive.Buffers[num2];
838 }
839 state.ReceiveQuicBuffersCount = (int)receive.BufferCount;
840 state.ReceiveQuicBuffersTotalBytes = checked((int)receive.TotalBufferLength);
841 state.ReceiveIsFinal = receive.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN);
842 if (state.ReceiveQuicBuffersTotalBytes == 0)
843 {
844 if (state.ReceiveIsFinal)
845 {
846 state.ReadState = ReadState.ReadsCompleted;
847 }
848 return 0u;
849 }
850 state.ReadState = ReadState.IndividualReadComplete;
851 return 459749u;
852 }
853 case ReadState.PendingRead:
854 state.ReceiveCancellationRegistration.Unregister();
855 flag = true;
856 state.Stream = null;
857 state.ReadState = ReadState.None;
858 num = CopyMsQuicBuffersToUserBuffer(new ReadOnlySpan<MsQuicNativeMethods.QuicBuffer>(receive.Buffers, (int)receive.BufferCount), state.ReceiveUserBuffer.Span);
859 if (receive.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN) && (uint)num == receive.TotalBufferLength)
860 {
861 state.ReadState = ReadState.ReadsCompleted;
862 }
863 state.ReceiveUserBuffer = null;
864 break;
865 default:
866 return 0u;
867 }
868 }
869 if (flag)
870 {
871 state.ReceiveResettableCompletionSource.Complete(num);
872 }
873 uint result = (((uint)num != receive.TotalBufferLength) ? 459998u : 0u);
874 receive.TotalBufferLength = (uint)num;
875 return result;
876 }
877
879 {
880 bool flag = false;
881 bool flag2 = false;
882 lock (state)
883 {
884 if (state.SendState == SendState.None || state.SendState == SendState.Pending)
885 {
886 flag = true;
887 }
888 if (state.ShutdownWriteState == ShutdownWriteState.None)
889 {
890 state.ShutdownWriteState = ShutdownWriteState.Canceled;
891 flag2 = true;
892 }
893 state.SendState = SendState.Aborted;
894 state.SendErrorCode = evt.Data.PeerReceiveAborted.ErrorCode;
895 }
896 if (flag)
897 {
898 state.SendResettableCompletionSource.CompleteException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicStreamAbortedException(state.SendErrorCode)));
899 }
900 if (flag2)
901 {
902 state.ShutdownWriteCompletionSource.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicStreamAbortedException(state.SendErrorCode)));
903 }
904 return 0u;
905 }
906
908 {
909 return 0u;
910 }
911
913 {
914 if (evt.Data.SendShutdownComplete.Graceful != 0)
915 {
916 bool flag = false;
917 lock (state)
918 {
919 if (state.ShutdownWriteState == ShutdownWriteState.None)
920 {
921 state.ShutdownWriteState = ShutdownWriteState.Finished;
922 flag = true;
923 }
924 }
925 if (flag)
926 {
927 state.ShutdownWriteCompletionSource.SetResult();
928 }
929 }
930 return 0u;
931 }
932
934 {
935 MsQuicNativeMethods.StreamEventDataShutdownComplete shutdownComplete = evt.Data.ShutdownComplete;
936 if (shutdownComplete.ConnectionShutdown != 0)
937 {
939 }
940 bool flag = false;
941 bool flag2 = false;
942 bool flag3 = false;
943 lock (state)
944 {
945 if (System.Net.NetEventSource.Log.IsEnabled())
946 {
947 System.Net.NetEventSource.Info(state, $"{state.TraceId} Stream completing resettable event source.", "HandleEventShutdownComplete");
948 }
949 flag = CleanupReadStateAndCheckPending(state, ReadState.ReadsCompleted);
950 if (state.ShutdownWriteState == ShutdownWriteState.None)
951 {
952 state.ShutdownWriteState = ShutdownWriteState.Finished;
953 flag2 = true;
954 }
955 if (state.ShutdownState == ShutdownState.None)
956 {
957 state.ShutdownState = ShutdownState.Finished;
958 flag3 = true;
959 }
960 }
961 if (flag)
962 {
963 state.ReceiveResettableCompletionSource.Complete(0);
964 }
965 if (flag2)
966 {
967 state.ShutdownWriteCompletionSource.SetResult();
968 }
969 if (flag3)
970 {
971 state.ShutdownCompletionSource.SetResult();
972 }
973 if (Interlocked.Exchange(ref state.ShutdownDone, 2) == 1)
974 {
975 state.Cleanup();
976 }
977 return 0u;
978 }
979
981 {
982 bool flag = false;
983 lock (state)
984 {
986 state.ReadErrorCode = evt.Data.PeerSendAborted.ErrorCode;
987 }
988 if (flag)
989 {
990 state.ReceiveResettableCompletionSource.CompleteException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicStreamAbortedException(state.ReadErrorCode)));
991 }
992 return 0u;
993 }
994
996 {
997 bool flag = false;
998 lock (state)
999 {
1000 if (System.Net.NetEventSource.Log.IsEnabled())
1001 {
1002 System.Net.NetEventSource.Info(state, $"{state.TraceId} Stream completing resettable event source.", "HandleEventPeerSendShutdown");
1003 }
1004 flag = CleanupReadStateAndCheckPending(state, ReadState.ReadsCompleted);
1005 }
1006 if (flag)
1007 {
1008 state.ReceiveResettableCompletionSource.Complete(0);
1009 }
1010 return 0u;
1011 }
1012
1014 {
1015 MsQuicNativeMethods.StreamEventDataSendComplete sendComplete = evt.Data.SendComplete;
1016 bool flag = sendComplete.Canceled != 0;
1017 bool flag2 = false;
1018 lock (state)
1019 {
1020 if (state.SendState == SendState.Pending)
1021 {
1022 state.SendState = SendState.Finished;
1023 flag2 = true;
1024 }
1025 if (flag)
1026 {
1027 state.SendState = SendState.Aborted;
1028 }
1029 }
1030 if (flag2)
1031 {
1033 if (!flag)
1034 {
1035 state.SendResettableCompletionSource.Complete(0u);
1036 }
1037 else
1038 {
1039 state.SendResettableCompletionSource.CompleteException(ExceptionDispatchInfo.SetCurrentStackTrace(new OperationCanceledException("Write was canceled")));
1040 }
1041 }
1042 return 0u;
1043 }
1044
1045 private static void CleanupSendState(State state)
1046 {
1047 lock (state)
1048 {
1049 for (int i = 0; i < state.SendBufferCount; i++)
1050 {
1051 state.BufferArrays[i].Dispose();
1052 }
1053 }
1054 }
1055
1057 {
1058 if (buffer.IsEmpty)
1059 {
1060 if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN)
1061 {
1063 }
1064 return default(ValueTask);
1065 }
1066 MemoryHandle memoryHandle = buffer.Pin();
1068 {
1069 _state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(MsQuicNativeMethods.QuicBuffer));
1070 _state.SendBufferMaxCount = 1;
1071 }
1073 ptr->Length = (uint)buffer.Length;
1074 ptr->Buffer = (byte*)memoryHandle.Pointer;
1075 _state.BufferArrays[0] = memoryHandle;
1076 _state.SendBufferCount = 1;
1077 uint status = MsQuicApi.Api.StreamSendDelegate(_state.Handle, ptr, 1u, flags, IntPtr.Zero);
1079 {
1082 QuicExceptionHelpers.ThrowIfFailed(status, "Could not send data to peer.");
1083 }
1085 }
1086
1088 {
1089 if (buffers.IsEmpty)
1090 {
1091 if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN)
1092 {
1094 }
1095 return default(ValueTask);
1096 }
1097 int num = 0;
1098 ReadOnlySequence<byte>.Enumerator enumerator = buffers.GetEnumerator();
1099 while (enumerator.MoveNext())
1100 {
1101 ReadOnlyMemory<byte> current = enumerator.Current;
1102 num++;
1103 }
1104 if (_state.SendBufferMaxCount < num)
1105 {
1107 _state.SendQuicBuffers = IntPtr.Zero;
1108 _state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(MsQuicNativeMethods.QuicBuffer) * num);
1109 _state.SendBufferMaxCount = num;
1110 _state.BufferArrays = new MemoryHandle[num];
1111 }
1112 _state.SendBufferCount = num;
1113 num = 0;
1115 ReadOnlySequence<byte>.Enumerator enumerator2 = buffers.GetEnumerator();
1116 while (enumerator2.MoveNext())
1117 {
1118 ReadOnlyMemory<byte> current2 = enumerator2.Current;
1119 MemoryHandle memoryHandle = current2.Pin();
1120 ptr[num].Length = (uint)current2.Length;
1121 ptr[num].Buffer = (byte*)memoryHandle.Pointer;
1122 _state.BufferArrays[num] = memoryHandle;
1123 num++;
1124 }
1125 uint status = MsQuicApi.Api.StreamSendDelegate(_state.Handle, ptr, (uint)num, flags, IntPtr.Zero);
1127 {
1130 QuicExceptionHelpers.ThrowIfFailed(status, "Could not send data to peer.");
1131 }
1133 }
1134
1136 {
1137 if (buffers.IsEmpty)
1138 {
1139 if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN)
1140 {
1142 }
1143 return default(ValueTask);
1144 }
1146 uint num = (uint)array.Length;
1147 if (_state.SendBufferMaxCount < array.Length)
1148 {
1150 _state.SendQuicBuffers = IntPtr.Zero;
1151 _state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(MsQuicNativeMethods.QuicBuffer) * array.Length);
1152 _state.SendBufferMaxCount = array.Length;
1153 _state.BufferArrays = new MemoryHandle[array.Length];
1154 }
1155 _state.SendBufferCount = array.Length;
1157 for (int i = 0; i < num; i++)
1158 {
1159 ReadOnlyMemory<byte> readOnlyMemory = array[i];
1160 MemoryHandle memoryHandle = readOnlyMemory.Pin();
1161 ptr[i].Length = (uint)readOnlyMemory.Length;
1162 ptr[i].Buffer = (byte*)memoryHandle.Pointer;
1163 _state.BufferArrays[i] = memoryHandle;
1164 }
1165 uint status = MsQuicApi.Api.StreamSendDelegate(_state.Handle, ptr, num, flags, IntPtr.Zero);
1167 {
1170 QuicExceptionHelpers.ThrowIfFailed(status, "Could not send data to peer.");
1171 }
1173 }
1174
1175 private void ReceiveComplete(int bufferLength)
1176 {
1177 uint status = MsQuicApi.Api.StreamReceiveCompleteDelegate(_state.Handle, (ulong)bufferLength);
1178 QuicExceptionHelpers.ThrowIfFailed(status, "Could not complete receive call.");
1179 }
1180
1181 private long GetStreamId()
1182 {
1184 }
1185
1186 private void ThrowIfDisposed()
1187 {
1188 if (_disposed == 1)
1189 {
1190 throw new ObjectDisposedException("MsQuicStream");
1191 }
1192 }
1193
1195 {
1196 long abortErrorCode = state.ConnectionState.AbortErrorCode;
1197 if (System.Net.NetEventSource.Log.IsEnabled())
1198 {
1199 System.Net.NetEventSource.Info(state, state.TraceId + " Stream handling connection " + state.ConnectionState.TraceId + " close" + ((abortErrorCode != -1) ? $" with code {abortErrorCode}" : ""), "HandleEventConnectionClose");
1200 }
1201 bool flag = false;
1202 bool flag2 = false;
1203 bool flag3 = false;
1204 bool flag4 = false;
1205 lock (state)
1206 {
1207 flag = CleanupReadStateAndCheckPending(state, ReadState.ConnectionClosed);
1208 if (state.SendState == SendState.None || state.SendState == SendState.Pending)
1209 {
1210 flag2 = true;
1211 }
1212 state.SendState = SendState.ConnectionClosed;
1213 if (state.ShutdownWriteState == ShutdownWriteState.None)
1214 {
1215 flag3 = true;
1216 }
1217 state.ShutdownWriteState = ShutdownWriteState.ConnectionClosed;
1218 if (state.ShutdownState == ShutdownState.None)
1219 {
1220 flag4 = true;
1221 }
1222 state.ShutdownState = ShutdownState.ConnectionClosed;
1223 }
1224 if (flag)
1225 {
1226 state.ReceiveResettableCompletionSource.CompleteException(ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state)));
1227 }
1228 if (flag2)
1229 {
1230 state.SendResettableCompletionSource.CompleteException(ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state)));
1231 }
1232 if (flag3)
1233 {
1234 state.ShutdownWriteCompletionSource.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state)));
1235 }
1236 if (flag4)
1237 {
1239 }
1240 if (Interlocked.Exchange(ref state.ShutdownDone, 2) == 1)
1241 {
1242 state.Cleanup();
1243 }
1244 return 0u;
1245 }
1246
1248 {
1249 return ThrowHelper.GetConnectionAbortedException(state.ConnectionState.AbortErrorCode);
1250 }
1251
1252 private static bool CleanupReadStateAndCheckPending(State state, ReadState finalState)
1253 {
1254 bool result = false;
1255 if (state.ReadState == ReadState.PendingRead)
1256 {
1257 result = true;
1258 state.Stream = null;
1259 state.ReceiveUserBuffer = null;
1260 state.ReceiveCancellationRegistration.Unregister();
1261 }
1262 if (state.ReadState < ReadState.ReadsCompleted)
1263 {
1264 state.ReadState = finalState;
1265 }
1266 return result;
1267 }
1268}
static ArrayPool< T > Shared
Definition ArrayPool.cs:7
static void SuppressFinalize(object obj)
Definition GC.cs:202
Definition GC.cs:8
static byte Min(byte val1, byte val2)
Definition Math.cs:912
static readonly System.Net.NetEventSource Log
static void Info(object thisOrContextObject, FormattableString formattableString=null, [CallerMemberName] string memberName=null)
static void Error(object thisOrContextObject, FormattableString formattableString, [CallerMemberName] string memberName=null)
static unsafe ulong GetULongParam(MsQuicApi api, SafeHandle nativeObject, QUIC_PARAM_LEVEL level, uint param)
static string GetTraceId(SafeMsQuicStreamHandle handle)
static void ThrowIfFailed(uint status, string message=null, Exception innerException=null)
readonly ResettableCompletionSource< int > ReceiveResettableCompletionSource
readonly ResettableCompletionSource< uint > SendResettableCompletionSource
readonly TaskCompletionSource ShutdownWriteCompletionSource
CancellationTokenRegistration ReceiveCancellationRegistration
CancellationTokenRegistration HandleWriteStartState(bool emptyBuffer, CancellationToken cancellationToken)
override void Write(ReadOnlySpan< byte > buffer)
static unsafe uint HandleEventRecv(State state, ref MsQuicNativeMethods.StreamEvent evt)
override async ValueTask WriteAsync(ReadOnlySequence< byte > buffers, bool endStream, CancellationToken cancellationToken=default(CancellationToken))
static uint HandleEventSendComplete(State state, ref MsQuicNativeMethods.StreamEvent evt)
override async ValueTask WriteAsync(ReadOnlyMemory< byte > buffer, bool endStream, CancellationToken cancellationToken=default(CancellationToken))
unsafe ValueTask SendReadOnlySequenceAsync(ReadOnlySequence< byte > buffers, QUIC_SEND_FLAGS flags)
override async ValueTask ShutdownCompleted(CancellationToken cancellationToken=default(CancellationToken))
static uint HandleEvent(State state, ref MsQuicNativeMethods.StreamEvent evt)
static uint HandleEventPeerRecvAborted(State state, ref MsQuicNativeMethods.StreamEvent evt)
override ValueTask< int > ReadAsync(Memory< byte > destination, CancellationToken cancellationToken=default(CancellationToken))
static uint HandleEventSendShutdownComplete(State state, ref MsQuicNativeMethods.StreamEvent evt)
override ValueTask WriteAsync(ReadOnlyMemory< byte > buffer, CancellationToken cancellationToken=default(CancellationToken))
override ValueTask WriteAsync(ReadOnlyMemory< ReadOnlyMemory< byte > > buffers, CancellationToken cancellationToken=default(CancellationToken))
static Exception GetConnectionAbortedException(State state)
unsafe ValueTask SendReadOnlyMemoryListAsync(ReadOnlyMemory< ReadOnlyMemory< byte > > buffers, QUIC_SEND_FLAGS flags)
MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags)
static uint HandleEventShutdownComplete(State state, ref MsQuicNativeMethods.StreamEvent evt)
override ValueTask WriteAsync(ReadOnlySequence< byte > buffers, CancellationToken cancellationToken=default(CancellationToken))
void StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS flags, long errorCode)
static readonly MsQuicNativeMethods.StreamCallbackDelegate s_streamDelegate
override Task FlushAsync(CancellationToken cancellationToken=default(CancellationToken))
override ValueTask WaitForWriteCompletionAsync(CancellationToken cancellationToken=default(CancellationToken))
unsafe ValueTask SendReadOnlyMemoryAsync(ReadOnlyMemory< byte > buffer, QUIC_SEND_FLAGS flags)
static uint HandleEventPeerSendAborted(State state, ref MsQuicNativeMethods.StreamEvent evt)
static unsafe int CopyMsQuicBuffersToUserBuffer(ReadOnlySpan< MsQuicNativeMethods.QuicBuffer > sourceBuffers, Span< byte > destinationBuffer)
override async ValueTask WriteAsync(ReadOnlyMemory< ReadOnlyMemory< byte > > buffers, bool endStream, CancellationToken cancellationToken=default(CancellationToken))
MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHandle streamHandle, QUIC_STREAM_OPEN_FLAGS flags)
static bool CleanupReadStateAndCheckPending(State state, ReadState finalState)
static uint NativeCallbackHandler(IntPtr stream, IntPtr context, ref MsQuicNativeMethods.StreamEvent streamEvent)
static uint HandleEventStartComplete(State state, ref MsQuicNativeMethods.StreamEvent evt)
static Exception GetStreamAbortedException(long errorCode)
static Exception GetConnectionAbortedException(long errorCode)
Definition ThrowHelper.cs:5
static FormattableString Create(string format, params object?[] arguments)
static void FreeHGlobal(IntPtr hglobal)
Definition Marshal.cs:1680
static IntPtr AllocHGlobal(int cb)
Definition Marshal.cs:625
static string net_quic_writing_notallowed
Definition SR.cs:28
static string net_quic_timeout_use_gt_zero
Definition SR.cs:30
static string net_quic_sending_aborted
Definition SR.cs:22
static string net_quic_reading_notallowed
Definition SR.cs:20
static string net_quic_timeout
Definition SR.cs:32
Definition SR.cs:7
static int Exchange(ref int location1, int value)
new ConfiguredTaskAwaitable< TResult > ConfigureAwait(bool continueOnCapturedContext)
Definition Task.cs:226
new Task< TResult > WaitAsync(CancellationToken cancellationToken)
Definition Task.cs:231
static Task CompletedTask
Definition Task.cs:1120
new TaskAwaiter< TResult > GetAwaiter()
Definition Task.cs:221
static readonly IntPtr Zero
Definition IntPtr.cs:18
unsafe MemoryHandle Pin()
static IntPtr ToIntPtr(GCHandle value)
Definition GCHandle.cs:138
static GCHandle Alloc(object? value)
Definition GCHandle.cs:81
static GCHandle FromIntPtr(IntPtr value)
Definition GCHandle.cs:127
void CopyTo(Span< T > destination)
Definition Span.cs:224
Span< T > Slice(int start)
Definition Span.cs:271
int Length
Definition Span.cs:70
static ValueTask FromException(Exception exception)
Definition ValueTask.cs:190