Browse Source

Mark all replayed messages as persisted

Olivier Coanet 7 years ago
parent
commit
0cc5815ea4

+ 23 - 3
src/Abc.Zebus.Tests/Persistence/PersistentTransportTests.cs

@@ -67,6 +67,25 @@ namespace Abc.Zebus.Tests.Persistence
             forwardedTransportMessage.WasPersisted.ShouldEqual(true);
         }
 
+        [Test]
+        public void should_force_WasPersisted_for_replayed_messages_during_safety_phase()
+        {
+            Transport.Start();
+
+            InnerTransport.RaiseMessageReceived(new ReplayPhaseEnded(ReplayId).ToTransportMessage());
+
+            var sourceTransportMessage = new FakeCommand(123).ToTransportMessage();
+            sourceTransportMessage.WasPersisted = null;
+
+            var replayTransportMessage = sourceTransportMessage.ToReplayedTransportMessage(ReplayId);
+            InnerTransport.RaiseMessageReceived(replayTransportMessage);
+
+            Wait.Until(() => MessagesForwardedToBus.Count == 1, 150.Milliseconds());
+
+            var forwardedTransportMessage = MessagesForwardedToBus.ExpectedSingle();
+            forwardedTransportMessage.WasPersisted.ShouldEqual(true);
+        }
+
         [Test]
         public void should_forward_a_normal_message_after_a_back_to_live_event()
         {
@@ -78,9 +97,10 @@ namespace Abc.Zebus.Tests.Persistence
 
             InnerTransport.RaiseMessageReceived(new ReplayPhaseEnded(StartMessageReplayCommand.ReplayId).ToTransportMessage());
 
-            Thread.Sleep(50);
-            MessagesForwardedToBus.Count.ShouldEqual(1);
-            MessagesForwardedToBus.Single().ShouldEqualDeeply(transportMessageToForward);
+            Wait.Until(() => MessagesForwardedToBus.Count == 1, 150.Milliseconds());
+
+            var transportMessage = MessagesForwardedToBus.Single();
+            transportMessage.ShouldEqualDeeply(transportMessageToForward);
         }
 
         [Test, Repeat(20)]

+ 5 - 5
src/Abc.Zebus/Persistence/PersistentTransport.Phases.cs

@@ -32,6 +32,11 @@ namespace Abc.Zebus.Persistence
                 var messageReplayed = replayEvent as MessageReplayed;
                 if (messageReplayed != null)
                 {
+                    // the message was persisted because it comes from the persistence
+                    // but previous Zebus versions do not specify the WasPersisted field
+                    // => force WasPersisted to support previous Zebus version and make sure the message will be acked
+                    messageReplayed.Message.WasPersisted = true;
+
                     OnMessageReplayed(messageReplayed);
                     return;
                 }
@@ -120,11 +125,6 @@ namespace Abc.Zebus.Persistence
             {
                 Transport._logger.DebugFormat("REPLAY: {0} {1}", messageReplayed.Message.MessageTypeId, messageReplayed.Message.Id);
 
-                // the message was persisted because it comes from the persistence
-                // but previous Zebus versions do not specify the WasPersisted field
-                // => force WasPersisted to support previous Zebus version and make sure the message will be acked
-                messageReplayed.Message.WasPersisted = true;
-
                 Transport.TriggerMessageReceived(messageReplayed.Message);
                 Transport._receivedMessagesIds.TryAdd(messageReplayed.Message.Id, true);