|
@@ -523,32 +523,49 @@ namespace Abc.Zebus.Tests.Transport
|
|
|
[Test]
|
|
|
public void should_process_all_messages_in_buffer_on_stop()
|
|
|
{
|
|
|
- var receivedMessages = new List<TransportMessage>();
|
|
|
+ var state = new should_process_all_messages_in_buffer_on_stop_state { ShouldSend = true };
|
|
|
|
|
|
- var receivingTransport = CreateAndStartZmqTransport(onMessageReceived: receivedMessages.Add);
|
|
|
+ var receivingTransport = CreateAndStartZmqTransport(onMessageReceived: x => state.ReceivedMessageCount++);
|
|
|
var sendingTransport = CreateAndStartZmqTransport();
|
|
|
var receivingPeer = new Peer(sendingTransport.PeerId, receivingTransport.InboundEndPoint);
|
|
|
- var count = 0;
|
|
|
- var shouldSendMessages = true;
|
|
|
+
|
|
|
var senderTask = new Thread(() =>
|
|
|
{
|
|
|
- while (shouldSendMessages)
|
|
|
- sendingTransport.Send(new FakeCommand(count++).ToTransportMessage(), new[] { receivingPeer });
|
|
|
+ Log($"Send loop started");
|
|
|
+
|
|
|
+ while (state.ShouldSend)
|
|
|
+ {
|
|
|
+ sendingTransport.Send(new FakeCommand(state.SentMessageCount++).ToTransportMessage(), new[] { receivingPeer });
|
|
|
+ }
|
|
|
+
|
|
|
+ Log($"Send loop terminated, Count: {state.SentMessageCount}");
|
|
|
|
|
|
sendingTransport.Stop();
|
|
|
+
|
|
|
+ Log($"Sender stopped");
|
|
|
});
|
|
|
+
|
|
|
senderTask.Start();
|
|
|
- Wait.Until(() => receivedMessages.Count != 0, 2.Seconds());
|
|
|
+ Wait.Until(() => state.ReceivedMessageCount != 0, 2.Seconds());
|
|
|
|
|
|
- Console.WriteLine("Stopping the sender for the end\r\n\r\n");
|
|
|
- shouldSendMessages = false;
|
|
|
+ Log($"Stopping the sender");
|
|
|
+ state.ShouldSend = false;
|
|
|
senderTask.Join();
|
|
|
- Console.WriteLine("Stopping the receiver for the end\r\n\r\n");
|
|
|
+
|
|
|
+ Log($"Stopping the receiver");
|
|
|
receivingTransport.Stop();
|
|
|
- Console.WriteLine("Receiver stopped\r\n\r\n");
|
|
|
+ Log($"Receiver stopped");
|
|
|
|
|
|
- Thread.MemoryBarrier();
|
|
|
- receivedMessages.Count.ShouldEqual(count);
|
|
|
+ state.ReceivedMessageCount.ShouldEqual(state.SentMessageCount);
|
|
|
+
|
|
|
+ void Log(string text) => Console.WriteLine(DateTime.Now.TimeOfDay + " " + text + Environment.NewLine + Environment.NewLine);
|
|
|
+ }
|
|
|
+
|
|
|
+ private class should_process_all_messages_in_buffer_on_stop_state
|
|
|
+ {
|
|
|
+ public volatile int ReceivedMessageCount;
|
|
|
+ public volatile int SentMessageCount;
|
|
|
+ public volatile bool ShouldSend;
|
|
|
}
|
|
|
|
|
|
[Test]
|