1
0
Эх сурвалжийг харах

Directory: Add timestamps to responding state updates

Previously, an update to a peer's responding state would always succeed,
even when the update's timestamp was older than the stored one. Now the
update timestamp is propagated and outdated updates are discarded, both
in DirectoryCommandsHandler and in CqlPeerRepository (through
Cassandra's last-write-wins timestamp resolution).
Olivier Coanet 4 сар өмнө
parent
commit
33797bcfaa

+ 19 - 3
src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryTests.cs

@@ -182,18 +182,34 @@ namespace Abc.Zebus.Directory.Cassandra.Tests.Cql
         public void should_mark_peer_as_responding()
         {
             var descriptor = _peer1.ToPeerDescriptorWithRoundedTime(true);
-            descriptor.TimestampUtc = DateTime.UtcNow.AddTicks(-10);
+            var baseTimestampUtc = DateTime.UtcNow;
+            descriptor.TimestampUtc = baseTimestampUtc;
             _repository.AddOrUpdatePeer(descriptor);
 
-            _repository.SetPeerResponding(_peer1.Id, false);
+            _repository.SetPeerResponding(_peer1.Id, false, baseTimestampUtc.AddMilliseconds(1));
             _repository.Get(_peer1.Id).Peer.IsResponding.ShouldBeFalse();
             _repository.GetPeers().ExpectedSingle().Peer.IsResponding.ShouldBeFalse();
 
-            _repository.SetPeerResponding(_peer1.Id, true);
+            _repository.SetPeerResponding(_peer1.Id, true, baseTimestampUtc.AddMilliseconds(1));
             _repository.Get(_peer1.Id).Peer.IsResponding.ShouldBeTrue();
             _repository.GetPeers().ExpectedSingle().Peer.IsResponding.ShouldBeTrue();
         }
 
+
+        [Test]
+        public void should_not_mark_removed_peer_as_responding()
+        {
+            var descriptor = _peer1.ToPeerDescriptorWithRoundedTime(true);
+            var baseTimestampUtc = DateTime.UtcNow;
+            descriptor.TimestampUtc = baseTimestampUtc;
+            _repository.AddOrUpdatePeer(descriptor);
+
+            _repository.RemovePeer(descriptor.PeerId, baseTimestampUtc.AddMilliseconds(100));
+            _repository.SetPeerResponding(_peer1.Id, true, baseTimestampUtc.AddMilliseconds(99));
+
+            _repository.GetPeers().ShouldBeEmpty();
+        }
+
         [Test]
         public void should_handle_peers_with_null_subscriptions_gracefully()
         {

+ 9 - 6
src/Abc.Zebus.Directory.Cassandra/Cql/CqlPeerRepository.cs

@@ -81,31 +81,34 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
 
         public void RemovePeer(PeerId peerId)
         {
-            var now = DateTime.UtcNow;
+            RemovePeer(peerId, DateTime.UtcNow);
+        }
 
+        public void RemovePeer(PeerId peerId, DateTime timestampUtc)
+        {
             _dataContext.Peers
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                         .Where(peer => peer.PeerId == peerId.ToString())
                         .Delete()
-                        .SetTimestamp(now)
+                        .SetTimestamp(timestampUtc)
                         .Execute();
 
             _dataContext.DynamicSubscriptions
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                         .Where(s => s.PeerId == peerId.ToString())
                         .Delete()
-                        .SetTimestamp(now)
+                        .SetTimestamp(timestampUtc)
                         .Execute();
         }
 
-        public void SetPeerResponding(PeerId peerId, bool isResponding)
+        public void SetPeerResponding(PeerId peerId, bool isResponding, DateTime timestampUtc)
         {
             _dataContext.Peers
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                         .Where(peer => peer.PeerId == peerId.ToString())
-                        .Select(peer =>  new CassandraPeer { IsResponding = isResponding })
+                        .Select(peer =>  new CassandraPeer { IsResponding = isResponding, TimestampUtc = timestampUtc})
                         .Update()
-                        .SetTimestamp(DateTime.UtcNow)
+                        .SetTimestamp(timestampUtc)
                         .Execute();
 
         }

+ 5 - 3
src/Abc.Zebus.Directory.RocksDb.Tests/RocksDbPeerRepositoryTests.cs

@@ -126,15 +126,17 @@ namespace Abc.Zebus.Directory.RocksDb.Tests
         [Test]
         public void should_mark_peer_as_responding()
         {
+            var baseTimestampUtc = DateTime.UtcNow;
+
             var descriptor = CreatePeerDescriptor(_peer1.Id);
-            descriptor.TimestampUtc = DateTime.UtcNow.AddTicks(-10);
+            descriptor.TimestampUtc = baseTimestampUtc;
             _repository.AddOrUpdatePeer(descriptor);
 
-            _repository.SetPeerResponding(_peer1.Id, false);
+            _repository.SetPeerResponding(_peer1.Id, false, baseTimestampUtc.AddMilliseconds(1));
             _repository.Get(_peer1.Id).Peer.IsResponding.ShouldBeFalse();
             _repository.GetPeers().ExpectedSingle().Peer.IsResponding.ShouldBeFalse();
 
-            _repository.SetPeerResponding(_peer1.Id, true);
+            _repository.SetPeerResponding(_peer1.Id, true, baseTimestampUtc.AddMilliseconds(1));
             _repository.Get(_peer1.Id).Peer.IsResponding.ShouldBeTrue();
             _repository.GetPeers().ExpectedSingle().Peer.IsResponding.ShouldBeTrue();
         }

+ 2 - 1
src/Abc.Zebus.Directory.RocksDb/Storage/RocksDbPeerRepository.cs

@@ -252,12 +252,13 @@ namespace Abc.Zebus.Directory.RocksDb.Storage
             RemoveAllDynamicSubscriptionsForPeer(peerId);
         }
 
-        public void SetPeerResponding(PeerId peerId, bool isResponding)
+        public void SetPeerResponding(PeerId peerId, bool isResponding, DateTime timestampUtc)
         {
             var peer = Get(peerId);
             if (peer != null)
             {
                 peer.Peer.IsResponding = isResponding;
+                peer.TimestampUtc = timestampUtc;
                 AddOrUpdatePeer(peer);
             }
         }

+ 1 - 1
src/Abc.Zebus.Directory.Tests/DeadPeerDetection/DeadPeerDetectorTests.cs

@@ -433,7 +433,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
 
                 SystemDateTime.PauseTime(SystemDateTime.UtcNow.Add(_pingInterval));
                 _detector.DetectDeadPeers();
-                _bus.Expect(new MarkPeerAsRespondingCommand(_persistentAlivePeer.Peer.Id, SystemDateTime.UtcNow));
+                _bus.Expect(new MarkPeerAsRespondingCommand(_persistentAlivePeer.Peer.Id, _persistentAlivePeer.TimestampUtc.Value + 1.Millisecond()));
             }
         }
 

