|
|
@@ -22,43 +22,71 @@ internal sealed class HttpContextStreamWriter<TResponse> : IServerStreamWriter<T
|
|
|
_writeLock = new object();
|
|
|
}
|
|
|
|
|
|
- public WriteOptions WriteOptions
|
|
|
+ public WriteOptions? WriteOptions
|
|
|
{
|
|
|
get => _context.WriteOptions;
|
|
|
set => _context.WriteOptions = value;
|
|
|
}
|
|
|
|
|
|
+ Task IAsyncStreamWriter<TResponse>.WriteAsync(TResponse message, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ return WriteAsyncCore(message, cancellationToken);
|
|
|
+ }
|
|
|
+
|
|
|
public Task WriteAsync(TResponse message)
|
|
|
+ {
|
|
|
+ return WriteAsyncCore(message, CancellationToken.None);
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task WriteAsyncCore(TResponse message, CancellationToken cancellationToken)
|
|
|
{
|
|
|
if (message == null)
|
|
|
{
|
|
|
- return Task.FromException(new ArgumentNullException(nameof(message)));
|
|
|
+ throw new ArgumentNullException(nameof(message));
|
|
|
}
|
|
|
|
|
|
- if (_completed || _context.CancellationToken.IsCancellationRequested)
|
|
|
+ // Register cancellation token early to ensure request is canceled if cancellation is requested.
|
|
|
+ CancellationTokenRegistration? registration = null;
|
|
|
+ if (cancellationToken.CanBeCanceled)
|
|
|
{
|
|
|
- return Task.FromException(new InvalidOperationException("Can't write the message because the request is complete."));
|
|
|
+ registration = cancellationToken.Register(
|
|
|
+ static (state) => ((JsonTranscodingServerCallContext)state!).HttpContext.Abort(),
|
|
|
+ _context);
|
|
|
}
|
|
|
|
|
|
- lock (_writeLock)
|
|
|
+ try
|
|
|
{
|
|
|
- // Pending writes need to be awaited first
|
|
|
- if (IsWriteInProgressUnsynchronized)
|
|
|
+ cancellationToken.ThrowIfCancellationRequested();
|
|
|
+
|
|
|
+ if (_completed || _context.CancellationToken.IsCancellationRequested)
|
|
|
{
|
|
|
- return Task.FromException(new InvalidOperationException("Can't write the message because the previous write is in progress."));
|
|
|
+ throw new InvalidOperationException("Can't write the message because the request is complete.");
|
|
|
}
|
|
|
|
|
|
- // Save write task to track whether it is complete. Must be set inside lock.
|
|
|
- _writeTask = WriteMessageAndDelimiter(message);
|
|
|
- }
|
|
|
+ lock (_writeLock)
|
|
|
+ {
|
|
|
+ // Pending writes need to be awaited first
|
|
|
+ if (IsWriteInProgressUnsynchronized)
|
|
|
+ {
|
|
|
+ throw new InvalidOperationException("Can't write the message because the previous write is in progress.");
|
|
|
+ }
|
|
|
+
|
|
|
+ // Save write task to track whether it is complete. Must be set inside lock.
|
|
|
+ _writeTask = WriteMessageAndDelimiter(message, cancellationToken);
|
|
|
+ }
|
|
|
|
|
|
- return _writeTask;
|
|
|
+ await _writeTask;
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ registration?.Dispose();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private async Task WriteMessageAndDelimiter(TResponse message)
|
|
|
+ private async Task WriteMessageAndDelimiter(TResponse message, CancellationToken cancellationToken)
|
|
|
{
|
|
|
- await JsonRequestHelpers.SendMessage(_context, _serializerOptions, message);
|
|
|
- await _context.HttpContext.Response.Body.WriteAsync(GrpcProtocolConstants.StreamingDelimiter);
|
|
|
+ await JsonRequestHelpers.SendMessage(_context, _serializerOptions, message, cancellationToken);
|
|
|
+ await _context.HttpContext.Response.Body.WriteAsync(GrpcProtocolConstants.StreamingDelimiter, cancellationToken);
|
|
|
}
|
|
|
|
|
|
public void Complete()
|