فهرست منبع

Directory: Ignore timestamps in responding state update commands

Olivier Coanet 4 ماه پیش
والد
کامیت
e08311afa7

+ 6 - 5
src/Abc.Zebus.Directory.Tests/DeadPeerDetection/DeadPeerDetectorTests.cs

@@ -5,6 +5,7 @@ using System.Threading;
 using System.Threading.Tasks;
 using Abc.Zebus.Directory.Configuration;
 using Abc.Zebus.Directory.DeadPeerDetection;
+using Abc.Zebus.Directory.Messages;
 using Abc.Zebus.Directory.Storage;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Testing.Extensions;
@@ -263,7 +264,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
 
                 SystemDateTime.PauseTime(startTime.AddSeconds(_transientPeerTimeout + 1));
                 _detector.DetectDeadPeers();
-                _bus.ExpectExactly(new MarkPeerAsNotRespondingCommand(_transientAlivePeer0.Peer.Id, firstPingTimestampUtc));
+                _bus.ExpectExactly(new MarkPeerAsNotRespondingInternalCommand(_transientAlivePeer0.Peer.Id, firstPingTimestampUtc));
             }
         }
 
@@ -296,7 +297,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
 
                 SystemDateTime.PauseTime(startTime.AddSeconds(_persistentPeerTimeout + 1));
                 _detector.DetectDeadPeers();
-                _bus.ExpectExactly(new MarkPeerAsNotRespondingCommand(_persistentAlivePeer.Peer.Id, firstPingTimestampUtc));
+                _bus.ExpectExactly(new MarkPeerAsNotRespondingInternalCommand(_persistentAlivePeer.Peer.Id, firstPingTimestampUtc));
             }
         }
 
@@ -420,7 +421,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
 
                 SystemDateTime.PauseTime(startTime.AddSeconds(_persistentPeerTimeout + 1));
                 _detector.DetectDeadPeers();
-                _bus.Expect(new MarkPeerAsNotRespondingCommand(_persistentAlivePeer.Peer.Id, firstPingTimestampUtc));
+                _bus.Expect(new MarkPeerAsNotRespondingInternalCommand(_persistentAlivePeer.Peer.Id, firstPingTimestampUtc));
                 _bus.ClearMessages();
 
                 // simulate MarkPeerAsNotRespondingCommand handler
@@ -433,7 +434,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
 
                 SystemDateTime.PauseTime(SystemDateTime.UtcNow.Add(_pingInterval));
                 _detector.DetectDeadPeers();
-                _bus.Expect(new MarkPeerAsRespondingCommand(_persistentAlivePeer.Peer.Id, _persistentAlivePeer.TimestampUtc.Value + 1.Millisecond()));
+                _bus.Expect(new MarkPeerAsRespondingInternalCommand(_persistentAlivePeer.Peer.Id, _persistentAlivePeer.TimestampUtc.Value + 1.Millisecond()));
             }
         }
 
@@ -550,7 +551,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
                 // Peer marked as not responding
                 SystemDateTime.AddToPausedTime(_persistentPeerTimeout.Seconds());
                 _detector.DetectDeadPeers();
-                _bus.Expect(new MarkPeerAsNotRespondingCommand(_persistentAlivePeer.PeerId, initialTime));
+                _bus.Expect(new MarkPeerAsNotRespondingInternalCommand(_persistentAlivePeer.PeerId, initialTime));
 
                 // Peer removed
                 SetupPeerRepository();

+ 30 - 2
src/Abc.Zebus.Directory.Tests/Handlers/DirectoryCommandsHandlerTests.cs

@@ -2,6 +2,7 @@
 using System.Linq;
 using Abc.Zebus.Directory.Configuration;
 using Abc.Zebus.Directory.Handlers;
+using Abc.Zebus.Directory.Messages;
 using Abc.Zebus.Directory.Storage;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Testing;
@@ -473,9 +474,23 @@ namespace Abc.Zebus.Directory.Tests.Handlers
             var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123");
             _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
 
