Browse Source

Core: instead of only trying to replay messages when a persistence peer starts, try on every send

  - if a message is being sent at the same time as the persistence peer starts up we want to replay all saved messages before sending any new ones
Mendel Monteiro-Beckerman 5 years ago
parent
commit
bab0993faf

+ 15 - 0
src/Abc.Zebus.Testing/Transport/TestTransport.cs

@@ -14,6 +14,7 @@ namespace Abc.Zebus.Testing.Transport
     public class TestTransport : ITransport
     {
         private readonly List<TransportMessageSent> _messages = new List<TransportMessageSent>();
+        private readonly List<TransportMessageSent> _messagesSentToPersistence = new List<TransportMessageSent>();
         private readonly List<UpdatedPeer> _updatedPeers = new List<UpdatedPeer>();
         private readonly List<TransportMessage> _ackedMessages = new List<TransportMessage>();
         private readonly MessageSerializer _messageSerializer = new MessageSerializer();
@@ -84,6 +85,9 @@ namespace Abc.Zebus.Testing.Transport
             if (peerList.Any() || context.PersistencePeer != null)
                 _messages.Add(new TransportMessageSent(message, peerList, context));
 
+            if (context.PersistencePeer != null || peerList.Count > 0 && peerList.All(x => x.Id.IsPersistence()))
+                _messagesSentToPersistence.Add(new TransportMessageSent(message, context.PersistencePeer ?? peerList.First(), true));
+
             var deserializedMessage = _messageSerializer.Deserialize(message.MessageTypeId, message.Content!);
             if (deserializedMessage != null)
                 MessagesSent.Add(deserializedMessage);
@@ -106,6 +110,17 @@ namespace Abc.Zebus.Testing.Transport
 
         public IList<TransportMessageSent> Messages => _messages;
 
+        public IReadOnlyList<TransportMessageSent> MessagesSentToPersistence => _messagesSentToPersistence;
+
+        public void Clear()
+        {
+            _messages.Clear();
+            _messagesSentToPersistence.Clear();
+            _ackedMessages.Clear();
+            MessagesSent.Clear();
+            _updatedPeers.Clear();
+        }
+
         public IList<TransportMessage> AckedMessages => _ackedMessages;
 
         public void ExpectExactly(params TransportMessageSent[] expectedMessages)

+ 2 - 2
src/Abc.Zebus.Tests/Persistence/PersistentTransportFixture.cs

@@ -65,7 +65,7 @@ namespace Abc.Zebus.Tests.Persistence
                 StartMessageReplayCommandTargets = startMessageReplayMessage.Targets;
             }
 
-            InnerTransport.Messages.Clear();
+            InnerTransport.Clear();
         }
 
         [Test]
@@ -115,4 +115,4 @@ namespace Abc.Zebus.Tests.Persistence
             updatedPeer.UpdateAction.ShouldEqual(updateAction);
         }
     }
-}
+}

+ 52 - 4
src/Abc.Zebus.Tests/Persistence/PersistentTransportTests.cs

@@ -14,6 +14,7 @@ using Abc.Zebus.Tests.Messages;
 using Abc.Zebus.Transport;
 using Abc.Zebus.Util;
 using Abc.Zebus.Util.Extensions;
+using Moq;
 using NUnit.Framework;
 
 namespace Abc.Zebus.Tests.Persistence
@@ -135,7 +136,6 @@ namespace Abc.Zebus.Tests.Persistence
                 if (msg == failingMessage)
                     throw new Exception("Failure");
                 successfullyReceivedMessages.Add(msg);
-
             };
 
             InnerTransport.RaiseMessageReceived(new ReplayPhaseEnded(StartMessageReplayCommand.ReplayId).ToTransportMessage());
@@ -270,7 +270,7 @@ namespace Abc.Zebus.Tests.Persistence
 
                 Transport.Send(message, new[] { AnotherPersistentPeer, AnotherNonPersistentPeer });
 
-                InnerTransport.ExpectExactly(new []
+                InnerTransport.ExpectExactly(new[]
                 {
                     new TransportMessageSent(message).To(AnotherPersistentPeer, true).To(AnotherNonPersistentPeer, false).ToPersistence(PersistencePeer),
                 });
@@ -348,7 +348,7 @@ namespace Abc.Zebus.Tests.Persistence
         [Test]
         public void should_not_lose_messages_when_persistent_transport_goes_down_and_comes_back_up()
         {
-            using(MessageId.PauseIdGeneration())
+            using (MessageId.PauseIdGeneration())
             {
                 // Stopping persistence
                 InnerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStopping, new MemoryStream(), PersistencePeer));
@@ -443,5 +443,53 @@ namespace Abc.Zebus.Tests.Persistence
                 Transport.Start();
             }
         }
