Просмотр исходного кода

Streaming Interop Followup Items (#33916)

* Streaming Interop Followup Items

* Streaming CI Debugging (#33917)

* CI Debugging

* CiData message

* CiData message

* Update RemoteJSDataStream.cs

* Remove Task.Delay

* Update RemoteJSDataStream.cs

* Update RemoteJSDataStream.cs

* PR Feedback (Including PipeReader Property)

* Improve documentation on safe defaults

* Task Completion Source & DefaultTimeout

* InvalidateLastDataReceivedTimeForTimeout
Tanay Parikh 4 лет назад
Родитель
Сommit
774b2af72b

+ 24 - 6
src/Components/Server/src/Circuits/RemoteJSDataStream.cs

@@ -40,9 +40,10 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
             RemoteJSRuntime runtime,
             RemoteJSRuntime runtime,
             IJSStreamReference jsStreamReference,
             IJSStreamReference jsStreamReference,
             long totalLength,
             long totalLength,
-            long maxBufferSize,
             long maximumIncomingBytes,
             long maximumIncomingBytes,
             TimeSpan jsInteropDefaultCallTimeout,
             TimeSpan jsInteropDefaultCallTimeout,
+            long pauseIncomingBytesThreshold = -1,
+            long resumeIncomingBytesThreshold = -1,
             CancellationToken cancellationToken = default)
             CancellationToken cancellationToken = default)
         {
         {
             // Enforce minimum 1 kb, maximum 50 kb, SignalR message size.
             // Enforce minimum 1 kb, maximum 50 kb, SignalR message size.
@@ -54,7 +55,7 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
                 throw new ArgumentException($"SignalR MaximumIncomingBytes must be at least 1 kb.");
                 throw new ArgumentException($"SignalR MaximumIncomingBytes must be at least 1 kb.");
 
 
             var streamId = runtime.RemoteJSDataStreamNextInstanceId++;
             var streamId = runtime.RemoteJSDataStreamNextInstanceId++;
-            var remoteJSDataStream = new RemoteJSDataStream(runtime, streamId, totalLength, maxBufferSize, jsInteropDefaultCallTimeout, cancellationToken);
+            var remoteJSDataStream = new RemoteJSDataStream(runtime, streamId, totalLength, jsInteropDefaultCallTimeout, pauseIncomingBytesThreshold, resumeIncomingBytesThreshold, cancellationToken);
             await runtime.InvokeVoidAsync("Blazor._internal.sendJSDataStream", jsStreamReference, streamId, chunkSize);
             await runtime.InvokeVoidAsync("Blazor._internal.sendJSDataStream", jsStreamReference, streamId, chunkSize);
             return remoteJSDataStream;
             return remoteJSDataStream;
         }
         }
@@ -63,8 +64,9 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
             RemoteJSRuntime runtime,
             RemoteJSRuntime runtime,
             long streamId,
             long streamId,
             long totalLength,
             long totalLength,
-            long maxBufferSize,
             TimeSpan jsInteropDefaultCallTimeout,
             TimeSpan jsInteropDefaultCallTimeout,
+            long pauseIncomingBytesThreshold,
+            long resumeIncomingBytesThreshold,
             CancellationToken cancellationToken)
             CancellationToken cancellationToken)
         {
         {
             _runtime = runtime;
             _runtime = runtime;
@@ -78,10 +80,16 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
 
 
             _runtime.RemoteJSDataStreamInstances.Add(_streamId, this);
             _runtime.RemoteJSDataStreamInstances.Add(_streamId, this);
 
 
-            _pipe = new Pipe(new PipeOptions(pauseWriterThreshold: maxBufferSize, resumeWriterThreshold: maxBufferSize / 2));
+            _pipe = new Pipe(new PipeOptions(pauseWriterThreshold: pauseIncomingBytesThreshold, resumeWriterThreshold: resumeIncomingBytesThreshold));
             _pipeReaderStream = _pipe.Reader.AsStream();
             _pipeReaderStream = _pipe.Reader.AsStream();
+            PipeReader = _pipe.Reader;
         }
         }
 
 
