Răsfoiți Sursa

Add command to remove message from persistence

Olivier Coanet 3 ani în urmă
părinte
comite
2a679a75d3

+ 0 - 24
src/Abc.Zebus.Persistence.Messages/DeleteBucketCommand.cs

@@ -1,24 +0,0 @@
-using ProtoBuf;
-
-namespace Abc.Zebus.Persistence.Messages
-{
-    [ProtoContract]
-    [MessageTypeId("a60ad98e-dcc6-4688-949d-0584603e853a")]
-    public class DeleteBucketCommand : ICommand
-    {
-        [ProtoMember(1, IsRequired = true)] public readonly string BucketName;
-        [ProtoMember(2, IsRequired = true)] public readonly string InstanceName;
-
-        private DeleteBucketCommand()
-        {
-            BucketName = default!;
-            InstanceName = default!;
-        }
-
-        public DeleteBucketCommand(string bucketName, string instanceName)
-        {
-            BucketName = bucketName;
-            InstanceName = instanceName;
-        }
-    }
-}

+ 26 - 0
src/Abc.Zebus.Persistence.Messages/RemoveMessageFromQueueCommand.cs

@@ -0,0 +1,26 @@
+using JetBrains.Annotations;
+using ProtoBuf;
+
+namespace Abc.Zebus.Persistence.Messages
+{
+    [ProtoContract]
+    public class RemoveMessageFromQueueCommand : ICommand
+    {
+        [ProtoMember(1)]
+        public readonly PeerId PeerId;
+
+        [ProtoMember(2)]
+        public readonly MessageId MessageId;
+
+        public RemoveMessageFromQueueCommand(PeerId peerId, MessageId messageId)
+        {
+            PeerId = peerId;
+            MessageId = messageId;
+        }
+
+        [UsedImplicitly]
+        private RemoveMessageFromQueueCommand()
+        {
+        }
+    }
+}

+ 34 - 7
src/Abc.Zebus.Persistence.Tests/Handlers/MessageHandledHandlerTests.cs

@@ -1,5 +1,6 @@
 using Abc.Zebus.Persistence.Handlers;
 using Abc.Zebus.Persistence.Matching;
+using Abc.Zebus.Persistence.Messages;
 using Abc.Zebus.Persistence.Tests.TestUtil;
 using Moq;
 using NUnit.Framework;
@@ -11,36 +12,62 @@ namespace Abc.Zebus.Persistence.Tests.Handlers
         private readonly PeerId _targetPeerId = new PeerId("Abc.Testing.Target.0");
 
         [Test]
-        public void should_insert_message_ack()
+        public void should_insert_message_ack_for_handled_message()
         {
             var inMemoryMessageMatcher = MockContainer.GetMock<IInMemoryMessageMatcher>();
 
             var messageId = MessageId.NextId();
+
             Handler.Context = MessageContext.CreateOverride(_targetPeerId, null);
-                
             Handler.Handle(new MessageHandled(messageId));
 
             inMemoryMessageMatcher.Verify(x => x.EnqueueAck(_targetPeerId, messageId));
         }
 
         [Test]
-        public void should_forward_MessageHandled_to_active_replayers()
+        public void should_forward_handled_message_to_active_replayers()
         {
             var messageReplayerMock = new Mock<IMessageReplayer>();
             MockContainer.GetMock<IMessageReplayerRepository>().Setup(x => x.GetActiveMessageReplayer(_targetPeerId)).Returns(messageReplayerMock.Object);
 
+            var messageId = MessageId.NextId();
+
             Handler.Context = MessageContext.CreateOverride(_targetPeerId, null);
-            var messageHandled = new MessageHandled(MessageId.NextId());
-            Handler.Handle(messageHandled);
+            Handler.Handle(new MessageHandled(messageId));
 
-            messageReplayerMock.Verify(x => x.Handle(messageHandled));
+            messageReplayerMock.Verify(x => x.OnMessageAcked(messageId));
+        }
+
+        [Test]
+        public void should_insert_message_ack_for_removed_message()
+        {
+            var inMemoryMessageMatcher = MockContainer.GetMock<IInMemoryMessageMatcher>();
+
+            var messageId = MessageId.NextId();
+
+            Handler.Handle(new RemoveMessageFromQueueCommand(_targetPeerId, messageId));
+
+            inMemoryMessageMatcher.Verify(x => x.EnqueueAck(_targetPeerId, messageId));
+        }
+
+        [Test]
+        public void should_forward_removed_message_to_active_replayers()
+        {
+            var messageReplayerMock = new Mock<IMessageReplayer>();
+            MockContainer.GetMock<IMessageReplayerRepository>().Setup(x => x.GetActiveMessageReplayer(_targetPeerId)).Returns(messageReplayerMock.Object);
+
+            var messageId = MessageId.NextId();
+
+            Handler.Handle(new RemoveMessageFromQueueCommand(_targetPeerId, messageId));
+
+            messageReplayerMock.Verify(x => x.OnMessageAcked(messageId));
         }
 
         [Test]
         public void should_not_throw_exception_if_no_replayer_for_given_MessageHandled()
         {
             Handler.Context = MessageContext.CreateOverride(_targetPeerId, null);
-            
+
             Assert.DoesNotThrow(() => Handler.Handle(new MessageHandled(MessageId.NextId())));
         }
     }