+ 15 - 12
src/Abc.Zebus.Directory.Tests/Handlers/DirectoryCommandsHandlerTests.cs

@@ -471,40 +471,43 @@ namespace Abc.Zebus.Directory.Tests.Handlers
         public void should_persist_not_responding_peer()
         {
             var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123");
-
             _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
 
-            _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, SystemDateTime.UtcNow));
+            var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(1);
 
-            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, false));
+            _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, timestampUtc));
 
-            var notRespondingEvent = _bus.Events.OfType<PeerNotResponding>().ExpectedSingle();
-            notRespondingEvent.PeerId.ShouldEqual(peerDescriptor.PeerId);
+            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, false, timestampUtc));
+            _bus.ExpectExactly(new PeerNotResponding(peerDescriptor.PeerId, timestampUtc));
         }
 
         [Test]
         public void should_persist_responding_peer()
         {
             var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
+            _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
 
-            _handler.Handle(new MarkPeerAsRespondingCommand(peerDescriptor.PeerId, SystemDateTime.UtcNow));
+            var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(1);
 
-            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, true));
+            _handler.Handle(new MarkPeerAsRespondingCommand(peerDescriptor.PeerId, timestampUtc));
 
-            var respondingEvent = _bus.Events.OfType<PeerResponding>().ExpectedSingle();
-            respondingEvent.PeerId.ShouldEqual(peerDescriptor.PeerId);
+            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, true, timestampUtc));
+
+            _bus.ExpectExactly(new PeerResponding(peerDescriptor.PeerId, timestampUtc));
         }
 
         [Test]
-        public void should_not_revive_decommissionned_peer()
+        public void should_not_revive_decommissioned_peer()
         {
             var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
 
             _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns((PeerDescriptor)null);
 
-            _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, SystemDateTime.UtcNow));
+            var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(-1);
+
+            _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, timestampUtc));
 
-            _repositoryMock.Verify(repo => repo.SetPeerResponding(It.IsAny<PeerId>(), It.IsAny<bool>()), Times.Never());
+            _repositoryMock.Verify(repo => repo.SetPeerResponding(It.IsAny<PeerId>(), It.IsAny<bool>(), It.IsAny<DateTime>()), Times.Never());
         }
     }
 }

+ 2 - 2
src/Abc.Zebus.Directory/DeadPeerDetection/DeadPeerDetector.cs

@@ -148,9 +148,9 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
             return true;
         }
 
-        private void OnPeerResponding(DeadPeerDetectorEntry entry, DateTime timeoutTimestampUtc)
+        private void OnPeerResponding(DeadPeerDetectorEntry entry, DateTime timestampUtc)
         {
-            _bus.Send(new MarkPeerAsRespondingCommand(entry.Descriptor.PeerId, timeoutTimestampUtc)).Wait(_commandTimeout);
+            _bus.Send(new MarkPeerAsRespondingCommand(entry.Descriptor.PeerId, timestampUtc)).Wait(_commandTimeout);
         }
 
         public void Start()

+ 3 - 2
src/Abc.Zebus.Directory/DeadPeerDetection/DeadPeerDetectorEntry.cs

@@ -164,7 +164,7 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
                 return;
             }
 