+
+        [Test]
+        public void should_not_try_to_replay_when_no_persistence_peers_are_found()
+        {
+            // Arrange
+            SetupPeersHandlingMessage(Array.Empty<Peer>());
+            var downPeer = new Peer(new PeerId("Another.Peer"), "tcp://anotherpeer:123", false);
+
+            Transport.Send(new FakeCommand(123).ToTransportMessage(), new[] { downPeer });
+
+            // Act
+            Transport.OnPeerUpdated(new PeerId("Abc.Zebus.PersistenceService"), PeerUpdateAction.Started);
+
+            // Assert
+            InnerTransport.Messages.ShouldBeEmpty();
+            InnerTransport.MessagesSentToPersistence.ShouldBeEmpty();
+        }
+
+        [Test]
+        public void should_try_to_replay_waiting_messages_on_each_send()
+        {
+            // Arrange
+            SetupPeersHandlingMessage(Array.Empty<Peer>());
+            var downPeer = new Peer(new PeerId("Another.Peer"), "tcp://anotherpeer:123", false);
+
+            var messageToReplay = new FakeCommand(123).ToTransportMessage();
+            Transport.Send(messageToReplay, new[] { downPeer });
+            InnerTransport.Messages.ShouldBeEmpty();
+            InnerTransport.MessagesSentToPersistence.ShouldBeEmpty();
+
+            // Act
+            SetupPeersHandlingMessage(new[] { PersistencePeer });
+            var messageToEnqueue = new FakeCommand(124).ToTransportMessage();
+            Transport.Send(messageToEnqueue, new[] { downPeer });
+
+            // Assert
+            InnerTransport.MessagesSentToPersistence.Count.ShouldEqual(2);
+            foreach (var message in InnerTransport.MessagesSentToPersistence)
+            {
+                message.Targets.ExpectedSingle().Id.ShouldEqual(PersistencePeer.Id);
+            }
+        }
+
+        private void SetupPeersHandlingMessage(Peer[] peers)
+        {
+            PeerDirectory.Setup(x => x.GetPeersHandlingMessage(It.IsAny<IMessage>())).Returns(peers);
+            PeerDirectory.Setup(x => x.GetPeersHandlingMessage(It.IsAny<MessageBinding>())).Returns(peers);
+        }
     }
-}
+}

+ 18 - 16
src/Abc.Zebus/Persistence/PersistentTransport.cs

@@ -33,7 +33,7 @@ namespace Abc.Zebus.Persistence
         private Phase _phase = default!;
         private Thread? _receptionThread;
         private Guid? _currentReplayId;
-        private volatile bool _persistenceIsDown;
+        private volatile bool _persistenceIsUp = true;
 
         public PersistentTransport(IBusConfiguration configuration, ITransport innerTransport, IPeerDirectory peerDirectory, IMessageSendingStrategy messageSendingStrategy)
         {
@@ -72,12 +72,13 @@ namespace Abc.Zebus.Persistence
 
             if (peerUpdateAction == PeerUpdateAction.Started)
             {
-                _persistenceIsDown = false;
-                ReplayMessagesWaitingForPersistence();
+                _persistenceIsUp = true;
+                // stop here
+                ReplayMessagesWaitingForPersistence(GetPersistencePeers());
             }
             else if (peerUpdateAction == PeerUpdateAction.Decommissioned)
             {
-                _persistenceIsDown = true;
+                _persistenceIsUp = false;
             }
         }
 
@@ -86,10 +87,13 @@ namespace Abc.Zebus.Persistence
             _phase.OnRegistered();
         }
 
-        private void ReplayMessagesWaitingForPersistence()
+        private void ReplayMessagesWaitingForPersistence(IList<Peer> persistencePeers)
         {
-            var persistencePeers = GetPersistencePeers();
-
+            if (persistencePeers.Count == 0)
+            {
+                _logger.InfoFormat("Skipping replay as no persistence peers were found, messages will be replayed before next send");
+                return;
+            }
             _logger.InfoFormat("Sending {0} enqueued messages to the persistence", _messagesWaitingForPersistence.Count);
 
             while (_messagesWaitingForPersistence.TryTake(out var messageToSend))
@@ -119,9 +123,6 @@ namespace Abc.Zebus.Persistence
         {
             _logger.InfoFormat("Enqueing in temp persistence buffer: {0}", transportMessage.Id);
             _messagesWaitingForPersistence.Add(transportMessage);
-
-            if (!_persistenceIsDown)
-                ReplayMessagesWaitingForPersistence();
         }
 
         public void Configure(PeerId peerId, string environment)
@@ -174,7 +175,11 @@ namespace Abc.Zebus.Persistence
             var targetPeers = LoadTargetPeersAndUpdateContext(peers, isMessagePersistent, context);
 
             var mustBeSendToPersistence = context.PersistentPeerIds.Count != 0;
-            context.PersistencePeer = mustBeSendToPersistence ? GetPersistencePeers().FirstOrDefault() : null;
+            var persistencePeers = mustBeSendToPersistence ? GetPersistencePeers() : null;
+            context.PersistencePeer = persistencePeers?.FirstOrDefault();
+
+            if (persistencePeers != null && persistencePeers.Count > 0)
+                ReplayMessagesWaitingForPersistence(persistencePeers);
 
             _innerTransport.Send(message, targetPeers, context);
 
@@ -182,10 +187,7 @@ namespace Abc.Zebus.Persistence
                 Enqueue(new PersistMessageCommand(message, context.PersistentPeerIds));
         }
 
-        private IList<Peer> GetPersistencePeers()
-        {
-            return _persistenceIsDown ? _emptyPeerList : _peerDirectory.GetPeersHandlingMessage(_bindingForPersistence);
-        }
+        private IList<Peer> GetPersistencePeers() => _persistenceIsUp ? _peerDirectory.GetPeersHandlingMessage(_bindingForPersistence) : _emptyPeerList;
 
         private List<Peer> LoadTargetPeersAndUpdateContext(IEnumerable<Peer> peers, bool isMessagePersistent, SendContext context)
         {
@@ -251,7 +253,7 @@ namespace Abc.Zebus.Persistence
 
             if (transportMessage.MessageTypeId == MessageTypeId.PersistenceStopping)
             {
-                _persistenceIsDown = true;
+                _persistenceIsUp = false;
 
                 var ackMessage = new TransportMessage(MessageTypeId.PersistenceStoppingAck, new MemoryStream(), _innerTransport.PeerId, _innerTransport.InboundEndPoint);