Browse Source

Zebus.Directory: Keep pinging peers when peer repository is unavailable

Olivier Coanet 2 years ago
parent
commit
8956a910ee

+ 96 - 18
src/Abc.Zebus.Directory.Tests/DeadPeerDetection/DeadPeerDetectorTests.cs

@@ -130,7 +130,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_not_timeout_if_a_transient_service_responds_to_the_second_ping()
         {
             SetupPeerRepository(_transientAlivePeer0);
-            SetupPeerResponse(_transientAlivePeer0.PeerId, false, true);
+            SetupPeerResponses(_transientAlivePeer0.PeerId, false, true);
 
             using (SystemDateTime.PauseTime())
             {
@@ -153,8 +153,8 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_not_timeout_if_a_persistent_service_responds_to_the_second_ping()
         {
             SetupPeerRepository(_persistentAlivePeer, _transientAlivePeer0);
-            SetupPeerResponse(_transientAlivePeer0.PeerId, true, true);
-            SetupPeerResponse(_persistentAlivePeer.PeerId, false, true);
+            SetupPeerResponses(_transientAlivePeer0.PeerId, true, true);
+            SetupPeerResponses(_persistentAlivePeer.PeerId, false, true);
 
             using (SystemDateTime.PauseTime())
             {
@@ -184,8 +184,8 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_not_timeout_if_a_debug_service_responds_to_the_second_ping()
         {
             SetupPeerRepository(_debugPersistentAlivePeer, _debugTransientAlivePeer);
-            SetupPeerResponse(_debugPersistentAlivePeer.PeerId, false, true);
-            SetupPeerResponse(_debugTransientAlivePeer.PeerId, false, true);
+            SetupPeerResponses(_debugPersistentAlivePeer.PeerId, false, true);
+            SetupPeerResponses(_debugTransientAlivePeer.PeerId, false, true);
 
             using (SystemDateTime.PauseTime())
             {
@@ -210,7 +210,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_timeout_if_a_transient_service_does_not_respond_in_time()
         {
             SetupPeerRepository(_transientAlivePeer0);
-            SetupPeerResponse(_transientAlivePeer0.PeerId, false, false);
+            SetupPeerResponses(_transientAlivePeer0.PeerId, false, false);
 
             using (SystemDateTime.PauseTime())
             {
@@ -244,7 +244,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         {
             _peersNotToDecommission = new[] { peerNotToDecommission };
             SetupPeerRepository(_transientAlivePeer0);
-            SetupPeerResponse(_transientAlivePeer0.PeerId, false, false);
+            SetupPeerResponses(_transientAlivePeer0.PeerId, false, false);
 
             using (SystemDateTime.PauseTime())
             {
@@ -270,8 +270,8 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_timeout_if_a_persistent_service_does_not_respond_in_time()
         {
             SetupPeerRepository(_persistentAlivePeer, _transientAlivePeer0);
-            SetupPeerResponse(_transientAlivePeer0.PeerId, true, true);
-            SetupPeerResponse(_persistentAlivePeer.PeerId, false, false);
+            SetupPeerResponses(_transientAlivePeer0.PeerId, true, true);
+            SetupPeerResponses(_persistentAlivePeer.PeerId, false, false);
 
             using (SystemDateTime.PauseTime())
             {
@@ -303,8 +303,8 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_not_decommission_directory_peer()
         {
             SetupPeerRepository(_directoryPeer, _transientAlivePeer0);
-            SetupPeerResponse(_transientAlivePeer0.PeerId, false, false);
-            SetupPeerResponse(_directoryPeer.PeerId, false, false);
+            SetupPeerResponses(_transientAlivePeer0.PeerId, false, false);
+            SetupPeerResponses(_directoryPeer.PeerId, false, false);
 
             using (SystemDateTime.PauseTime())
             {
@@ -330,7 +330,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_not_decommission_the_persistence()
         {
             SetupPeerRepository(_persistencePeer);
-            SetupPeerResponse(_persistencePeer.PeerId, false, false);
+            SetupPeerResponses(_persistencePeer.PeerId, false, false);
 
             using (SystemDateTime.PauseTime())
             {
@@ -370,8 +370,8 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_raise_PingMissed_before_a_peer_is_marked_as_timed_out()
         {
             SetupPeerRepository(_persistentAlivePeer, _transientAlivePeer0);
-            SetupPeerResponse(_transientAlivePeer0.PeerId, true, true);
-            SetupPeerResponse(_persistentAlivePeer.PeerId, false, false);
+            SetupPeerResponses(_transientAlivePeer0.PeerId, true, true);
+            SetupPeerResponses(_persistentAlivePeer.PeerId, false, false);
 
             using (SystemDateTime.PauseTime())
             {
@@ -401,7 +401,7 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_raise_PeerResponding_when_peer_starts_replying_again_to_ping()
         {
             SetupPeerRepository(_persistentAlivePeer);
-            SetupPeerResponse(_persistentAlivePeer.PeerId, false, false, false, true);
+            SetupPeerResponses(_persistentAlivePeer.PeerId, false, false, false, true);
 
             using (SystemDateTime.PauseTime())
             {
@@ -440,8 +440,8 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
         public void should_timeout_if_any_debug_service_does_not_respond_in_time()
         {
             SetupPeerRepository(_debugPersistentAlivePeer, _debugTransientAlivePeer);
-            SetupPeerResponse(_debugPersistentAlivePeer.PeerId, false, false);
-            SetupPeerResponse(_debugTransientAlivePeer.PeerId, false, false);
+            SetupPeerResponses(_debugPersistentAlivePeer.PeerId, false, false);
+            SetupPeerResponses(_debugTransientAlivePeer.PeerId, false, false);
 
             using (SystemDateTime.PauseTime())
             {
@@ -466,12 +466,79 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
             }
         }
 
+        [Test]
+        public void should_ping_peers_on_peer_repository_timeout()
+        {
+            using (SystemDateTime.PauseTime())
+            {
+                // 1: Repository is operational
+                SetupPeerRepository(_transientAlivePeer0, _transientAlivePeer1);
+                SetupPeerResponse(_transientAlivePeer0.PeerId, true);
+                SetupPeerResponse(_transientAlivePeer1.PeerId, true);
+
+                _detector.DetectDeadPeers();
+
+                _bus.ExpectExactly(PingPeerCommands(2));
+
+                SystemDateTime.AddToPausedTime(_transientPeerTimeout.Seconds());
+
+                _detector.DetectDeadPeers();
+
+                _bus.ExpectExactly(PingPeerCommands(4));
+
+                // 2: Repository is broken
+                SetupPeerRepository(() => new TimeoutException("The task didn't complete before timeout."));
+
+                SystemDateTime.AddToPausedTime(_transientPeerTimeout.Seconds());
+
+                _detector.DetectDeadPeers();
+
+                _bus.ExpectExactly(PingPeerCommands(6));
+
+                SystemDateTime.AddToPausedTime(_transientPeerTimeout.Seconds());
+
+                // 3: Peer0 stops responding to pings
+                SetupPeerResponse(_transientAlivePeer0.PeerId, false);
+
+                _detector.DetectDeadPeers();
+
+                _bus.ExpectExactly(PingPeerCommands(8));
+
+                var firstUnansweredPingTimestampUtc = SystemDateTime.UtcNow;
+
+                SystemDateTime.AddToPausedTime(_transientPeerTimeout.Seconds());
+
+                _detector.DetectDeadPeers();
+
+                _bus.ExpectExactly(PingPeerCommands(10));
+
+                SystemDateTime.AddToPausedTime(_transientPeerTimeout.Seconds());
+
+                // 4: Repository is operational again
+                SetupPeerRepository(_transientAlivePeer0, _transientAlivePeer1);
+
+                _detector.DetectDeadPeers();
+
+                _bus.ExpectExactly(PingPeerCommands(11).Append(new UnregisterPeerCommand(_transientAlivePeer0.Peer, firstUnansweredPingTimestampUtc)).ToArray());
+            }
+        }
+
+        private IMessage[] PingPeerCommands(int count)
+        {
+            return Enumerable.Range(0, count).Select(x => (IMessage)new PingPeerCommand()).ToArray();
+        }
+
         private void SetupPeerRepository(params PeerDescriptor[] peer)
         {
             _peerRepositoryMock.Setup(repo => repo.GetPeers(It.IsAny<bool>())).Returns(new List<PeerDescriptor>(peer));
         }
 
-        private void SetupPeerResponse(PeerId peerId, params bool[] respondToPing)
+        private void SetupPeerRepository(Func<Exception> exception)
+        {
+            _peerRepositoryMock.Setup(repo => repo.GetPeers(It.IsAny<bool>())).Throws(exception);
+        }
+
+        private void SetupPeerResponses(PeerId peerId, params bool[] respondToPing)
         {
             var invocationCount = 0;
             _bus.AddHandlerForPeer<PingPeerCommand>(peerId, cmd =>
@@ -485,6 +552,17 @@ namespace Abc.Zebus.Directory.Tests.DeadPeerDetection
             });
         }
 
+        private void SetupPeerResponse(PeerId peerId, bool respondToPing)
+        {
+            _bus.AddHandlerForPeer<PingPeerCommand>(peerId, cmd =>
+            {
+                if(respondToPing)
+                    return true;
+
+                throw new InvalidOperationException();
+            });
+        }
+
         private static PeerDescriptor CreatePeerDescriptor(string peerId, bool isPersistent, bool isUp, bool hasDebuggerAttached)
         {
             var descriptor = new Peer(new PeerId(peerId), "tcp://abcdell348:58920", isUp).ToPeerDescriptor(isPersistent);

+ 41 - 21
src/Abc.Zebus.Directory/DeadPeerDetection/DeadPeerDetector.cs

@@ -1,5 +1,6 @@
 using System;
 using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
 using System.Linq;
 using System.Text.RegularExpressions;
 using System.Threading;
@@ -22,7 +23,6 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
         private readonly IDirectoryConfiguration _configuration;
         private readonly TimeSpan _detectionPeriod = 5.Seconds();
         private Thread? _detectionThread;
-        private DateTime? _lastPingTimeUtc;
         private bool _isRunning;
 
         public DeadPeerDetector(IBus bus, IPeerRepository peerRepository, IDirectoryConfiguration configuration)
@@ -44,22 +44,51 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
         internal void DetectDeadPeers()
         {
             var timestampUtc = SystemDateTime.UtcNow;
-            var entries = _peerRepository.GetPeers(loadDynamicSubscriptions: false)
-                                         .Where(peer => peer.Subscriptions.All(sub => sub.MessageTypeId != MessageUtil.TypeId<RegisterPeerCommand>()))
-                                         .Select(ToPeerEntry)
-                                         .ToList();
 
-            var shouldSendPing = ShouldSendPing(timestampUtc);
-            if (shouldSendPing)
-                _lastPingTimeUtc = timestampUtc;
+            if (TryLoadPeerEntries(out var entries))
+            {
+                foreach (var entry in entries.Where(x => x.IsUp))
+                {
+                    entry.Process(timestampUtc);
+                }
+
+                var decommissionedPeerIds = _peers.Keys.Except(entries.Select(x => x.Descriptor.PeerId)).ToList();
+                _peers.RemoveRange(decommissionedPeerIds);
+            }
+            else
+            {
+                // Fallback for unavailable peer repository, simply ping peers
 
-            foreach (var entry in entries.Where(x => x.IsUp))
+                foreach (var entry in _peers.Values.Where(x => x.IsUp))
+                {
+                    entry.PingIfRequired(timestampUtc);
+                }
+            }
+        }
+
+        private bool TryLoadPeerEntries([NotNullWhen(true)] out List<DeadPeerDetectorEntry>? entries)
+        {
+            try
+            {
+                entries = LoadPeerEntries();
+                return true;
+            }
+            catch (Exception e)
             {
-                entry.Process(timestampUtc, shouldSendPing);
+                OnError(e);
+
+                entries = default;
+                return false;
             }
+        }
 
-            var decommissionedPeerIds = _peers.Keys.Except(entries.Select(x => x.Descriptor.PeerId)).ToList();
-            _peers.RemoveRange(decommissionedPeerIds);
+        private List<DeadPeerDetectorEntry> LoadPeerEntries()
+        {
+            return _peerRepository.GetPeers(loadDynamicSubscriptions: false)
+                                  // Exclude DirectoryService
+                                  .Where(peer => !peer.Subscriptions.Any(sub => sub.MessageTypeId == MessageUtil.TypeId<RegisterPeerCommand>()))
+                                  .Select(ToPeerEntry)
+                                  .ToList();
         }
 
         private DeadPeerDetectorEntry ToPeerEntry(PeerDescriptor descriptor)
@@ -125,15 +154,6 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
             _bus.Send(new MarkPeerAsRespondingCommand(entry.Descriptor.PeerId, timeoutTimestampUtc)).Wait(_commandTimeout);
         }
 
-        private bool ShouldSendPing(DateTime timestampUtc)
-        {
-            if (_lastPingTimeUtc == null)
-                return true;
-
-            var elapsedSinceLastPing = timestampUtc - _lastPingTimeUtc.Value;
-            return elapsedSinceLastPing >= _configuration.PeerPingInterval;
-        }
-
         public void Start()
         {
             _isRunning = true;

+ 19 - 2
src/Abc.Zebus.Directory/DeadPeerDetection/DeadPeerDetectorEntry.cs

@@ -13,6 +13,7 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
         private readonly IBus _bus;
         private readonly TaskScheduler _taskScheduler;
         private readonly object _lock = new object();
+        private DateTime? _lastPingTimeUtc;
         private DateTime? _oldestUnansweredPingTimeUtc;
         private DateTime? _timeoutTimestampUtc;
 
@@ -44,7 +45,7 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
             }
         }
 
-        public void Process(DateTime timestampUtc, bool shouldSendPing)
+        public void Process(DateTime timestampUtc)
         {
             if (WasRestarted)
                 Reset();
@@ -52,14 +53,30 @@ namespace Abc.Zebus.Directory.DeadPeerDetection
             var hasReachedTimeout = Status == DeadPeerStatus.Up && HasReachedTimeout();
             if (hasReachedTimeout)
                 Timeout();
-            else if (shouldSendPing)
+            else
+                PingIfRequired(timestampUtc);
+        }
+
+        public void PingIfRequired(DateTime timestampUtc)
+        {
+            if (ShouldSendPing(timestampUtc))
                 Ping(timestampUtc);
         }
 
+        private bool ShouldSendPing(DateTime timestampUtc)
+        {
+            if (_lastPingTimeUtc == null)
+                return true;
+
+            var elapsedSinceLastPing = timestampUtc - _lastPingTimeUtc.Value;
+            return elapsedSinceLastPing >= _configuration.PeerPingInterval;
+        }
+
         public void Ping(DateTime timestampUtc)
         {
             lock (_lock)
             {
+                _lastPingTimeUtc = timestampUtc;
                 if (_oldestUnansweredPingTimeUtc == null)
                     _oldestUnansweredPingTimeUtc = timestampUtc;
                 var elapsedTimeSinceFirstPing = SystemDateTime.UtcNow - _oldestUnansweredPingTimeUtc;

+ 8 - 0
src/Abc.Zebus/Util/SystemDateTime.cs

@@ -25,6 +25,14 @@ namespace Abc.Zebus.Util
             return new Scope();
         }
 
+        public static void AddToPausedTime(TimeSpan timeSpan)
+        {
+            if (_pausedValue == null)
+                throw new InvalidOperationException($"{nameof(SystemDateTime)} is not paused");
+
+            _pausedValue = _pausedValue.Value.Add(timeSpan);
+        }
+
         private class Scope : IDisposable
         {
             public void Dispose()