+            _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, default));
+
+            var expectedTimestampUtc = peerDescriptor.TimestampUtc.Value.AddMilliseconds(1);
+            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, false, expectedTimestampUtc));
+            _bus.ExpectExactly(new PeerNotResponding(peerDescriptor.PeerId, expectedTimestampUtc));
+        }
+
+        [Test]
+        public void should_persist_not_responding_peer_with_timestamp()
+        {
+
+            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123");
+            _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
+
             var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(1);
 
-            _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, timestampUtc));
+            _handler.Handle(new MarkPeerAsNotRespondingInternalCommand(peerDescriptor.PeerId, timestampUtc));
 
             _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, false, timestampUtc));
             _bus.ExpectExactly(new PeerNotResponding(peerDescriptor.PeerId, timestampUtc));
@@ -487,9 +502,22 @@ namespace Abc.Zebus.Directory.Tests.Handlers
             var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
             _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
 
+            _handler.Handle(new MarkPeerAsRespondingCommand(peerDescriptor.PeerId, default));
+
+            var expectedTimestampUtc = peerDescriptor.TimestampUtc.Value.AddMilliseconds(1);
+            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, true, expectedTimestampUtc));
+            _bus.ExpectExactly(new PeerResponding(peerDescriptor.PeerId, expectedTimestampUtc));
+        }
+
+        [Test]
+        public void should_persist_responding_peer_with_timestamp()
+        {
+            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
+            _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
+
             var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(1);
 
-            _handler.Handle(new MarkPeerAsRespondingCommand(peerDescriptor.PeerId, timestampUtc));
+            _handler.Handle(new MarkPeerAsRespondingInternalCommand(peerDescriptor.PeerId, timestampUtc));
 
             _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, true, timestampUtc));
 

+ 3 - 2
src/Abc.Zebus.Directory/DeadPeerDetection/DeadPeerDetector.cs

@@ -6,6 +6,7 @@ using System.Text.RegularExpressions;
 using System.Threading;
 using System.Threading.Tasks;
 using Abc.Zebus.Directory.Configuration;
+using Abc.Zebus.Directory.Messages;
 using Abc.Zebus.Directory.Storage;
 using Abc.Zebus.Util;
 using Abc.Zebus.Util.Extensions;
@@ -124,7 +125,7 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
             }
             else if (descriptor.Peer.IsResponding)
             {
-                _bus.Send(new MarkPeerAsNotRespondingCommand(descriptor.PeerId, timeoutTimestampUtc)).Wait(_commandTimeout);
+                _bus.Send(new MarkPeerAsNotRespondingInternalCommand(descriptor.PeerId, timeoutTimestampUtc)).Wait(_commandTimeout);
                 descriptor.Peer.IsResponding = false;
             }
         }
@@ -150,7 +151,7 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
 
         private void OnPeerResponding(DeadPeerDetectorEntry entry, DateTime timestampUtc)
         {
-            _bus.Send(new MarkPeerAsRespondingCommand(entry.Descriptor.PeerId, timestampUtc)).Wait(_commandTimeout);
+            _bus.Send(new MarkPeerAsRespondingInternalCommand(entry.Descriptor.PeerId, timestampUtc)).Wait(_commandTimeout);
         }
 
         public void Start()

+ 18 - 1
src/Abc.Zebus.Directory/Handlers/DirectoryCommandsHandler.cs

@@ -4,6 +4,7 @@ using System.Diagnostics;
 using System.Diagnostics.CodeAnalysis;
 using System.Linq;
 using Abc.Zebus.Directory.Configuration;
+using Abc.Zebus.Directory.Messages;
 using Abc.Zebus.Directory.Storage;
 using Abc.Zebus.Util;
 using Abc.Zebus.Util.Extensions;
@@ -17,6 +18,8 @@ namespace Abc.Zebus.Directory.Handlers
                                             IMessageHandler<UpdatePeerSubscriptionsForTypesCommand>,
                                             IMessageHandler<MarkPeerAsRespondingCommand>,
                                             IMessageHandler<MarkPeerAsNotRespondingCommand>,
+                                            IMessageHandler<MarkPeerAsRespondingInternalCommand>,
+                                            IMessageHandler<MarkPeerAsNotRespondingInternalCommand>,
                                             IMessageContextAware
     {
         private readonly HashSet<string> _blacklistedMachines;
@@ -151,12 +154,26 @@ namespace Abc.Zebus.Directory.Handlers
         }
 
         public void Handle(MarkPeerAsRespondingCommand message)