+        /// <summary>
+        /// Gets a <see cref="PipeReader"/> to directly read data sent by the JavaScript client.
+        /// </summary>
+        public PipeReader PipeReader { get; }
+
         private async Task<bool> ReceiveData(long chunkId, byte[] chunk, string error)
         private async Task<bool> ReceiveData(long chunkId, byte[] chunk, string error)
         {
         {
             try
             try
@@ -199,13 +207,23 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
             if (!_disposed && (DateTimeOffset.UtcNow >= _lastDataReceivedTime.Add(_jsInteropDefaultCallTimeout)))
             if (!_disposed && (DateTimeOffset.UtcNow >= _lastDataReceivedTime.Add(_jsInteropDefaultCallTimeout)))
             {
             {
                 // Dispose of the stream if a chunk isn't received within the jsInteropDefaultCallTimeout.
                 // Dispose of the stream if a chunk isn't received within the jsInteropDefaultCallTimeout.
-                var timeoutException = new TimeoutException("Did not receive any data in the alloted time.");
+                var timeoutException = new TimeoutException("Did not receive any data in the allotted time.");
                 await CompletePipeAndDisposeStream(timeoutException);
                 await CompletePipeAndDisposeStream(timeoutException);
                 _runtime.RaiseUnhandledException(timeoutException);
                 _runtime.RaiseUnhandledException(timeoutException);
             }
             }
         }
         }
 
 
-        internal async Task CompletePipeAndDisposeStream(Exception? ex = null)
+        /// <summary>
+        /// For testing purposes only.
+        ///
+        /// Triggers the timeout on the next check.
+        /// </summary>
+        internal void InvalidateLastDataReceivedTimeForTimeout()
+        {
+            _lastDataReceivedTime = _lastDataReceivedTime.Subtract(_jsInteropDefaultCallTimeout);
+        }
+
+        private async Task CompletePipeAndDisposeStream(Exception? ex = null)
         {
         {
             await _pipe.Writer.CompleteAsync(ex);
             await _pipe.Writer.CompleteAsync(ex);
             Dispose(true);
             Dispose(true);

+ 2 - 2
src/Components/Server/src/Circuits/RemoteJSRuntime.cs

@@ -157,8 +157,8 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
             _clientProxy = null;
             _clientProxy = null;
         }
         }
 
 
