Browse Source

Log a warning when we drop peer updates for non-existent peers

mendelmonteiro 10 years ago
parent
commit
b367e726ee

+ 4 - 1
.gitignore

@@ -22,10 +22,13 @@ Thumbs.db
 [Dd]ebug*/
 *.lib
 *.sbr
+*.ncrunchproject
+*.ncrunchsolution
 obj/
 [Rr]elease*/
 _ReSharper*/
 [Tt]est[Rr]esult*
 output/**
-
+
+src/_NCrunch_Abc.Zebus/**
 src/packages/**

+ 40 - 0
src/Abc.Zebus.Tests/Directory/PeerDirectoryClientTests.cs

@@ -173,6 +173,46 @@ namespace Abc.Zebus.Tests.Directory
             targetPeer.Id.ShouldEqual(_otherPeer.Id);
         }
 
+        [Test]
+        public void should_discard_outdated_updates_on_PeerSubscriptionsUpdated()
+        {
+            int updates = 0;
+            _directory.PeerUpdated += (id, action) => updates++;
+
+            var descriptor1 = _otherPeer.ToPeerDescriptor(true);
+            descriptor1.TimestampUtc = DateTime.UtcNow;
+            _directory.Handle(new PeerStarted(descriptor1));
+
+            var descriptor2 = _otherPeer.ToPeerDescriptor(true, typeof(FakeCommand));
+            descriptor2.TimestampUtc = DateTime.UtcNow;
+            _directory.Handle(new PeerSubscriptionsUpdated(descriptor2));
+            
+            descriptor2.TimestampUtc = default(DateTime);
+            _directory.Handle(new PeerSubscriptionsUpdated(descriptor2));
+
+            _directory.GetPeersHandlingMessage(new FakeCommand(0)).ExpectedSingle();
+
+            updates.ShouldEqual(2);
+        }
+
+        [Test]
+        public void should_discard_outdated_updates_on_PeerSubscriptionsForTypesUpdated()
+        {
+            int updates = 0;
+            _directory.PeerUpdated += (id, action) => updates++;
+
+            var peerDescriptor = _otherPeer.ToPeerDescriptor(true);
+            peerDescriptor.TimestampUtc = DateTime.UtcNow;
+            _directory.Handle(new PeerStarted(peerDescriptor));
+
+            _directory.Handle(new PeerSubscriptionsForTypesUpdated(peerDescriptor.PeerId, DateTime.UtcNow, MessageUtil.TypeId<FakeCommand>(), BindingKey.Empty));
+            _directory.Handle(new PeerSubscriptionsForTypesUpdated(peerDescriptor.PeerId, default(DateTime), MessageUtil.TypeId<FakeCommand>(), BindingKey.Empty));
+
+            _directory.GetPeersHandlingMessage(new FakeCommand(0)).ExpectedSingle();
+
+            updates.ShouldEqual(2);
+        }
+
         [Test]
         public void should_update_started_peer_with_no_timestamp_on_PeerSubscriptionsForTypesUpdated()
         {

+ 49 - 15
src/Abc.Zebus/Directory/PeerDirectoryClient.cs

@@ -175,7 +175,7 @@ namespace Abc.Zebus.Directory
         {
             var subscriptions = peerDescriptor.Subscriptions ?? ArrayUtil.Empty<Subscription>();
 
-            var peerEntry = _peers.AddOrUpdate(peerDescriptor.PeerId, (key) => new PeerEntry(peerDescriptor, _globalSubscriptionsIndex), (key, entry) =>
+            var peerEntry = _peers.AddOrUpdate(peerDescriptor.PeerId, key => new PeerEntry(peerDescriptor, _globalSubscriptionsIndex), (key, entry) =>
             {
                 entry.Peer.EndPoint = peerDescriptor.Peer.EndPoint;
                 entry.Peer.IsUp = peerDescriptor.Peer.IsUp;
@@ -230,12 +230,12 @@ namespace Abc.Zebus.Directory
                 return;
 
             var peer = GetPeerCheckTimestamp(message.PeerId, message.TimestampUtc);
-            if (peer == null)
+            if (peer.Value == null)
                 return;
 
-            peer.Peer.IsUp = false;
-            peer.Peer.IsResponding = false;
-            peer.TimestampUtc = message.TimestampUtc ?? DateTime.UtcNow;
+            peer.Value.Peer.IsUp = false;
+            peer.Value.Peer.IsResponding = false;
+            peer.Value.TimestampUtc = message.TimestampUtc ?? DateTime.UtcNow;
 
             PeerUpdated(message.PeerId, PeerUpdateAction.Stopped);
         }
@@ -260,11 +260,14 @@ namespace Abc.Zebus.Directory
                 return;
             
             var peer = GetPeerCheckTimestamp(message.PeerDescriptor.Peer.Id, message.PeerDescriptor.TimestampUtc);
-            if (peer == null)
+            if (peer.Value == null)
+            {
+                WarnWhenPeerDoesNotExist(peer, message.PeerDescriptor.PeerId);
                 return;
+            }
 
-            peer.SetSubscriptions(message.PeerDescriptor.Subscriptions ?? Enumerable.Empty<Subscription>(), message.PeerDescriptor.TimestampUtc);
-            peer.TimestampUtc = message.PeerDescriptor.TimestampUtc ?? DateTime.UtcNow;
+            peer.Value.SetSubscriptions(message.PeerDescriptor.Subscriptions ?? Enumerable.Empty<Subscription>(), message.PeerDescriptor.TimestampUtc);
+            peer.Value.TimestampUtc = message.PeerDescriptor.TimestampUtc ?? DateTime.UtcNow;
 
             PeerUpdated(message.PeerDescriptor.PeerId, PeerUpdateAction.Updated);
         }
@@ -274,15 +277,24 @@ namespace Abc.Zebus.Directory
             if (EnqueueIfRegistering(message))
                 return;
 
-            var peer = _peers.GetValueOrDefault(message.PeerId);
-            if (peer == null)
+            var peer = GetPeerCheckTimestamp(message.PeerId, message.TimestampUtc);
+            if (peer.Value == null)
+            {
+                WarnWhenPeerDoesNotExist(peer, message.PeerId);
                 return;
+            }
 
-            peer.SetSubscriptionsForType(message.SubscriptionsForType ?? Enumerable.Empty<SubscriptionsForType>(), message.TimestampUtc);
+            peer.Value.SetSubscriptionsForType(message.SubscriptionsForType ?? Enumerable.Empty<SubscriptionsForType>(), message.TimestampUtc);
 
             PeerUpdated(message.PeerId, PeerUpdateAction.Updated);
         }
 
+        private void WarnWhenPeerDoesNotExist(PeerEntryResult peer, PeerId peerId)
+        {
+            if (peer.FailureReason == PeerEntryResult.FailureReasonType.PeerNotPresent)
+                _logger.WarnFormat("Received message but no peer existed: {0}", peerId);
+        }
+
         public void Handle(PeerNotResponding message)
         {
             HandlePeerRespondingChange(message.PeerId, false);
@@ -304,19 +316,41 @@ namespace Abc.Zebus.Directory
             PeerUpdated(peerId, PeerUpdateAction.Updated);
         }
 
-        private PeerEntry GetPeerCheckTimestamp(PeerId peerId, DateTime? timestampUtc)
+        private PeerEntryResult GetPeerCheckTimestamp(PeerId peerId, DateTime? timestampUtc)
         {
             var peer = _peers.GetValueOrDefault(peerId);
             if (peer == null)
-                return null;
+                return new PeerEntryResult(PeerEntryResult.FailureReasonType.PeerNotPresent);
 
             if (peer.TimestampUtc > timestampUtc)
             {
                 _logger.InfoFormat("Outdated message ignored");
-                return null;
+                return new PeerEntryResult(PeerEntryResult.FailureReasonType.OutdatedMessage);
+            }
+
+            return new PeerEntryResult(peer);
+        }
+
+        struct PeerEntryResult
+        {
+            internal enum FailureReasonType
+            {
+                PeerNotPresent,
+                OutdatedMessage,
+            }
+
+            public PeerEntryResult(PeerEntry value) : this()
+            {
+                Value = value;
+            }
+
+            public PeerEntryResult(FailureReasonType failureReason) : this()
+            {
+                FailureReason = failureReason;
             }
 
-            return peer;
+            public PeerEntry Value { get; private set; }
+            public FailureReasonType? FailureReason { get; private set; }
         }
     }
 }