+        {
+            // Explicitly ignores the command timestamp to avoid update / delete race conditions.
+            if (_peerRepository.SetPeerRespondingState(message.PeerId, true, out var timestampUtc))
+                _bus.Publish(new PeerResponding(message.PeerId, timestampUtc));
+        }
+
+        public void Handle(MarkPeerAsNotRespondingCommand message)
+        {
+            // Explicitly ignores the command timestamp to avoid update / delete race conditions.
+            if (_peerRepository.SetPeerRespondingState(message.PeerId, false, out var timestampUtc))
+                _bus.Publish(new PeerNotResponding(message.PeerId, timestampUtc));
+        }
+
+        public void Handle(MarkPeerAsRespondingInternalCommand message)
         {
             if (_peerRepository.SetPeerRespondingState(message.PeerId, true, message.TimestampUtc))
                 _bus.Publish(new PeerResponding(message.PeerId, message.TimestampUtc));
         }
 
-        public void Handle(MarkPeerAsNotRespondingCommand message)
+        public void Handle(MarkPeerAsNotRespondingInternalCommand message)
         {
             if (_peerRepository.SetPeerRespondingState(message.PeerId, false, message.TimestampUtc))
                 _bus.Publish(new PeerNotResponding(message.PeerId, message.TimestampUtc));

+ 23 - 0
src/Abc.Zebus.Directory/Messages/MarkPeerAsNotRespondingInternalCommand.cs

@@ -0,0 +1,23 @@
+using System;
+using ProtoBuf;
+
+namespace Abc.Zebus.Directory.Messages;
+
+[ProtoContract]
+public class MarkPeerAsNotRespondingInternalCommand : ICommand
+{
+    [ProtoMember(1, IsRequired = true)]
+    public readonly PeerId PeerId;
+
+    [ProtoMember(2, IsRequired = true)]
+    public readonly DateTime TimestampUtc;
+
+    public MarkPeerAsNotRespondingInternalCommand(PeerId peerId, DateTime timestampUtc)
+    {
+        PeerId = peerId;
+        TimestampUtc = timestampUtc;
+    }
+
+    public override string ToString()
+        => PeerId.ToString();
+}

+ 23 - 0
src/Abc.Zebus.Directory/Messages/MarkPeerAsRespondingCommand.cs

@@ -0,0 +1,23 @@
+using System;
+using ProtoBuf;
+
+namespace Abc.Zebus.Directory.Messages;
+
+[ProtoContract]
+public class MarkPeerAsRespondingInternalCommand : ICommand
+{
+    [ProtoMember(1, IsRequired = true)]
+    public readonly PeerId PeerId;
+
+    [ProtoMember(2, IsRequired = true)]
+    public readonly DateTime TimestampUtc;
+
+    public MarkPeerAsRespondingInternalCommand(PeerId peerId, DateTime timestampUtc)
+    {
+        PeerId = peerId;
+        TimestampUtc = timestampUtc;
+    }
+
+    public override string ToString()
+        => PeerId.ToString();
+}

+ 15 - 0
src/Abc.Zebus.Directory/Storage/ExtendPeerRepository.cs

@@ -18,6 +18,21 @@ namespace Abc.Zebus.Directory.Storage
             return true;
         }
 
+        public static bool SetPeerRespondingState(this IPeerRepository repository, PeerId peerId, bool isResponding, out DateTime timestampUtc)
+        {
+            var peer = repository.Get(peerId);
+            if (peer == null || peer.TimestampUtc == null)
+            {
+                timestampUtc = default;
+                return false;
+            }
+
+            timestampUtc = peer.TimestampUtc.Value.AddMilliseconds(1);
+            repository.SetPeerResponding(peer.PeerId, isResponding, peer.TimestampUtc.Value.AddMilliseconds(1));
+
+            return true;
+        }
+
         public static bool SetPeerRespondingState(this IPeerRepository repository, PeerId peerId, bool isResponding, DateTime timestampUtc)
         {
             var peer = repository.Get(peerId);