-        protected override async Task<Stream> ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long maxBufferSize, CancellationToken cancellationToken)
-            => await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(this, jsStreamReference, totalLength, maxBufferSize, _maximumIncomingBytes, _options.JSInteropDefaultCallTimeout, cancellationToken);
+        protected override async Task<Stream> ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, CancellationToken cancellationToken = default)
+            => await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(this, jsStreamReference, totalLength, _maximumIncomingBytes, _options.JSInteropDefaultCallTimeout, pauseIncomingBytesThreshold, resumeIncomingBytesThreshold, cancellationToken);
 
 
         public static class Log
         public static class Log
         {
         {

+ 147 - 23
src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs

@@ -4,8 +4,10 @@
 using System;
 using System;
 using System.IO;
 using System.IO;
 using System.Linq;
 using System.Linq;
+using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks;
 using Microsoft.AspNetCore.SignalR;
 using Microsoft.AspNetCore.SignalR;
+using Microsoft.AspNetCore.Testing;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.Options;
 using Microsoft.Extensions.Options;
 using Microsoft.JSInterop;
 using Microsoft.JSInterop;
@@ -19,34 +21,34 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
         private static readonly TestRemoteJSRuntime _jsRuntime = new(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
         private static readonly TestRemoteJSRuntime _jsRuntime = new(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
 
 
         [Fact]
         [Fact]
-        public async void CreateRemoteJSDataStreamAsync_CreatesStream()
+        public async Task CreateRemoteJSDataStreamAsync_CreatesStream()
         {
         {
             // Arrange
             // Arrange
             var jsStreamReference = Mock.Of<IJSStreamReference>();
             var jsStreamReference = Mock.Of<IJSStreamReference>();
 
 
             // Act
             // Act
-            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(_jsRuntime, jsStreamReference, totalLength: 100, maxBufferSize: 50, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1));
+            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(_jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None).DefaultTimeout();
 
 
             // Assert
             // Assert
             Assert.NotNull(remoteJSDataStream);
             Assert.NotNull(remoteJSDataStream);
         }
         }
 
 
         [Fact]
         [Fact]
-        public async void ReceiveData_DoesNotFindStream()
+        public async Task ReceiveData_DoesNotFindStream()
         {
         {
             // Arrange
             // Arrange
             var chunk = new byte[] { 3, 5, 6, 7 };
             var chunk = new byte[] { 3, 5, 6, 7 };
             var unrecognizedGuid = 10;
             var unrecognizedGuid = 10;
 
 
             // Act
             // Act
-            var success = await RemoteJSDataStream.ReceiveData(_jsRuntime, streamId: unrecognizedGuid, chunkId: 0, chunk, error: null);
+            var success = await RemoteJSDataStream.ReceiveData(_jsRuntime, streamId: unrecognizedGuid, chunkId: 0, chunk, error: null).DefaultTimeout();
 
 
             // Assert
             // Assert
             Assert.False(success);
             Assert.False(success);
         }
         }
 
 
         [Fact]
         [Fact]
-        public async void ReceiveData_SuccessReadsBackStream()
+        public async Task ReceiveData_SuccessReadsBackStream()
         {
         {
             // Arrange
             // Arrange
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
@@ -59,22 +61,50 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
             var sendDataTask = Task.Run(async () =>
             var sendDataTask = Task.Run(async () =>
             {
             {
                 // Act 1
                 // Act 1
-                var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null);
+                var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout();
                 return success;
                 return success;
             });
             });
 
 
             // Act & Assert 2
             // Act & Assert 2
             using var memoryStream = new MemoryStream();
             using var memoryStream = new MemoryStream();
-            await remoteJSDataStream.CopyToAsync(memoryStream);
+            await remoteJSDataStream.CopyToAsync(memoryStream).DefaultTimeout();
             Assert.Equal(chunk, memoryStream.ToArray());
             Assert.Equal(chunk, memoryStream.ToArray());
 
 
             // Act & Assert 3
             // Act & Assert 3
-            var sendDataCompleted = await sendDataTask;
+            var sendDataCompleted = await sendDataTask.DefaultTimeout();
             Assert.True(sendDataCompleted);
             Assert.True(sendDataCompleted);
         }
         }
 
 
         [Fact]
         [Fact]
-        public async void ReceiveData_WithError()
+        public async Task ReceiveData_SuccessReadsBackPipeReader()
+        {
+            // Arrange
+            var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
+            var remoteJSDataStream = await CreateRemoteJSDataStreamAsync(jsRuntime);
+            var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
+            var chunk = new byte[100];
+            var random = new Random();
+            random.NextBytes(chunk);
+
+            var sendDataTask = Task.Run(async () =>
+            {
+                // Act 1
+                var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout();
+                return success;
+            });
+
+            // Act & Assert 2
+            using var memoryStream = new MemoryStream();
+            await remoteJSDataStream.PipeReader.CopyToAsync(memoryStream).DefaultTimeout();
+            Assert.Equal(chunk, memoryStream.ToArray());
+
+            // Act & Assert 3
+            var sendDataCompleted = await sendDataTask.DefaultTimeout();
+            Assert.True(sendDataCompleted);
+        }
+
+        [Fact]
+        public async Task ReceiveData_WithError()
         {
         {
             // Arrange
             // Arrange
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
@@ -82,17 +112,17 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
             var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
             var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
 
 
             // Act & Assert 1
             // Act & Assert 1
-            var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk: null, error: "some error");
+            var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk: null, error: "some error").DefaultTimeout();
             Assert.False(success);
             Assert.False(success);
 
 
             // Act & Assert 2
             // Act & Assert 2
             using var mem = new MemoryStream();
             using var mem = new MemoryStream();
-            var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () => await remoteJSDataStream.CopyToAsync(mem));
+            var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout());
             Assert.Equal("An error occurred while reading the remote stream: some error", ex.Message);
             Assert.Equal("An error occurred while reading the remote stream: some error", ex.Message);
         }
         }
 
 
         [Fact]
         [Fact]
