1
0
Эх сурвалжийг харах

directory client: should_support_multiple_peers_for_same_message

Olivier Coanet 11 жил өмнө
parent
commit
b85ae2e89b

+ 18 - 5
src/Abc.Zebus.Tests/Directory/PeerDirectoryClientTests.cs

@@ -1,5 +1,4 @@
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
@@ -523,18 +522,32 @@ namespace Abc.Zebus.Tests.Directory
         public void should_ignore_old_subscription_update_on_PeerSubscriptionsForTypesUpdated()
         {
             _directory.Handle(new PeerStarted(_otherPeer.ToPeerDescriptor(true)));
-
             _directory.Handle(new PeerSubscriptionsForTypesUpdated(_otherPeer.Id, SystemDateTime.UtcNow.AddMinutes(1), MessageUtil.TypeId<FakeCommand>(), BindingKey.Empty));
-
             _directory.Handle(new PeerSubscriptionsForTypesUpdated(_otherPeer.Id, SystemDateTime.UtcNow, MessageUtil.TypeId<FakeCommand>(), BindingKey.Empty));
 
             _directory.GetPeersHandlingMessage(new FakeCommand(0)).ShouldNotBeEmpty();
         }
 
-        [Test, Ignore]
+        [Test]
         public void should_consider_timestamp_at_message_type_level()
         {
-            throw new NotImplementedException();
+            _directory.Handle(new PeerStarted(_otherPeer.ToPeerDescriptor(true)));
+            _directory.Handle(new PeerSubscriptionsForTypesUpdated(_otherPeer.Id, SystemDateTime.UtcNow.AddSeconds(20), MessageUtil.TypeId<FakeCommand>(), BindingKey.Empty));
+            _directory.Handle(new PeerSubscriptionsForTypesUpdated(_otherPeer.Id, SystemDateTime.UtcNow.AddSeconds(10), MessageUtil.TypeId<FakeEvent>(), BindingKey.Empty));
+
+            _directory.GetPeersHandlingMessage(new FakeEvent(0)).ShouldNotBeEmpty();
+        }
+
+        [Test]
+        public void should_support_multiple_peers_for_same_message()
+        {
+            var otherPeer2 = new Peer(new PeerId("Abc.Testing.2"), "tcp://abctest:987");
+
+            _directory.Handle(new PeerStarted(_otherPeer.ToPeerDescriptor(true, typeof(FakeEvent))));
+            _directory.Handle(new PeerStarted(otherPeer2.ToPeerDescriptor(true, typeof(FakeEvent))));
+
+            var peers = _directory.GetPeersHandlingMessage(new FakeEvent(0));
+            peers.Count.ShouldEqual(2);
         }
 
         [Test]

+ 40 - 18
src/Abc.Zebus/Directory/PeerDirectoryClient.PeerEntry.cs

@@ -1,4 +1,5 @@
-using System.Collections.Concurrent;
+using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
 using Abc.Zebus.Routing;
