Просмотр исходного кода

Multiple accept loops in named pipes transport (#46259)

James Newton-King 3 лет назад
Родитель
Сommit
83237c9afa

+ 13 - 8
src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs

@@ -26,7 +26,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
     private readonly PipeOptions _inputOptions;
     private readonly PipeOptions _outputOptions;
     private readonly Mutex _mutex;
-    private Task? _listeningTask;
+    private Task[]? _listeningTasks;
     private int _disposed;
 
     public NamedPipeConnectionListener(
@@ -45,7 +45,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
         // The OS maintains a backlog of clients that are waiting to connect, so the app queue only stores a single connection.
         // We want to have a queue plus a background task that populates the queue, rather than creating NamedPipeServerStream
         // when AcceptAsync is called, so that the server is always the owner of the pipe name.
-        _acceptedQueue = Channel.CreateBounded<ConnectionContext>(new BoundedChannelOptions(capacity: 1) { SingleWriter = true });
+        _acceptedQueue = Channel.CreateBounded<ConnectionContext>(new BoundedChannelOptions(capacity: 1));
 
         var maxReadBufferSize = _options.MaxReadBufferSize ?? 0;
         var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0;
@@ -56,12 +56,17 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
 
     public void Start()
     {
-        Debug.Assert(_listeningTask == null, "Already started");
+        Debug.Assert(_listeningTasks == null, "Already started");
 
-        // Start first stream inline to catch creation errors.
-        var initialStream = CreateServerStream();
+        _listeningTasks = new Task[_options.ListenerQueueCount];
 
-        _listeningTask = StartAsync(initialStream);
+        for (var i = 0; i < _listeningTasks.Length; i++)
+        {
+            // Start first stream inline to catch creation errors.
+            var initialStream = CreateServerStream();
+
+            _listeningTasks[i] = Task.Run(() => StartAsync(initialStream));
+        }
     }
 
     public EndPoint EndPoint => _endpoint;
@@ -182,9 +187,9 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
 
         _listeningTokenSource.Dispose();
         _mutex.Dispose();
-        if (_listeningTask != null)
+        if (_listeningTasks != null)
         {
-            await _listeningTask;
+            await Task.WhenAll(_listeningTasks);
         }
     }
 }

+ 8 - 0
src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs

@@ -11,6 +11,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes;
 /// </summary>
 public sealed class NamedPipeTransportOptions
 {
+    /// <summary>
+    /// The number of listener queues used to accept name pipe connections.
+    /// </summary>
+    /// <remarks>
+    /// Defaults to <see cref="Environment.ProcessorCount" /> rounded down and clamped between 1 and 16.
+    /// </remarks>
+    public int ListenerQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16);
+
     /// <summary>
     /// Gets or sets the maximum unconsumed incoming bytes the transport will buffer.
     /// <para>

+ 2 - 0
src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt

@@ -3,6 +3,8 @@ Microsoft.AspNetCore.Hosting.WebHostBuilderNamedPipeExtensions
 Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions
 Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.get -> bool
 Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.set -> void
+Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.ListenerQueueCount.get -> int
+Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.ListenerQueueCount.set -> void
 Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxReadBufferSize.get -> long?
 Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxReadBufferSize.set -> void
 Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxWriteBufferSize.get -> long?

+ 67 - 0
src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs

@@ -68,6 +68,73 @@ public class NamedPipeConnectionListenerTests : TestApplicationErrorLoggerLogged
         Assert.Contains(LogMessages, m => m.EventId.Name == "ConnectionListenerAborted");
     }
 
+    [Theory]
+    [InlineData(1)]
+    [InlineData(4)]
+    [InlineData(16)]
+    public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyAccepted(int listenerQueueCount)
+    {
+        // Arrange
+        const int ParallelCount = 10;
+        const int ParallelCallCount = 250;
+        const int TotalCallCount = ParallelCount * ParallelCallCount;
+
+        var currentCallCount = 0;
+        var options = new NamedPipeTransportOptions();
+        options.ListenerQueueCount = listenerQueueCount;
+        await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options);
+
+        // Act
+        var serverTask = Task.Run(async () =>
+        {
+            while (currentCallCount < TotalCallCount)
+            {
+                _ = await connectionListener.AcceptAsync();
+
+                currentCallCount++;
+
+                Logger.LogInformation($"Server accepted {currentCallCount} calls.");
+            }
+
+            Logger.LogInformation($"Server task complete.");
+        });
+
+        var cts = new CancellationTokenSource();
+        var parallelTasks = new List<Task>();
+        for (var i = 0; i < ParallelCount; i++)
+        {
+            parallelTasks.Add(Task.Run(async () =>
+            {
+                var clientStreamCount = 0;
+                while (clientStreamCount < ParallelCallCount)
+                {
+                    try
+                    {
+                        var clientStream = NamedPipeTestHelpers.CreateClientStream(connectionListener.EndPoint);
+                        await clientStream.ConnectAsync(cts.Token);
+
+                        await clientStream.WriteAsync(new byte[1], cts.Token);
+                        await clientStream.DisposeAsync();
+                        clientStreamCount++;
+                    }
+                    catch (IOException ex)
+                    {
+                        Logger.LogInformation(ex, "Client exception.");
+                    }
+                    catch (OperationCanceledException)
+                    {
+                        break;
+                    }
+                }
+            }));
+        }
+
+        await serverTask.DefaultTimeout();
+
+        cts.Cancel();
+        await Task.WhenAll(parallelTasks).DefaultTimeout();
+    }
+
     [ConditionalFact]
     [OSSkipCondition(OperatingSystems.Linux | OperatingSystems.MacOSX, SkipReason = "Non-OS implementations use UDS with an unlimited accept limit.")]
     public async Task AcceptAsync_HitBacklogLimit_ClientConnectionsSuccessfullyAccepted()