-        public async void ReceiveData_WithZeroLengthChunk()
+        public async Task ReceiveData_WithZeroLengthChunk()
         {
         {
             // Arrange
             // Arrange
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
@@ -101,42 +131,42 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
             var chunk = Array.Empty<byte>();
             var chunk = Array.Empty<byte>();
 
 
             // Act & Assert 1
             // Act & Assert 1
-            var ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null));
+            var ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout());
             Assert.Equal("The incoming data chunk cannot be empty.", ex.Message);
             Assert.Equal("The incoming data chunk cannot be empty.", ex.Message);
 
 
             // Act & Assert 2
             // Act & Assert 2
             using var mem = new MemoryStream();
             using var mem = new MemoryStream();
-            ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await remoteJSDataStream.CopyToAsync(mem));
+            ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout());
             Assert.Equal("The incoming data chunk cannot be empty.", ex.Message);
             Assert.Equal("The incoming data chunk cannot be empty.", ex.Message);
         }
         }
 
 
         [Fact]
         [Fact]
-        public async void ReceiveData_ProvidedWithMoreBytesThanRemaining()
+        public async Task ReceiveData_ProvidedWithMoreBytesThanRemaining()
         {
         {
             // Arrange
             // Arrange
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
             var jsStreamReference = Mock.Of<IJSStreamReference>();
             var jsStreamReference = Mock.Of<IJSStreamReference>();
-            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime, jsStreamReference, totalLength: 100, maxBufferSize: 50, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1));
+            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None);
             var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
             var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
             var chunk = new byte[110]; // 100 byte totalLength for stream
             var chunk = new byte[110]; // 100 byte totalLength for stream
 
 
             // Act & Assert 1
             // Act & Assert 1
-            var ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null));
+            var ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout());
             Assert.Equal("The incoming data stream declared a length 100, but 110 bytes were sent.", ex.Message);
             Assert.Equal("The incoming data stream declared a length 100, but 110 bytes were sent.", ex.Message);
 
 
             // Act & Assert 2
             // Act & Assert 2
             using var mem = new MemoryStream();
             using var mem = new MemoryStream();
-            ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await remoteJSDataStream.CopyToAsync(mem));
+            ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout());
             Assert.Equal("The incoming data stream declared a length 100, but 110 bytes were sent.", ex.Message);
             Assert.Equal("The incoming data stream declared a length 100, but 110 bytes were sent.", ex.Message);
         }
         }
 
 
         [Fact]
         [Fact]
-        public async void ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDisconnect()
+        public async Task ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDisconnect()
         {
         {
             // Arrange
             // Arrange
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
             var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
             var jsStreamReference = Mock.Of<IJSStreamReference>();
             var jsStreamReference = Mock.Of<IJSStreamReference>();
-            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime, jsStreamReference, totalLength: 100, maxBufferSize: 50, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1));
+            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None);
             var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
             var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
             var chunk = new byte[5];
             var chunk = new byte[5];
 
 
@@ -145,19 +175,113 @@ namespace Microsoft.AspNetCore.Components.Server.Circuits
             {
             {
                 await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: i, chunk, error: null);
                 await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: i, chunk, error: null);
             }
             }
-            var ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 7, chunk, error: null));
+            var ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 7, chunk, error: null).DefaultTimeout());
             Assert.Equal("Out of sequence chunk received, expected 5, but received 7.", ex.Message);
             Assert.Equal("Out of sequence chunk received, expected 5, but received 7.", ex.Message);
 
 
             // Act & Assert 2
             // Act & Assert 2
             using var mem = new MemoryStream();
             using var mem = new MemoryStream();