-            var pingTimestampUtc = (DateTime)state!;
+            var timestampUtc = default(DateTime);
             var peerRespondingDetected = false;
 
             lock (_lock)
@@ -175,6 +175,7 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
                     return;
                 }
 
+                timestampUtc = Descriptor.TimestampUtc + TimeSpan.FromMilliseconds(1) ?? SystemDateTime.UtcNow;
                 var peer = Descriptor.Peer;
                 if (!peer.IsResponding)
                 {
@@ -184,7 +185,7 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
             }
 
             if (peerRespondingDetected)
-                PeerRespondingDetected?.Invoke(this, pingTimestampUtc);
+                PeerRespondingDetected?.Invoke(this, timestampUtc);
 
             Reset();
         }

+ 4 - 11
src/Abc.Zebus.Directory/Handlers/DirectoryCommandsHandler.cs

@@ -7,7 +7,6 @@ using Abc.Zebus.Directory.Configuration;
 using Abc.Zebus.Directory.Storage;
 using Abc.Zebus.Util;
 using Abc.Zebus.Util.Extensions;
-using Microsoft.Extensions.Logging;
 
 namespace Abc.Zebus.Directory.Handlers
 {
@@ -20,7 +19,6 @@ namespace Abc.Zebus.Directory.Handlers
                                             IMessageHandler<MarkPeerAsNotRespondingCommand>,
                                             IMessageContextAware
     {
-        private static readonly ILogger _log = ZebusLogManager.GetLogger(typeof(DirectoryCommandsHandler));
         private readonly HashSet<string> _blacklistedMachines;
         private readonly IBus _bus;
         private readonly IPeerRepository _peerRepository;
@@ -154,19 +152,14 @@ namespace Abc.Zebus.Directory.Handlers
 
         public void Handle(MarkPeerAsRespondingCommand message)
         {
-            _peerRepository.SetPeerResponding(message.PeerId, true);
-            _bus.Publish(new PeerResponding(message.PeerId, message.TimestampUtc));
+            if (_peerRepository.SetPeerRespondingState(message.PeerId, true, message.TimestampUtc))
+                _bus.Publish(new PeerResponding(message.PeerId, message.TimestampUtc));
         }
 
         public void Handle(MarkPeerAsNotRespondingCommand message)
         {
-            if (_peerRepository.Get(message.PeerId) == null)
-            {
-                _log.LogWarning("MarkPeerAsNotRespondingCommand ignored because the peer cannot be found");
-                return;
-            }
-            _peerRepository.SetPeerResponding(message.PeerId, false);
-            _bus.Publish(new PeerNotResponding(message.PeerId, message.TimestampUtc));
+            if (_peerRepository.SetPeerRespondingState(message.PeerId, false, message.TimestampUtc))
+                _bus.Publish(new PeerNotResponding(message.PeerId, message.TimestampUtc));
         }
     }
 }

+ 2 - 4
src/Abc.Zebus.Directory/Storage/ExtendPeerRepository.cs

@@ -21,12 +21,10 @@ namespace Abc.Zebus.Directory.Storage
         public static bool SetPeerRespondingState(this IPeerRepository repository, PeerId peerId, bool isResponding, DateTime timestampUtc)
         {
             var peer = repository.Get(peerId);
-            if (peer == null || peer.TimestampUtc > timestampUtc)
+            if (peer == null || peer.TimestampUtc == null || peer.TimestampUtc > timestampUtc)
                 return false;
 
-            peer.Peer.IsResponding = isResponding;
-            peer.TimestampUtc = timestampUtc;
-            repository.AddOrUpdatePeer(peer);
+            repository.SetPeerResponding(peer.PeerId, isResponding, timestampUtc);
 
             return true;
         }

+ 1 - 1
src/Abc.Zebus.Directory/Storage/IPeerRepository.cs

@@ -11,7 +11,7 @@ namespace Abc.Zebus.Directory.Storage
 
         void AddOrUpdatePeer(PeerDescriptor peerDescriptor);
         void RemovePeer(PeerId peerId);
-        void SetPeerResponding(PeerId peerId, bool isResponding);
+        void SetPeerResponding(PeerId peerId, bool isResponding, DateTime timestampUtc);
 
         void AddDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, SubscriptionsForType[] subscriptionsForTypes);
         void RemoveDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, MessageTypeId[] messageTypeIds);

+ 4 - 1
src/Abc.Zebus.Directory/Storage/MemoryPeerRepository.cs

@@ -63,11 +63,14 @@ namespace Abc.Zebus.Directory.Storage
             _peers.TryRemove(peerId, out _);
         }
 
-        public void SetPeerResponding(PeerId peerId, bool isResponding)
+        public void SetPeerResponding(PeerId peerId, bool isResponding, DateTime timestampUtc)
         {
             var peerEntry = GetEntry(peerId);
             if (peerEntry != null)
+            {
+                peerEntry.PeerDescriptor.TimestampUtc = timestampUtc;
                 peerEntry.PeerDescriptor.Peer.IsResponding = isResponding;
+            }
         }
 
         private PeerEntry? GetEntry(PeerId peerId)