+ 1 - 1
src/Abc.Zebus.Persistence.Tests/MessageReplayerTests.cs

@@ -234,7 +234,7 @@ namespace Abc.Zebus.Persistence.Tests
                 {
                     while (_transport.Messages.Count > messageIndex && messageIndex < unackedTransportMessages.Count)
                     {
-                        _replayer.Handle(new MessageHandled(unackedTransportMessages[messageIndex].Id));
+                        _replayer.OnMessageAcked(unackedTransportMessages[messageIndex].Id);
                         messageIndex++;
                     }
 

+ 16 - 0
src/Abc.Zebus.Persistence.Tests/SerializationTests.cs

@@ -0,0 +1,16 @@
+using Abc.Zebus.Persistence.Messages;
+using Abc.Zebus.Testing;
+using NUnit.Framework;
+
+namespace Abc.Zebus.Persistence.Tests
+{
+    [TestFixture]
+    public class SerializationTests
+    {
+        [Test]
+        public void should_serialize_messages()
+        {
+            MessageSerializationTester.CheckSerializationForTypesInSameAssemblyAs<PublishNonAckMessagesCountCommand>();
+        }
+    }
+}

+ 17 - 6
src/Abc.Zebus.Persistence/Handlers/MessageHandledHandler.cs

@@ -1,10 +1,11 @@
 using System.Linq;
 using Abc.Zebus.Persistence.Matching;
+using Abc.Zebus.Persistence.Messages;
 using log4net;
 
 namespace Abc.Zebus.Persistence.Handlers
 {
-    public class MessageHandledHandler : IMessageHandler<MessageHandled>, IMessageContextAware
+    public class MessageHandledHandler : IMessageHandler<MessageHandled>, IMessageHandler<RemoveMessageFromQueueCommand>, IMessageContextAware
     {
         private static readonly ILog _log = LogManager.GetLogger(typeof(MessageHandledHandler));
 
@@ -23,13 +24,23 @@ namespace Abc.Zebus.Persistence.Handlers
 
         public void Handle(MessageHandled message)
         {
-            if (_configuration.PeerIdsToInvestigate != null && _configuration.PeerIdsToInvestigate.Contains(Context!.SenderId.ToString()))
-                _log.Info($"Ack received from peer {Context.SenderId}. MessageId: {message.MessageId}");
+            AckMessage(Context!.SenderId, message.MessageId);
+        }
+
+        public void Handle(RemoveMessageFromQueueCommand message)
+        {
+            AckMessage(message.PeerId, message.MessageId);
+        }
+
+        private void AckMessage(PeerId peerId, MessageId messageId)
+        {
+            if (_configuration.PeerIdsToInvestigate != null && _configuration.PeerIdsToInvestigate.Contains(peerId.ToString()))
+                _log.Info($"Ack received from peer {peerId}. MessageId: {messageId}");
 
-            _inMemoryMessageMatcher.EnqueueAck(Context!.SenderId, message.MessageId);
+            _inMemoryMessageMatcher.EnqueueAck(peerId, messageId);
 
-            var activeMessageReplayer = _messageReplayerRepository.GetActiveMessageReplayer(Context.SenderId);
-            activeMessageReplayer?.Handle(message);
+            var activeMessageReplayer = _messageReplayerRepository.GetActiveMessageReplayer(peerId);
+            activeMessageReplayer?.OnMessageAcked(messageId);
         }
     }
 }

+ 2 - 2
src/Abc.Zebus.Persistence/IMessageReplayer.cs

@@ -10,6 +10,6 @@ namespace Abc.Zebus.Persistence
         void AddLiveMessage(TransportMessage message);
         void Start();
         bool Cancel();
-        void Handle(MessageHandled messageHandled);
+        void OnMessageAcked(MessageId messageId);
     }
-}
+}

+ 5 - 5
src/Abc.Zebus.Persistence/MessageReplayer.cs

@@ -90,6 +90,11 @@ namespace Abc.Zebus.Persistence
             return false;
         }
 
+        public void OnMessageAcked(MessageId messageId)
+        {
+            _unackedIds.Remove(messageId);
+        }
+
         public bool WaitForCompletion(TimeSpan timeout)
         {
             return _runThread?.Join(timeout) ?? true;
@@ -229,11 +234,6 @@ namespace Abc.Zebus.Persistence
             return new TransportMessage(message.TypeId(), _messageSerializer.Serialize(message), _self) { WasPersisted = wasPersisted };
         }
 
-        public void Handle(MessageHandled messageHandled)
-        {
-            _unackedIds.Remove(messageHandled.MessageId);
-        }
-
         private Lazy<TimeSpan> MeasureDuration()
         {
             var beginning = _stopwatch.Elapsed;

+ 1 - 1
src/Abc.Zebus.Testing/MessageSerializationTester.cs

@@ -92,7 +92,7 @@ namespace Abc.Zebus.Testing
             var result = comparer.Compare(message, messageCopy);
 
             if (!result.AreEqual)
-                Assert.Fail(result.DifferencesString);
+                Assert.Fail($"Messages are not equal after serialization{Environment.NewLine}Message type: {messageType.Name}{Environment.NewLine}{result.DifferencesString}");
         }
     }
 }