-            ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await remoteJSDataStream.CopyToAsync(mem));
+            ex = await Assert.ThrowsAsync<EndOfStreamException>(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout());
             Assert.Equal("Out of sequence chunk received, expected 5, but received 7.", ex.Message);
             Assert.Equal("Out of sequence chunk received, expected 5, but received 7.", ex.Message);
         }
         }
 
 
+        [Fact]
+        public async Task ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed()
+        {
+            // Arrange
+            var unhandledExceptionRaisedTask = new TaskCompletionSource<bool>();
+            var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
+            jsRuntime.UnhandledException += (_, ex) =>
+            {
+                Assert.Equal("Did not receive any data in the allotted time.", ex.Message);
+                unhandledExceptionRaisedTask.SetResult(ex is TimeoutException);
+            };
+
+            var jsStreamReference = Mock.Of<IJSStreamReference>();
+            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(
+                jsRuntime,
+                jsStreamReference,
+                totalLength: 15,
+                maximumIncomingBytes: 10_000,
+                jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(2),
+                pauseIncomingBytesThreshold: 50,
+                resumeIncomingBytesThreshold: 25,
+                cancellationToken: CancellationToken.None);
+            var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
+            var chunk = new byte[] { 3, 5, 7 };
+
+            // Act & Assert 1
+            // Trigger timeout and ensure unhandled exception raised to crush circuit
+            remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout();
+            var unhandledExceptionResult = await unhandledExceptionRaisedTask.Task.DefaultTimeout();
+            Assert.True(unhandledExceptionResult);
+
+            // Act & Assert 2
+            // Confirm exception also raised on pipe reader
+            using var mem = new MemoryStream();
+            var ex = await Assert.ThrowsAsync<TimeoutException>(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout());
+            Assert.Equal("Did not receive any data in the allotted time.", ex.Message);
+
+            // Act & Assert 3
+            // Ensures stream is disposed after the timeout and any additional chunks aren't accepted
+            var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout();
+            Assert.False(success);
+        }
+
+        [Fact]
+        public async Task ReceiveData_ReceivesDataThenTimesout_StreamDisposed()
+        {
+            // Arrange
+            var unhandledExceptionRaisedTask = new TaskCompletionSource<bool>();
+            var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
+            jsRuntime.UnhandledException += (_, ex) =>
+            {
+                Assert.Equal("Did not receive any data in the allotted time.", ex.Message);
+                unhandledExceptionRaisedTask.SetResult(ex is TimeoutException);
+            };
+
+            var jsStreamReference = Mock.Of<IJSStreamReference>();
+            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(
+                jsRuntime,
+                jsStreamReference,
+                totalLength: 15,
+                maximumIncomingBytes: 10_000,
+                jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(3),
+                pauseIncomingBytesThreshold: 50,
+                resumeIncomingBytesThreshold: 25,
+                cancellationToken: CancellationToken.None);
+            var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
+            var chunk = new byte[] { 3, 5, 7 };
+
+            // Act & Assert 1
+            var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout();
+            Assert.True(success);
+
+            // Act & Assert 2
+            success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 1, chunk, error: null).DefaultTimeout();
+            Assert.True(success);
+
+            // Act & Assert 3
+            // Trigger timeout and ensure unhandled exception raised to crush circuit
+            remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout();
+            var unhandledExceptionResult = await unhandledExceptionRaisedTask.Task.DefaultTimeout();
+            Assert.True(unhandledExceptionResult);
+
+            // Act & Assert 4
+            // Confirm exception also raised on pipe reader
+            using var mem = new MemoryStream();
+            var ex = await Assert.ThrowsAsync<TimeoutException>(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout());
+            Assert.Equal("Did not receive any data in the allotted time.", ex.Message);
+
+            // Act & Assert 5
+            // Ensures stream is disposed after the timeout and any additional chunks aren't accepted
+            success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 2, chunk, error: null).DefaultTimeout();
+            Assert.False(success);
+        }
+
         private static async Task<RemoteJSDataStream> CreateRemoteJSDataStreamAsync(TestRemoteJSRuntime jsRuntime = null)
         private static async Task<RemoteJSDataStream> CreateRemoteJSDataStreamAsync(TestRemoteJSRuntime jsRuntime = null)
         {
         {
             var jsStreamReference = Mock.Of<IJSStreamReference>();
             var jsStreamReference = Mock.Of<IJSStreamReference>();
-            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime ?? _jsRuntime, jsStreamReference, totalLength: 100, maxBufferSize: 50, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1));
+            var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime ?? _jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None);
             return remoteJSDataStream;
             return remoteJSDataStream;
         }
         }
 
 