@@ -10,7 +11,7 @@ namespace Abc.Zebus.Directory
     {
         private class PeerEntry
         {
-            private readonly Dictionary<MessageTypeId, HashSet<BindingKey>> _messageSubscriptions = new Dictionary<MessageTypeId, HashSet<BindingKey>>();
+            private readonly Dictionary<MessageTypeId, MessageTypeEntry> _messageSubscriptions = new Dictionary<MessageTypeId, MessageTypeEntry>();
             private readonly ConcurrentDictionary<MessageTypeId, PeerSubscriptionTree> _subscriptionsByMessageType;
             private readonly PeerDescriptor _descriptor;
 
@@ -26,53 +27,66 @@ namespace Abc.Zebus.Directory
             {
                 lock (_messageSubscriptions)
                 {
-                    return _messageSubscriptions.SelectMany(x => x.Value.Select(bk => new Subscription(x.Key, bk)))
+                    return _messageSubscriptions.SelectMany(x => x.Value.BindingKeys.Select(bk => new Subscription(x.Key, bk)))
                                                 .Distinct()
                                                 .ToArray();
                 }
             }
 
-            public void SetSubscriptions(IEnumerable<Subscription> subscriptions)
+            public void SetSubscriptions(IEnumerable<Subscription> subscriptions, DateTime? timestampUtc)
             {
                 lock (_messageSubscriptions)
                 {
-                    var newBindingKeysByMessageType = subscriptions.GroupBy(x => x.MessageTypeId).ToDictionary(g => g.Key, g => g.Select(x => x.BindingKey).ToList());
+                    var newBindingKeysByMessageType = subscriptions.GroupBy(x => x.MessageTypeId).ToDictionary(g => g.Key, g => g.Select(x => x.BindingKey));
 
-                    foreach (var bindingKeysForMessageType in _messageSubscriptions)
+                    foreach (var messageSubscriptions in _messageSubscriptions)
                     {
-                        foreach (var bindingKey in bindingKeysForMessageType.Value)
-                        {
-                            RemoveFromSubscriptionTree(bindingKeysForMessageType.Key, bindingKey);
-                        }
+                        if (!newBindingKeysByMessageType.ContainsKey(messageSubscriptions.Key))
+                            SetSubscriptionsForType(messageSubscriptions.Key, Enumerable.Empty<BindingKey>(), timestampUtc);
                     }
 
-                    foreach (var newBindingKeyForMessageType in newBindingKeysByMessageType)
+                    foreach (var newBindingKeys in newBindingKeysByMessageType)
                     {
-                        SetSubscriptionsForType(newBindingKeyForMessageType.Key, newBindingKeyForMessageType.Value);
+                        SetSubscriptionsForType(newBindingKeys.Key, newBindingKeys.Value, timestampUtc);
                     }
                 }
             }
 
-            public void SetSubscriptionsForType(IEnumerable<SubscriptionsForType> subscriptionsForTypes)
+            public void SetSubscriptionsForType(IEnumerable<SubscriptionsForType> subscriptionsForTypes, DateTime? timestampUtc)
             {
                 foreach (var subscriptionsForType in subscriptionsForTypes)
                 {
-                    SetSubscriptionsForType(subscriptionsForType.MessageTypeId, subscriptionsForType.BindingKeys);
+                    SetSubscriptionsForType(subscriptionsForType.MessageTypeId, subscriptionsForType.BindingKeys, timestampUtc);
                 }
             }
 
-            private void SetSubscriptionsForType(MessageTypeId messageTypeId, IEnumerable<BindingKey> bindingKeys)
+            private void SetSubscriptionsForType(MessageTypeId messageTypeId, IEnumerable<BindingKey> bindingKeys, DateTime? timestampUtc)
             {
                 var newBindingKeys = bindingKeys.ToHashSet();
 
                 lock (_messageSubscriptions)
                 {
-                    _messageSubscriptions[messageTypeId] = newBindingKeys;
+                    var messageSubscriptions = _messageSubscriptions.GetValueOrAdd(messageTypeId, MessageTypeEntry.Create);
+                    if (messageSubscriptions.TimestampUtc > timestampUtc)
+                        return;
 
-                    _subscriptionsByMessageType.Remove(messageTypeId);
+                    messageSubscriptions.TimestampUtc = timestampUtc;
+
+                    foreach (var previousBindingKey in messageSubscriptions.BindingKeys.ToList())
+                    {
+                        if (newBindingKeys.Remove(previousBindingKey))
+                            continue;
+
+                        messageSubscriptions.BindingKeys.Remove(previousBindingKey);
+
+                        RemoveFromSubscriptionTree(messageTypeId, previousBindingKey);
+                    }
 
                     foreach (var newBindingKey in newBindingKeys)
                     {
+                        if (!messageSubscriptions.BindingKeys.Add(newBindingKey))
+                            continue;
+
                         AddToSubscriptionTree(messageTypeId, newBindingKey);
                     }
                 }
@@ -84,7 +98,7 @@ namespace Abc.Zebus.Directory
                 {
                     foreach (var messageSubscriptions in _messageSubscriptions)
                     {
-                        foreach (var bindingKey in messageSubscriptions.Value)
+                        foreach (var bindingKey in messageSubscriptions.Value.BindingKeys)
                         {
                             RemoveFromSubscriptionTree(messageSubscriptions.Key, bindingKey);
                         }
@@ -110,6 +124,14 @@ namespace Abc.Zebus.Directory
                 if (subscriptionTree.IsEmpty)
                     _subscriptionsByMessageType.Remove(messageTypeId);
             }
+
+            private class MessageTypeEntry
+            {
+                public static readonly Func<MessageTypeEntry> Create = () => new MessageTypeEntry();
+
+                public readonly HashSet<BindingKey> BindingKeys = new HashSet<BindingKey>();
+                public DateTime? TimestampUtc;
+            }
         }
     }
 }

+ 4 - 5
src/Abc.Zebus/Directory/PeerDirectoryClient.cs

@@ -185,7 +185,7 @@ namespace Abc.Zebus.Directory
                 return entry;
             });
 
-            peerEntry.SetSubscriptions(subscriptions);
+            peerEntry.SetSubscriptions(subscriptions, peerDescriptor.TimestampUtc);
         }
 
         public void Handle(PeerStarted message)
@@ -261,7 +261,7 @@ namespace Abc.Zebus.Directory
             if (peer == null)
                 return;
 
-            peer.SetSubscriptions(message.PeerDescriptor.Subscriptions ?? Enumerable.Empty<Subscription>());
+            peer.SetSubscriptions(message.PeerDescriptor.Subscriptions ?? Enumerable.Empty<Subscription>(), message.PeerDescriptor.TimestampUtc);
 
             peer.Descriptor.Subscriptions = peer.GetSubscriptions();
             peer.Descriptor.TimestampUtc = message.PeerDescriptor.TimestampUtc;
@@ -274,14 +274,13 @@ namespace Abc.Zebus.Directory
             if (EnqueueIfRegistering(message))
                 return;
 
-            var peer = GetPeerCheckTimestamp(message.PeerId, message.TimestampUtc);
+            var peer = _peers.GetValueOrDefault(message.PeerId);
             if (peer == null)
                 return;
 
-            peer.SetSubscriptionsForType(message.SubscriptionsForType);
+            peer.SetSubscriptionsForType(message.SubscriptionsForType, message.TimestampUtc);
             
             peer.Descriptor.Subscriptions = peer.GetSubscriptions();
-            peer.Descriptor.TimestampUtc = message.TimestampUtc;
 
             PeerUpdated(message.PeerId, PeerUpdateAction.Updated);
         }