// PipeWriterStream.cs
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
  3. using System.Buffers;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. namespace System.IO.Pipelines;
  /// <summary>
  /// Write-only <see cref="Stream"/> that forwards every write to a <see cref="PipeWriter"/>
  /// and keeps a running count of the bytes handed to it. Intended for efficiently writing
  /// bytes from the request body.
  /// </summary>
  /// <remarks>
  /// Reading, seeking, <see cref="Position"/> and <see cref="SetLength"/> are not supported and
  /// throw <see cref="NotSupportedException"/>. <see cref="Length"/> reports the number of bytes
  /// written since construction or since the last call to <see cref="Reset"/>.
  /// </remarks>
  internal class PipeWriterStream : Stream
  {
      // Bytes written since construction or the last Reset().
      private long _length;

      private readonly PipeWriter _pipeWriter;

      /// <summary>Creates a stream that writes into <paramref name="pipeWriter"/>.</summary>
      /// <param name="pipeWriter">The writer that receives every byte written to this stream.</param>
      public PipeWriterStream(PipeWriter pipeWriter)
      {
          _pipeWriter = pipeWriter;
      }

      /// <summary>Always <see langword="false"/>; this stream is write-only.</summary>
      public override bool CanRead => false;

      /// <summary>Always <see langword="false"/>; this stream cannot seek.</summary>
      public override bool CanSeek => false;

      /// <summary>Always <see langword="true"/>.</summary>
      public override bool CanWrite => true;

      /// <summary>Number of bytes written since construction or the last <see cref="Reset"/>.</summary>
      public override long Length => _length;

      /// <summary>Not supported.</summary>
      /// <exception cref="NotSupportedException">Always thrown, for both get and set.</exception>
      public override long Position
      {
          get => throw new NotSupportedException();
          set => throw new NotSupportedException();
      }

      /// <summary>Does nothing; this method never calls into the underlying writer.</summary>
      public override void Flush()
      {
          // NOTE(review): the synchronous Write overload only hands bytes to the PipeWriter and does
          // not flush it, so callers presumably flush the writer themselves - confirm against callers.
      }

      /// <summary>Not supported.</summary>
      /// <exception cref="NotSupportedException">Always thrown.</exception>
      public override int Read(byte[] buffer, int offset, int count)
      {
          throw new NotSupportedException();
      }

      /// <summary>Not supported.</summary>
      /// <exception cref="NotSupportedException">Always thrown.</exception>
      public override long Seek(long offset, SeekOrigin origin)
      {
          throw new NotSupportedException();
      }

      /// <summary>Not supported.</summary>
      /// <exception cref="NotSupportedException">Always thrown.</exception>
      public override void SetLength(long value)
      {
          throw new NotSupportedException();
      }

      /// <summary>
      /// Copies <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
      /// <paramref name="offset"/>, into the writer and adds them to <see cref="Length"/>.
      /// </summary>
      public override void Write(byte[] buffer, int offset, int count)
      {
          _pipeWriter.Write(new ReadOnlySpan<byte>(buffer, offset, count));

          // Counted only after the write succeeded, so a throwing write leaves Length untouched.
          _length += count;
      }

      /// <summary>
      /// Asynchronously writes <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
      /// <paramref name="offset"/>, to the writer.
      /// </summary>
      /// <returns>A task that completes when the underlying <see cref="PipeWriter.WriteAsync"/> completes.</returns>
      public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          return WriteCoreAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
      }

  #if NETCOREAPP || NETSTANDARD2_1_OR_GREATER
      /// <summary>Asynchronously writes <paramref name="source"/> to the writer.</summary>
      /// <returns>A task that completes when the underlying <see cref="PipeWriter.WriteAsync"/> completes.</returns>
      public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
      {
          return WriteCoreAsync(source, cancellationToken);
      }
  #endif

      /// <summary>
      /// Shared implementation of the asynchronous write overloads: forwards <paramref name="source"/>
      /// to the writer and translates a canceled flush into an <see cref="OperationCanceledException"/>.
      /// </summary>
      /// <param name="source">The bytes to write.</param>
      /// <param name="cancellationToken">
      /// Checked before writing, and passed on to <see cref="PipeWriter.WriteAsync"/>.
      /// </param>
      /// <returns>
      /// A canceled task when <paramref name="cancellationToken"/> is already canceled; otherwise a task
      /// that completes with the write, or fails with <see cref="OperationCanceledException"/> when the
      /// flush result reports <see cref="FlushResult.IsCanceled"/>.
      /// </returns>
      private ValueTask WriteCoreAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
      {
          if (cancellationToken.IsCancellationRequested)
          {
              return new ValueTask(Task.FromCanceled(cancellationToken));
          }

          ValueTask<FlushResult> flushTask = _pipeWriter.WriteAsync(source, cancellationToken);

          // Count the bytes only once the writer has accepted the call; if WriteAsync throws
          // synchronously, Length stays unchanged, matching the synchronous Write overload.
          _length += source.Length;

          if (!flushTask.IsCompletedSuccessfully)
          {
              return AwaitFlushAsync(flushTask);
          }

          // Read the result exactly once: a ValueTask must not be consumed more than one time.
          FlushResult flushResult = flushTask.Result;
          if (flushResult.IsCanceled)
          {
              // Cancellation can be triggered by PipeWriter.CancelPendingFlush. Report it through the
              // returned task, exactly like the asynchronous path, instead of throwing synchronously.
              return AwaitFlushAsync(new ValueTask<FlushResult>(flushResult));
          }

          return default;

          static async ValueTask AwaitFlushAsync(ValueTask<FlushResult> pendingFlush)
          {
              FlushResult result = await pendingFlush;

              // Cancellation can be triggered by PipeWriter.CancelPendingFlush
              if (result.IsCanceled)
              {
                  throw new OperationCanceledException();
              }
          }
      }

      /// <summary>Resets the byte count reported by <see cref="Length"/> to zero.</summary>
      public void Reset()
      {
          _length = 0;
      }
  }