+ 5 - 1
src/Components/Web/src/Forms/InputFile/RemoteBrowserFileStream.cs

@@ -45,7 +45,11 @@ namespace Microsoft.AspNetCore.Components.Forms
                 _inputFileElement,
                 _inputFileElement,
                 File.Id);
                 File.Id);
 
 
-            return await dataReference.OpenReadStreamAsync(_maxAllowedSize, options.MaxBufferSize, cancellationToken);
+            return await dataReference.OpenReadStreamAsync(
+                _maxAllowedSize,
+                pauseIncomingBytesThreshold: options.MaxBufferSize,
+                resumeIncomingBytesThreshold: options.MaxBufferSize / 2,
+                cancellationToken);
         }
         }
 
 
         protected override async ValueTask<int> CopyFileDataIntoBuffer(long sourceOffset, Memory<byte> destination, CancellationToken cancellationToken)
         protected override async ValueTask<int> CopyFileDataIntoBuffer(long sourceOffset, Memory<byte> destination, CancellationToken cancellationToken)

+ 13 - 3
src/JSInterop/Microsoft.JSInterop/src/IJSStreamReference.cs

@@ -16,15 +16,25 @@ namespace Microsoft.JSInterop
         /// <summary>
         /// <summary>
         /// Length of the <see cref="Stream"/> provided by JavaScript.
         /// Length of the <see cref="Stream"/> provided by JavaScript.
         /// </summary>
         /// </summary>
-        public long Length { get; }
+        long Length { get; }
 
 
         /// <summary>
         /// <summary>
         /// Opens a <see cref="Stream"/> with the <see cref="JSRuntime"/> for the current data reference.
         /// Opens a <see cref="Stream"/> with the <see cref="JSRuntime"/> for the current data reference.
         /// </summary>
         /// </summary>
         /// <param name="maxAllowedSize">Maximum number of bytes permitted to be read from JavaScript.</param>
         /// <param name="maxAllowedSize">Maximum number of bytes permitted to be read from JavaScript.</param>
-        /// <param name="maxBufferSize">Maximum number of bytes that are allowed to be buffered.</param>
+        /// <param name="pauseIncomingBytesThreshold">
+        /// The number of unconsumed bytes to accept from JS before blocking.
+        /// Defaults to -1, which indicates use of the default <see cref="System.IO.Pipelines.PipeOptions.PauseWriterThreshold" />.
+        /// Avoid specifying an excessively large value because this could allow clients to exhaust memory.
+        /// A value of zero prevents JS from blocking, allowing .NET to receive an unlimited number of bytes.
+        /// </param>
+        /// <param name="resumeIncomingBytesThreshold">
+        /// The number of unflushed bytes at which point JS stops blocking.
+        /// Defaults to -1, which indicates use of the default <see cref="System.IO.Pipelines.PipeOptions.PauseWriterThreshold" />.
+        /// Must be less than the <paramref name="pauseIncomingBytesThreshold"/> to prevent thrashing at the limit.
+        /// </param>
         /// <param name="cancellationToken"><see cref="CancellationToken" /> for cancelling read.</param>
         /// <param name="cancellationToken"><see cref="CancellationToken" /> for cancelling read.</param>
         /// <returns><see cref="Stream"/> which can provide data associated with the current data reference.</returns>
         /// <returns><see cref="Stream"/> which can provide data associated with the current data reference.</returns>
