|
|
@@ -2,16 +2,10 @@
|
|
|
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
|
|
|
|
|
|
using System;
|
|
|
-using System.Collections.Generic;
|
|
|
using System.IO;
|
|
|
using System.Linq;
|
|
|
using System.Net;
|
|
|
using System.Threading.Tasks;
|
|
|
-using Microsoft.AspNetCore.Http;
|
|
|
-using Microsoft.AspNetCore.Connections;
|
|
|
-using Microsoft.AspNetCore.Server.Kestrel.Core;
|
|
|
-using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
|
|
|
-using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
|
|
|
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal;
|
|
|
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
|
|
|
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers;
|
|
|
@@ -28,42 +22,39 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|
|
{
|
|
|
var libuv = new LibuvFunctions();
|
|
|
|
|
|
- var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
|
|
|
+ var endpoint = new IPEndPoint(IPAddress.Loopback, 0);
|
|
|
|
|
|
- var serviceContextPrimary = new TestServiceContext();
|
|
|
var transportContextPrimary = new TestLibuvTransportContext();
|
|
|
- var builderPrimary = new ConnectionBuilder();
|
|
|
- builderPrimary.UseHttpServer(serviceContextPrimary, new DummyApplication(c => c.Response.WriteAsync("Primary")), HttpProtocols.Http1);
|
|
|
- transportContextPrimary.ConnectionDispatcher = new ConnectionDispatcher(serviceContextPrimary, builderPrimary.Build());
|
|
|
-
|
|
|
- var serviceContextSecondary = new TestServiceContext();
|
|
|
- var builderSecondary = new ConnectionBuilder();
|
|
|
- builderSecondary.UseHttpServer(serviceContextSecondary, new DummyApplication(c => c.Response.WriteAsync("Secondary")), HttpProtocols.Http1);
|
|
|
var transportContextSecondary = new TestLibuvTransportContext();
|
|
|
- transportContextSecondary.ConnectionDispatcher = new ConnectionDispatcher(serviceContextSecondary, builderSecondary.Build());
|
|
|
-
|
|
|
- var libuvTransport = new LibuvTransport(libuv, transportContextPrimary, listenOptions);
|
|
|
|
|
|
var pipeName = (libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
|
|
|
var pipeMessage = Guid.NewGuid().ToByteArray();
|
|
|
|
|
|
// Start primary listener
|
|
|
- var libuvThreadPrimary = new LibuvThread(libuvTransport);
|
|
|
+ var libuvThreadPrimary = new LibuvThread(libuv, transportContextPrimary);
|
|
|
await libuvThreadPrimary.StartAsync();
|
|
|
var listenerPrimary = new ListenerPrimary(transportContextPrimary);
|
|
|
- await listenerPrimary.StartAsync(pipeName, pipeMessage, listenOptions, libuvThreadPrimary);
|
|
|
- var address = GetUri(listenOptions);
|
|
|
+ await listenerPrimary.StartAsync(pipeName, pipeMessage, endpoint, libuvThreadPrimary);
|
|
|
+ var address = GetUri(listenerPrimary.EndPoint);
|
|
|
|
|
|
- // Until a secondary listener is added, TCP connections get dispatched directly
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
+ var acceptTask = listenerPrimary.AcceptAsync().AsTask();
|
|
|
+ using (var socket = await HttpClientSlim.GetSocket(address))
|
|
|
+ {
|
|
|
+ await (await acceptTask.DefaultTimeout()).DisposeAsync();
|
|
|
+ }
|
|
|
+
|
|
|
+ acceptTask = listenerPrimary.AcceptAsync().AsTask();
|
|
|
+ using (var socket = await HttpClientSlim.GetSocket(address))
|
|
|
+ {
|
|
|
+ await (await acceptTask.DefaultTimeout()).DisposeAsync();
|
|
|
+ }
|
|
|
|
|
|
var listenerCount = listenerPrimary.UvPipeCount;
|
|
|
// Add secondary listener
|
|
|
- var libuvThreadSecondary = new LibuvThread(libuvTransport);
|
|
|
+ var libuvThreadSecondary = new LibuvThread(libuv, transportContextSecondary);
|
|
|
await libuvThreadSecondary.StartAsync();
|
|
|
var listenerSecondary = new ListenerSecondary(transportContextSecondary);
|
|
|
- await listenerSecondary.StartAsync(pipeName, pipeMessage, listenOptions, libuvThreadSecondary);
|
|
|
+ await listenerSecondary.StartAsync(pipeName, pipeMessage, endpoint, libuvThreadSecondary);
|
|
|
|
|
|
var maxWait = Task.Delay(TestConstants.DefaultTimeout);
|
|
|
// wait for ListenerPrimary.ReadCallback to add the secondary pipe
|
|
|
@@ -77,14 +68,18 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|
|
}
|
|
|
|
|
|
// Once a secondary listener is added, TCP connections start getting dispatched to it
|
|
|
- await AssertResponseEventually(address, "Secondary", allowed: new[] { "Primary" });
|
|
|
+ // This returns the incomplete primary task after the secondary listener got the last
|
|
|
+ // connection
|
|
|
+ var primary = await WaitForSecondaryListener(address, listenerPrimary, listenerSecondary);
|
|
|
|
|
|
// TCP connections will still get round-robined to the primary listener
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
- Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address));
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
+ ListenerContext currentListener = listenerSecondary;
|
|
|
+ Task<LibuvConnection> expected = primary;
|
|
|
+
|
|
|
+ await AssertRoundRobin(address, listenerPrimary, listenerSecondary, currentListener, expected);
|
|
|
|
|
|
await listenerSecondary.DisposeAsync();
|
|
|
+
|
|
|
await libuvThreadSecondary.StopAsync(TimeSpan.FromSeconds(5));
|
|
|
|
|
|
await listenerPrimary.DisposeAsync();
|
|
|
@@ -96,48 +91,36 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|
|
public async Task NonListenerPipeConnectionsAreLoggedAndIgnored()
|
|
|
{
|
|
|
var libuv = new LibuvFunctions();
|
|
|
- var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
|
|
|
+ var endpoint = new IPEndPoint(IPAddress.Loopback, 0);
|
|
|
var logger = new TestApplicationErrorLogger();
|
|
|
|
|
|
- var serviceContextPrimary = new TestServiceContext();
|
|
|
- var builderPrimary = new ConnectionBuilder();
|
|
|
- builderPrimary.UseHttpServer(serviceContextPrimary, new DummyApplication(c => c.Response.WriteAsync("Primary")), HttpProtocols.Http1);
|
|
|
var transportContextPrimary = new TestLibuvTransportContext { Log = new LibuvTrace(logger) };
|
|
|
- transportContextPrimary.ConnectionDispatcher = new ConnectionDispatcher(serviceContextPrimary, builderPrimary.Build());
|
|
|
-
|
|
|
- var serviceContextSecondary = new TestServiceContext
|
|
|
- {
|
|
|
- DateHeaderValueManager = serviceContextPrimary.DateHeaderValueManager,
|
|
|
- ServerOptions = serviceContextPrimary.ServerOptions,
|
|
|
- Scheduler = serviceContextPrimary.Scheduler,
|
|
|
- HttpParser = serviceContextPrimary.HttpParser,
|
|
|
- };
|
|
|
- var builderSecondary = new ConnectionBuilder();
|
|
|
- builderSecondary.UseHttpServer(serviceContextSecondary, new DummyApplication(c => c.Response.WriteAsync("Secondary")), HttpProtocols.Http1);
|
|
|
var transportContextSecondary = new TestLibuvTransportContext();
|
|
|
- transportContextSecondary.ConnectionDispatcher = new ConnectionDispatcher(serviceContextSecondary, builderSecondary.Build());
|
|
|
-
|
|
|
- var libuvTransport = new LibuvTransport(libuv, transportContextPrimary, listenOptions);
|
|
|
|
|
|
var pipeName = (libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
|
|
|
var pipeMessage = Guid.NewGuid().ToByteArray();
|
|
|
|
|
|
// Start primary listener
|
|
|
- var libuvThreadPrimary = new LibuvThread(libuvTransport);
|
|
|
+ var libuvThreadPrimary = new LibuvThread(libuv, transportContextPrimary);
|
|
|
await libuvThreadPrimary.StartAsync();
|
|
|
var listenerPrimary = new ListenerPrimary(transportContextPrimary);
|
|
|
- await listenerPrimary.StartAsync(pipeName, pipeMessage, listenOptions, libuvThreadPrimary);
|
|
|
- var address = GetUri(listenOptions);
|
|
|
+ await listenerPrimary.StartAsync(pipeName, pipeMessage, endpoint, libuvThreadPrimary);
|
|
|
+ var address = GetUri(listenerPrimary.EndPoint);
|
|
|
|
|
|
// Add secondary listener
|
|
|
- var libuvThreadSecondary = new LibuvThread(libuvTransport);
|
|
|
+ var libuvThreadSecondary = new LibuvThread(libuv, transportContextSecondary);
|
|
|
await libuvThreadSecondary.StartAsync();
|
|
|
var listenerSecondary = new ListenerSecondary(transportContextSecondary);
|
|
|
- await listenerSecondary.StartAsync(pipeName, pipeMessage, listenOptions, libuvThreadSecondary);
|
|
|
+ await listenerSecondary.StartAsync(pipeName, pipeMessage, endpoint, libuvThreadSecondary);
|
|
|
|
|
|
// TCP Connections get round-robined
|
|
|
- await AssertResponseEventually(address, "Secondary", allowed: new[] { "Primary" });
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
+ var primary = await WaitForSecondaryListener(address, listenerPrimary, listenerSecondary);
|
|
|
+
|
|
|
+ // Make sure the pending accept get yields
|
|
|
+ using (var socket = await HttpClientSlim.GetSocket(address))
|
|
|
+ {
|
|
|
+ await (await primary.DefaultTimeout()).DisposeAsync();
|
|
|
+ }
|
|
|
|
|
|
// Create a pipe connection and keep it open without sending any data
|
|
|
var connectTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
|
@@ -173,9 +156,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|
|
await connectTcs.Task;
|
|
|
|
|
|
// TCP connections will still get round-robined between only the two listeners
|
|
|
- Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address));
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
- Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address));
|
|
|
+ await AssertRoundRobin(address, listenerPrimary, listenerSecondary, listenerPrimary);
|
|
|
|
|
|
await libuvThreadPrimary.PostAsync(_ => pipe.Dispose(), (object)null);
|
|
|
|
|
|
@@ -186,9 +167,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|
|
}
|
|
|
|
|
|
// Same for after the non-listener pipe connection is closed
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
- Assert.Equal("Secondary", await HttpClientSlim.GetStringAsync(address));
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
+ await AssertRoundRobin(address, listenerPrimary, listenerSecondary, listenerPrimary);
|
|
|
|
|
|
await listenerSecondary.DisposeAsync();
|
|
|
await libuvThreadSecondary.StopAsync(TimeSpan.FromSeconds(5));
|
|
|
@@ -207,45 +186,28 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|
|
public async Task PipeConnectionsWithWrongMessageAreLoggedAndIgnored()
|
|
|
{
|
|
|
var libuv = new LibuvFunctions();
|
|
|
- var listenOptions = new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0));
|
|
|
+ var endpoint = new IPEndPoint(IPAddress.Loopback, 0);
|
|
|
|
|
|
var logger = new TestApplicationErrorLogger();
|
|
|
|
|
|
- var serviceContextPrimary = new TestServiceContext();
|
|
|
- var builderPrimary = new ConnectionBuilder();
|
|
|
- builderPrimary.UseHttpServer(serviceContextPrimary, new DummyApplication(c => c.Response.WriteAsync("Primary")), HttpProtocols.Http1);
|
|
|
var transportContextPrimary = new TestLibuvTransportContext { Log = new LibuvTrace(logger) };
|
|
|
- transportContextPrimary.ConnectionDispatcher = new ConnectionDispatcher(serviceContextPrimary, builderPrimary.Build());
|
|
|
-
|
|
|
- var serviceContextSecondary = new TestServiceContext
|
|
|
- {
|
|
|
- DateHeaderValueManager = serviceContextPrimary.DateHeaderValueManager,
|
|
|
- ServerOptions = serviceContextPrimary.ServerOptions,
|
|
|
- Scheduler = serviceContextPrimary.Scheduler,
|
|
|
- HttpParser = serviceContextPrimary.HttpParser,
|
|
|
- };
|
|
|
- var builderSecondary = new ConnectionBuilder();
|
|
|
- builderSecondary.UseHttpServer(serviceContextSecondary, new DummyApplication(c => c.Response.WriteAsync("Secondary")), HttpProtocols.Http1);
|
|
|
var transportContextSecondary = new TestLibuvTransportContext();
|
|
|
- transportContextSecondary.ConnectionDispatcher = new ConnectionDispatcher(serviceContextSecondary, builderSecondary.Build());
|
|
|
-
|
|
|
- var libuvTransport = new LibuvTransport(libuv, transportContextPrimary, listenOptions);
|
|
|
|
|
|
var pipeName = (libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
|
|
|
var pipeMessage = Guid.NewGuid().ToByteArray();
|
|
|
|
|
|
// Start primary listener
|
|
|
- var libuvThreadPrimary = new LibuvThread(libuvTransport);
|
|
|
+ var libuvThreadPrimary = new LibuvThread(libuv, transportContextPrimary);
|
|
|
await libuvThreadPrimary.StartAsync();
|
|
|
var listenerPrimary = new ListenerPrimary(transportContextPrimary);
|
|
|
- await listenerPrimary.StartAsync(pipeName, pipeMessage, listenOptions, libuvThreadPrimary);
|
|
|
- var address = GetUri(listenOptions);
|
|
|
+ await listenerPrimary.StartAsync(pipeName, pipeMessage, endpoint, libuvThreadPrimary);
|
|
|
+ var address = GetUri(listenerPrimary.EndPoint);
|
|
|
|
|
|
// Add secondary listener with wrong pipe message
|
|
|
- var libuvThreadSecondary = new LibuvThread(libuvTransport);
|
|
|
+ var libuvThreadSecondary = new LibuvThread(libuv, transportContextSecondary);
|
|
|
await libuvThreadSecondary.StartAsync();
|
|
|
var listenerSecondary = new ListenerSecondary(transportContextSecondary);
|
|
|
- await listenerSecondary.StartAsync(pipeName, Guid.NewGuid().ToByteArray(), listenOptions, libuvThreadSecondary);
|
|
|
+ await listenerSecondary.StartAsync(pipeName, Guid.NewGuid().ToByteArray(), endpoint, libuvThreadSecondary);
|
|
|
|
|
|
// Wait up to 10 seconds for error to be logged
|
|
|
for (var i = 0; i < 10 && logger.TotalErrorsLogged == 0; i++)
|
|
|
@@ -253,10 +215,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|
|
await Task.Delay(100);
|
|
|
}
|
|
|
|
|
|
- // TCP Connections don't get round-robined
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
- Assert.Equal("Primary", await HttpClientSlim.GetStringAsync(address));
|
|
|
+ // TCP Connections don't get round-robined. This should time out if the request goes to the secondary listener
|
|
|
+ for (int i = 0; i < 3; i++)
|
|
|
+ {
|
|
|
+ using var socket = await HttpClientSlim.GetSocket(address);
|
|
|
+
|
|
|
+ await using var connection = await listenerPrimary.AcceptAsync().AsTask().DefaultTimeout();
|
|
|
+ }
|
|
|
|
|
|
await listenerSecondary.DisposeAsync();
|
|
|
await libuvThreadSecondary.StopAsync(TimeSpan.FromSeconds(5));
|
|
|
@@ -270,73 +235,73 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
|
|
|
Assert.Contains("Bad data", errorMessage.Exception.ToString());
|
|
|
}
|
|
|
|
|
|
- private static async Task AssertResponseEventually(
|
|
|
- Uri address,
|
|
|
- string expected,
|
|
|
- string[] allowed = null,
|
|
|
- int maxRetries = 100,
|
|
|
- int retryDelay = 100)
|
|
|
+
|
|
|
+ private static async Task AssertRoundRobin(Uri address, ListenerPrimary listenerPrimary, ListenerSecondary listenerSecondary, ListenerContext currentListener, Task<LibuvConnection> expected = null, int connections = 4)
|
|
|
{
|
|
|
- for (var i = 0; i < maxRetries; i++)
|
|
|
+ for (int i = 0; i < connections; i++)
|
|
|
{
|
|
|
- var response = await HttpClientSlim.GetStringAsync(address);
|
|
|
- if (response == expected)
|
|
|
+ if (currentListener == listenerPrimary)
|
|
|
{
|
|
|
- return;
|
|
|
+ expected ??= listenerSecondary.AcceptAsync().AsTask();
|
|
|
+ currentListener = listenerSecondary;
|
|
|
}
|
|
|
-
|
|
|
- if (allowed != null)
|
|
|
+ else
|
|
|
{
|
|
|
- Assert.Contains(response, allowed);
|
|
|
+ expected ??= listenerPrimary.AcceptAsync().AsTask();
|
|
|
+ currentListener = listenerPrimary;
|
|
|
}
|
|
|
|
|
|
- await Task.Delay(retryDelay);
|
|
|
- }
|
|
|
+ using var socket = await HttpClientSlim.GetSocket(address);
|
|
|
|
|
|
- Assert.True(false, $"'{address}' failed to respond with '{expected}' in {maxRetries} retries.");
|
|
|
- }
|
|
|
+ await using var connection = await expected.DefaultTimeout();
|
|
|
|
|
|
- private static Uri GetUri(ListenOptions options)
|
|
|
- {
|
|
|
- if (options.Type != ListenType.IPEndPoint)
|
|
|
- {
|
|
|
- throw new InvalidOperationException($"Could not determine a proper URI for options with Type {options.Type}");
|
|
|
+ expected = null;
|
|
|
}
|
|
|
-
|
|
|
- var scheme = options.ConnectionAdapters.Any(f => f.IsHttps)
|
|
|
- ? "https"
|
|
|
- : "http";
|
|
|
-
|
|
|
- return new Uri($"{scheme}://{options.IPEndPoint}");
|
|
|
}
|
|
|
|
|
|
- private class ConnectionBuilder : IConnectionBuilder
|
|
|
+ private static async Task<Task<LibuvConnection>> WaitForSecondaryListener(Uri address, ListenerContext listenerPrimary, ListenerContext listenerSecondary)
|
|
|
{
|
|
|
- private readonly List<Func<ConnectionDelegate, ConnectionDelegate>> _components = new List<Func<ConnectionDelegate, ConnectionDelegate>>();
|
|
|
+ int maxRetries = 100;
|
|
|
+ int retryDelay = 100;
|
|
|
|
|
|
- public IServiceProvider ApplicationServices { get; set; }
|
|
|
+ Task<LibuvConnection> primary = null;
|
|
|
+ Task<LibuvConnection> secondary = null;
|
|
|
|
|
|
- public IConnectionBuilder Use(Func<ConnectionDelegate, ConnectionDelegate> middleware)
|
|
|
+ for (var i = 0; i < maxRetries; i++)
|
|
|
{
|
|
|
- _components.Add(middleware);
|
|
|
- return this;
|
|
|
- }
|
|
|
+ primary ??= listenerPrimary.AcceptAsync().AsTask();
|
|
|
+ secondary ??= listenerSecondary.AcceptAsync().AsTask();
|
|
|
|
|
|
- public ConnectionDelegate Build()
|
|
|
- {
|
|
|
- ConnectionDelegate app = context =>
|
|
|
+ using var _ = await HttpClientSlim.GetSocket(address);
|
|
|
+
|
|
|
+ var task = await Task.WhenAny(primary, secondary);
|
|
|
+
|
|
|
+ if (task == secondary)
|
|
|
{
|
|
|
- return Task.CompletedTask;
|
|
|
- };
|
|
|
+ // Dispose this connection now that we know the seconary listener is working
|
|
|
+ await (await secondary).DisposeAsync();
|
|
|
|
|
|
- for (int i = _components.Count - 1; i >= 0; i--)
|
|
|
+ // Return the primary task (it should be incomplete), we do this so that we can
|
|
|
+ return primary;
|
|
|
+ }
|
|
|
+ else
|
|
|
{
|
|
|
- var component = _components[i];
|
|
|
- app = component(app);
|
|
|
+ // Dispose the connection
|
|
|
+ await (await primary).DisposeAsync();
|
|
|
+
|
|
|
+ primary = null;
|
|
|
}
|
|
|
|
|
|
- return app;
|
|
|
+ await Task.Delay(retryDelay);
|
|
|
}
|
|
|
+
|
|
|
+ Assert.True(false, $"'{address}' failed to get queued connection in secondary listener in {maxRetries} retries.");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Uri GetUri(EndPoint endpoint)
|
|
|
+ {
|
|
|
+ return new Uri($"http://{endpoint}");
|
|
|
}
|
|
|
}
|
|
|
}
|