|
@@ -72,7 +72,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var message = new FakeCommand(1).ToTransportMessage();
|
|
|
transport1.Send(message, new[] { transport2Peer });
|
|
|
|
|
|
- Wait.Until(() => transport2ReceivedMessages.Count >= 1, 2.Seconds());
|
|
|
+ Wait.Until(() => transport2ReceivedMessages.Count >= 1, 30.Seconds());
|
|
|
transport2ReceivedMessages.Single().Id.ShouldEqual(message.Id);
|
|
|
}
|
|
|
|
|
@@ -90,7 +90,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
senderTransport.Send(message, new[] { nonExistingPeer });
|
|
|
senderTransport.Send(message, new[] { destinationPeer });
|
|
|
|
|
|
- Wait.Until(() => receivedMessages.Count >= 1, 2.Seconds(), "The outbound thread was killed and couldn't connect to the next peer");
|
|
|
+ Wait.Until(() => receivedMessages.Count >= 1, 30.Seconds(), "The outbound thread was killed and couldn't connect to the next peer");
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -111,7 +111,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
transport2.Configure(transport2Peer.Id, _environment);
|
|
|
transport1.Send(message2, new[] { transport2Peer }); //should arrive
|
|
|
|
|
|
- Wait.Until(() => transport2ReceivedMessages.Count >= 1, 2.Seconds());
|
|
|
+ Wait.Until(() => transport2ReceivedMessages.Count >= 1, 30.Seconds());
|
|
|
transport2ReceivedMessages.Single().Id.ShouldEqual(message2.Id);
|
|
|
}
|
|
|
|
|
@@ -129,7 +129,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var message1 = new FakeCommand(1).ToTransportMessage();
|
|
|
transport1.Send(message1, new[] { transport2Peer });
|
|
|
|
|
|
- Wait.Until(() => transport2ReceivedMessages.Count == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => transport2ReceivedMessages.Count == 1, 30.Seconds());
|
|
|
var transport2ReceivedMessage = transport2ReceivedMessages.ExpectedSingle();
|
|
|
transport2ReceivedMessage.ShouldHaveSamePropertiesAs(message1, "Environment", "WasPersisted");
|
|
|
transport2ReceivedMessage.Environment.ShouldEqual("Test");
|
|
@@ -138,7 +138,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var message2 = new FakeCommand(2).ToTransportMessage();
|
|
|
transport2.Send(message2, new[] { transport1Peer });
|
|
|
|
|
|
- Wait.Until(() => transport1ReceivedMessages.Count == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => transport1ReceivedMessages.Count == 1, 30.Seconds());
|
|
|
var transport1ReceivedMessage = transport1ReceivedMessages.ExpectedSingle();
|
|
|
transport1ReceivedMessage.ShouldHaveSamePropertiesAs(message2, "Environment", "WasPersisted");
|
|
|
transport1ReceivedMessage.Environment.ShouldEqual("Test");
|
|
@@ -164,13 +164,13 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var message = new FakeCommand(999).ToTransportMessage();
|
|
|
senderTransport.Send(message, new[] { receiverPeer }, new SendContext { PersistentPeerIds = { receiverPeer.Id }, PersistencePeer = persistencePeer });
|
|
|
|
|
|
- Wait.Until(() => receiverMessages.Count == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => receiverMessages.Count == 1, 30.Seconds());
|
|
|
var messageFromReceiver = receiverMessages.ExpectedSingle();
|
|
|
messageFromReceiver.ShouldHaveSamePropertiesAs(message, "Environment", "WasPersisted");
|
|
|
messageFromReceiver.Environment.ShouldEqual("Test");
|
|
|
messageFromReceiver.WasPersisted.ShouldEqual(true);
|
|
|
|
|
|
- Wait.Until(() => persistenceMessages.Count == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => persistenceMessages.Count == 1, 30.Seconds());
|
|
|
var messageFromPersistence = persistenceMessages.ExpectedSingle();
|
|
|
messageFromPersistence.ShouldHaveSamePropertiesAs(message, "Environment", "WasPersisted", "PersistentPeerIds", "IsPersistTransportMessage");
|
|
|
messageFromPersistence.Environment.ShouldEqual("Test");
|
|
@@ -194,7 +194,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var message = new FakeCommand(999).ToTransportMessage();
|
|
|
senderTransport.Send(message, Enumerable.Empty<Peer>(), new SendContext { PersistentPeerIds = { receiverPeerId }, PersistencePeer = persistencePeer });
|
|
|
|
|
|
- Wait.Until(() => persistenceMessages.Count == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => persistenceMessages.Count == 1, 30.Seconds());
|
|
|
var messageFromPersistence = persistenceMessages.ExpectedSingle();
|
|
|
messageFromPersistence.ShouldHaveSamePropertiesAs(message, "Environment", "WasPersisted", "PersistentPeerIds", "IsPersistTransportMessage");
|
|
|
messageFromPersistence.Environment.ShouldEqual("Test");
|
|
@@ -217,7 +217,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var message = new FakeCommand(999).ToTransportMessage().ToPersistTransportMessage(receiverPeerId);
|
|
|
senderTransport.Send(message, new[] { persistencePeer });
|
|
|
|
|
|
- Wait.Until(() => persistenceMessages.Count == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => persistenceMessages.Count == 1, 30.Seconds());
|
|
|
var messageFromPersistence = persistenceMessages.ExpectedSingle();
|
|
|
messageFromPersistence.ShouldHaveSamePropertiesAs(message, "Environment", "WasPersisted");
|
|
|
messageFromPersistence.Environment.ShouldEqual("Test");
|
|
@@ -238,7 +238,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
sender.Send(message, new[] { receivingPeer }, new SendContext { PersistentPeerIds = { receivingPeer.Id } });
|
|
|
sender.Send(otherMessage, new[] { receivingPeer }, new SendContext());
|
|
|
|
|
|
- Wait.Until(() => receivedMessages.Count >= 2, 2.Seconds());
|
|
|
+ Wait.Until(() => receivedMessages.Count >= 2, 30.Seconds());
|
|
|
receivedMessages.Single(x => x.Id == message.Id).WasPersisted.ShouldEqual(true);
|
|
|
receivedMessages.Single(x => x.Id == otherMessage.Id).WasPersisted.ShouldEqual(false);
|
|
|
}
|
|
@@ -259,7 +259,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
|
|
|
sender.Send(message, new[] { receivingPeer1, receivingPeer2 }, new SendContext { PersistentPeerIds = { receivingPeer1.Id } });
|
|
|
|
|
|
- Wait.Until(() => receivedMessages.Count >= 2, 2.Seconds());
|
|
|
+ Wait.Until(() => receivedMessages.Count >= 2, 30.Seconds());
|
|
|
receivedMessages.ShouldContain(x => x.Id == message.Id && x.WasPersisted == true);
|
|
|
receivedMessages.ShouldContain(x => x.Id == message.Id && x.WasPersisted == false);
|
|
|
}
|
|
@@ -275,13 +275,13 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var receiver = receiverTransport1.GetPeer();
|
|
|
|
|
|
senderTransport.Send(new FakeCommand(0).ToTransportMessage(), new[] { receiver });
|
|
|
- Wait.Until(() => receivedMessages.Count == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => receivedMessages.Count == 1, 30.Seconds());
|
|
|
|
|
|
receiverTransport1.Stop();
|
|
|
receiver.EndPoint = receiverTransport2.InboundEndPoint;
|
|
|
|
|
|
senderTransport.Send(new FakeCommand(0).ToTransportMessage(), new[] { receiver });
|
|
|
- Wait.Until(() => receivedMessages.Count == 2, 2.Seconds(), "unable to receive message");
|
|
|
+ Wait.Until(() => receivedMessages.Count == 2, 30.Seconds(), "unable to receive message");
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -301,7 +301,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
|
|
|
var message = new FakeCommand(1).ToTransportMessage();
|
|
|
senderTransport.Send(message, new[] { receiverPeer });
|
|
|
- Wait.Until(() => senderTransport.OutboundSocketCount == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => senderTransport.OutboundSocketCount == 1, 30.Seconds());
|
|
|
|
|
|
senderTransport.OnPeerUpdated(receiverPeer.Id, PeerUpdateAction.Decommissioned);
|
|
|
|
|
@@ -311,7 +311,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
|
|
|
using (SystemDateTime.PauseTime(SystemDateTime.UtcNow.Add(30.Seconds())))
|
|
|
{
|
|
|
- Wait.Until(() => senderTransport.OutboundSocketCount == 0, 1.Seconds(), "Socket should be disconnected");
|
|
|
+ Wait.Until(() => senderTransport.OutboundSocketCount == 0, 30.Seconds(), "Socket should be disconnected");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -324,11 +324,11 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
|
|
|
var message = new FakeCommand(1).ToTransportMessage();
|
|
|
senderTransport.Send(message, new[] { receiverPeer });
|
|
|
- Wait.Until(() => senderTransport.OutboundSocketCount == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => senderTransport.OutboundSocketCount == 1, 30.Seconds());
|
|
|
|
|
|
senderTransport.OnPeerUpdated(receiverPeer.Id, PeerUpdateAction.Started);
|
|
|
|
|
|
- Wait.Until(() => senderTransport.OutboundSocketCount == 0, 2.Seconds(), "Socket should be disconnected");
|
|
|
+ Wait.Until(() => senderTransport.OutboundSocketCount == 0, 30.Seconds(), "Socket should be disconnected");
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -346,7 +346,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
senderTransport.Send(message, new[] { receiver });
|
|
|
}
|
|
|
|
|
|
- Wait.Until(() => receivedMessages.Count == 10, 1.Second());
|
|
|
+ Wait.Until(() => receivedMessages.Count == 10, 30.Seconds());
|
|
|
|
|
|
for (var i = 0; i < 10; ++i)
|
|
|
{
|
|
@@ -355,7 +355,6 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- [Timeout(10 * 60 * 1000)]
|
|
|
[TestCase(10)]
|
|
|
[TestCase(25)]
|
|
|
// Cases with high peer counts are too slow to run automatically, but they are required to validate edge cases.
|
|
@@ -376,7 +375,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var message = new FakeCommand(999).ToTransportMessage();
|
|
|
senderTransport.Send(message, receiverTransports.Select(x => x.GetPeer()));
|
|
|
|
|
|
- Wait.Until(() => Volatile.Read(ref receivedMessagesCount) == peerCount, 30.Second());
|
|
|
+ Wait.Until(() => Volatile.Read(ref receivedMessagesCount) == peerCount, 30.Seconds());
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -400,7 +399,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var message = new FakeCommand(999).ToTransportMessage();
|
|
|
senderTransport.Send(message, receiverTransports.Select(x => x.GetPeer()));
|
|
|
|
|
|
- Wait.Until(() => receivedMessages.Count == maximumSocketCount - 1, 10.Seconds());
|
|
|
+ Wait.Until(() => receivedMessages.Count == maximumSocketCount - 1, 30.Seconds());
|
|
|
|
|
|
Thread.Sleep(1.Second());
|
|
|
|
|
@@ -432,7 +431,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
senderTransport.Send(message, new[] { upReceiver, downReceiver });
|
|
|
|
|
|
var expectedMessageCount = i;
|
|
|
- Wait.Until(() => receivedMessages.Count == expectedMessageCount, 2.Seconds(), "Failed to send message after " + i + " successful sent");
|
|
|
+ Wait.Until(() => receivedMessages.Count == expectedMessageCount, 30.Seconds(), "Failed to send message after " + i + " successful sent");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -468,7 +467,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
}
|
|
|
|
|
|
var receiverStopwatch = Stopwatch.StartNew();
|
|
|
- Wait.Until(() => receivedMessages.Count == 10, 10.Seconds(), "Timed out while waiting for messages");
|
|
|
+ Wait.Until(() => receivedMessages.Count == 10, 30.Seconds(), "Timed out while waiting for messages");
|
|
|
receiverStopwatch.Stop();
|
|
|
Console.WriteLine("Elapsed time to get messages: " + receiverStopwatch.Elapsed);
|
|
|
receiverStopwatch.ElapsedMilliseconds.ShouldBeLessOrEqualThan(1000, "Throughput is too low");
|
|
@@ -489,7 +488,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
senderTransport.Send(message, new[] { invalidPeer, receiver });
|
|
|
}
|
|
|
|
|
|
- Wait.Until(() => receivedMessageCount == 1000, 5.Seconds());
|
|
|
+ Wait.Until(() => receivedMessageCount == 1000, 30.Seconds());
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -510,14 +509,14 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
var bigMessage = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), messageBytes, new PeerId("X"), senderTransport.InboundEndPoint);
|
|
|
senderTransport.Send(bigMessage, new[] { receiver });
|
|
|
|
|
|
- Wait.Until(() => receivedMessages.Count == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => receivedMessages.Count == 1, 30.Seconds());
|
|
|
|
|
|
receivedMessages[0].ShouldHaveSamePropertiesAs(bigMessage, "Environment", "WasPersisted");
|
|
|
|
|
|
var smallMessage = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), new byte[1], new PeerId("X"), senderTransport.InboundEndPoint);
|
|
|
senderTransport.Send(smallMessage, new[] { receiver });
|
|
|
|
|
|
- Wait.Until(() => receivedMessages.Count == 2, 2.Seconds());
|
|
|
+ Wait.Until(() => receivedMessages.Count == 2, 30.Seconds());
|
|
|
|
|
|
receivedMessages[1].ShouldHaveSamePropertiesAs(smallMessage, "Environment", "WasPersisted");
|
|
|
}
|
|
@@ -531,7 +530,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
|
|
|
transport.Send(new FakeCommand(1).ToTransportMessage(), new[] { self });
|
|
|
|
|
|
- Wait.Until(() => receivedMessages.Count == 1, 2.Seconds());
|
|
|
+ Wait.Until(() => receivedMessages.Count == 1, 30.Seconds());
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -561,7 +560,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
Console.WriteLine($"{sendCount} messages sent");
|
|
|
});
|
|
|
|
|
|
- Wait.Until(() => receivedMessages.Count > 1, 10.Seconds());
|
|
|
+ Wait.Until(() => receivedMessages.Count > 1, 30.Seconds());
|
|
|
Console.WriteLine("Message received");
|
|
|
|
|
|
receivingTransport.Stop();
|
|
@@ -602,7 +601,7 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
});
|
|
|
|
|
|
senderTask.Start();
|
|
|
- Wait.Until(() => receivedMessageCount != 0, 2.Seconds());
|
|
|
+ Wait.Until(() => receivedMessageCount != 0, 30.Seconds());
|
|
|
|
|
|
Log($"Stopping the sender");
|
|
|
shouldSend[0] = false;
|
|
@@ -632,12 +631,12 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
|
|
|
transport1.Send(new FakeCommand(0).ToTransportMessage(), new[] { peer2 });
|
|
|
transport2.Send(new FakeCommand(0).ToTransportMessage(), new[] { peer1 });
|
|
|
- Wait.Until(() => transport1.OutboundSocketCount == 1, 10.Seconds());
|
|
|
- Wait.Until(() => transport2.OutboundSocketCount == 1, 10.Seconds());
|
|
|
+ Wait.Until(() => transport1.OutboundSocketCount == 1, 30.Seconds());
|
|
|
+ Wait.Until(() => transport2.OutboundSocketCount == 1, 30.Seconds());
|
|
|
|
|
|
transport2.Stop();
|
|
|
|
|
|
- Wait.Until(() => transport1.OutboundSocketCount == 0, 10.Seconds());
|
|
|
+ Wait.Until(() => transport1.OutboundSocketCount == 0, 30.Seconds());
|
|
|
}
|
|
|
|
|
|
private ZmqTransport CreateZmqTransport(string endPoint = "tcp://*:*", Action<TransportMessage> onMessageReceived = null, string peerId = null, string environment = _environment, ZmqSocketOptions socketOptions = null)
|