-        ValueTask<Stream> OpenReadStreamAsync(long maxAllowedSize = 512000, long maxBufferSize = 100 * 1024, CancellationToken cancellationToken = default);
+        ValueTask<Stream> OpenReadStreamAsync(long maxAllowedSize = 512000, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, CancellationToken cancellationToken = default);
     }
     }
 }
 }

+ 2 - 2
src/JSInterop/Microsoft.JSInterop/src/Implementation/JSStreamReference.cs

@@ -36,14 +36,14 @@ namespace Microsoft.JSInterop.Implementation
         }
         }
 
 
         /// <inheritdoc />
         /// <inheritdoc />
-        async ValueTask<Stream> IJSStreamReference.OpenReadStreamAsync(long maxLength, long maxBufferSize, CancellationToken cancellationToken)
+        async ValueTask<Stream> IJSStreamReference.OpenReadStreamAsync(long maxLength, long pauseIncomingBytesThreshold, long resumeIncomingBytesThreshold, CancellationToken cancellationToken)
         {
         {
             if (Length > maxLength)
             if (Length > maxLength)
             {
             {
                 throw new ArgumentOutOfRangeException(nameof(maxLength), $"The incoming data stream of length {Length} exceeds the maximum length {maxLength}.");
                 throw new ArgumentOutOfRangeException(nameof(maxLength), $"The incoming data stream of length {Length} exceeds the maximum length {maxLength}.");
             }
             }
 
 
-            return await _jsRuntime.ReadJSDataAsStreamAsync(this, Length, maxBufferSize, cancellationToken);
+            return await _jsRuntime.ReadJSDataAsStreamAsync(this, Length, pauseIncomingBytesThreshold, resumeIncomingBytesThreshold, cancellationToken);
         }
         }
     }
     }
 }
 }

+ 12 - 2
src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs

@@ -215,10 +215,20 @@ namespace Microsoft.JSInterop
         /// </summary>
         /// </summary>
         /// <param name="jsStreamReference"><see cref="IJSStreamReference"/> to produce a data stream for.</param>
         /// <param name="jsStreamReference"><see cref="IJSStreamReference"/> to produce a data stream for.</param>
         /// <param name="totalLength">Expected length of the incoming data stream.</param>
         /// <param name="totalLength">Expected length of the incoming data stream.</param>
-        /// <param name="maxBufferSize">Amount of bytes to buffer before flushing.</param>
+        /// <param name="pauseIncomingBytesThreshold">
+        /// The number of unconsumed bytes to accept from JS before blocking.
+        /// Defaults to -1, which indicates use of the default <see cref="System.IO.Pipelines.PipeOptions.PauseWriterThreshold" />.
+        /// Avoid specifying an excessively large value because this could allow clients to exhaust memory.
+        /// A value of zero prevents JS from blocking, allowing .NET to receive an unlimited number of bytes.
+        /// </param>
+        /// <param name="resumeIncomingBytesThreshold">
+        /// The number of unflushed bytes at which point JS stops blocking.
+        /// Defaults to -1, which indicates use of the default <see cref="System.IO.Pipelines.PipeOptions.PauseWriterThreshold" />.
+        /// Must be less than the <paramref name="pauseIncomingBytesThreshold"/> to prevent thrashing at the limit.
+        /// </param>
         /// <param name="cancellationToken"><see cref="CancellationToken" /> for cancelling read.</param>
         /// <param name="cancellationToken"><see cref="CancellationToken" /> for cancelling read.</param>
         /// <returns><see cref="Stream"/> for the data reference represented by <paramref name="jsStreamReference"/>.</returns>
         /// <returns><see cref="Stream"/> for the data reference represented by <paramref name="jsStreamReference"/>.</returns>
-        protected internal virtual Task<Stream> ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long maxBufferSize, CancellationToken cancellationToken)
+        protected internal virtual Task<Stream> ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, CancellationToken cancellationToken = default)
         {
         {
             // The reason it's virtual and not abstract is just for back-compat
             // The reason it's virtual and not abstract is just for back-compat
 
 

+ 2 - 2
src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt

@@ -1,7 +1,7 @@
 #nullable enable
 #nullable enable
 Microsoft.JSInterop.IJSStreamReference
 Microsoft.JSInterop.IJSStreamReference
 Microsoft.JSInterop.IJSStreamReference.Length.get -> long
 Microsoft.JSInterop.IJSStreamReference.Length.get -> long
-Microsoft.JSInterop.IJSStreamReference.OpenReadStreamAsync(long maxAllowedSize = 512000, long maxBufferSize = 102400, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<System.IO.Stream!>
+Microsoft.JSInterop.IJSStreamReference.OpenReadStreamAsync(long maxAllowedSize = 512000, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<System.IO.Stream!>
 Microsoft.JSInterop.Implementation.JSStreamReference
 Microsoft.JSInterop.Implementation.JSStreamReference
 Microsoft.JSInterop.Implementation.JSStreamReference.Length.get -> long
 Microsoft.JSInterop.Implementation.JSStreamReference.Length.get -> long
 Microsoft.JSInterop.Implementation.JSObjectReferenceJsonWorker
 Microsoft.JSInterop.Implementation.JSObjectReferenceJsonWorker
@@ -36,7 +36,7 @@ static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JS
 static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, System.TimeSpan timeout, params object?[]? args) -> System.Threading.Tasks.ValueTask
 static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, System.TimeSpan timeout, params object?[]? args) -> System.Threading.Tasks.ValueTask
 *REMOVED*static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object![]! args) -> System.Threading.Tasks.ValueTask
 *REMOVED*static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object![]! args) -> System.Threading.Tasks.ValueTask
 static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object?[]? args) -> System.Threading.Tasks.ValueTask
 static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object?[]? args) -> System.Threading.Tasks.ValueTask
-virtual Microsoft.JSInterop.JSRuntime.ReadJSDataAsStreamAsync(Microsoft.JSInterop.IJSStreamReference! jsStreamReference, long totalLength, long maxBufferSize, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<System.IO.Stream!>!
+virtual Microsoft.JSInterop.JSRuntime.ReadJSDataAsStreamAsync(Microsoft.JSInterop.IJSStreamReference! jsStreamReference, long totalLength, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<System.IO.Stream!>!
 virtual Microsoft.JSInterop.JSRuntime.ReceiveByteArray(int id, byte[]! data) -> void
 virtual Microsoft.JSInterop.JSRuntime.ReceiveByteArray(int id, byte[]! data) -> void
 virtual Microsoft.JSInterop.JSRuntime.SendByteArray(int id, byte[]! data) -> void
 virtual Microsoft.JSInterop.JSRuntime.SendByteArray(int id, byte[]! data) -> void
 Microsoft.JSInterop.JSDisconnectedException
 Microsoft.JSInterop.JSDisconnectedException

+ 1 - 1
src/JSInterop/Microsoft.JSInterop/test/JSRuntimeTest.cs

@@ -404,7 +404,7 @@ namespace Microsoft.JSInterop
             var dataReference = new JSStreamReference(runtime, 10, 10);
             var dataReference = new JSStreamReference(runtime, 10, 10);
 
 
             // Act
             // Act
-            var exception = await Assert.ThrowsAsync<NotSupportedException>(async () => await runtime.ReadJSDataAsStreamAsync(dataReference, 10, 10, CancellationToken.None));
+            var exception = await Assert.ThrowsAsync<NotSupportedException>(async () => await runtime.ReadJSDataAsStreamAsync(dataReference, 10, 10, 10, CancellationToken.None));
 
 
             // Assert
             // Assert
             Assert.Equal("The current JavaScript runtime does not support reading data streams.", exception.Message);
             Assert.Equal("The current JavaScript runtime does not support reading data streams.", exception.Message);