瀏覽代碼

PeerDirectoryClient no longer stores the peer descriptors (they are instanciated on-demand)

Olivier Coanet 11 年之前
父節點
當前提交
b5f5bbac87

+ 8 - 0
src/Abc.Zebus.Directory/PeerDirectoryServer.cs

@@ -40,6 +40,14 @@ namespace Abc.Zebus.Directory
                                   .ToList();
         }
 
+        public bool IsPersistent(PeerId peerId)
+        {
+            // TODO: avoid loading subscriptions
+
+            var peer = _peerRepository.Get(peerId);
+            return peer != null && peer.IsPersistent;
+        }
+
         public PeerDescriptor GetPeerDescriptor(PeerId peerId)
         {
             return _peerRepository.Get(peerId);

+ 6 - 0
src/Abc.Zebus.Testing/Directory/TestPeerDirectory.cs

@@ -56,6 +56,12 @@ namespace Abc.Zebus.Testing.Directory
             return Peers.Where(x => x.Value.Subscriptions.Any(s => s.Matches(messageBinding))).Select(x => x.Value.Peer).ToList();
         }
 
+        public bool IsPersistent(PeerId peerId)
+        {
+            var peer = Peers.GetValueOrDefault(peerId);
+            return peer != null && peer.IsPersistent;
+        }
+
         public PeerDescriptor GetPeerDescriptor(PeerId peerId)
         {
             return Peers.GetValueOrDefault(peerId);

+ 3 - 1
src/Abc.Zebus.Tests/Directory/PeerDirectoryClientTests.cs

@@ -55,11 +55,13 @@ namespace Abc.Zebus.Tests.Directory
 
                 var expectedRecipientId = new PeerId("Abc.Zebus.DirectoryService.0");
                 _bus.Commands.Count().ShouldEqual(1);
-                var register = _bus.Commands.OfType<RegisterPeerCommand>().SingleOrDefault();
+
+                var register = _bus.Commands.OfType<RegisterPeerCommand>().ExpectedSingle();
                 register.Peer.PeerId.ShouldEqual(_self.Id);
                 register.Peer.IsPersistent.ShouldEqual(isPersistent);
                 register.Peer.TimestampUtc.Value.ShouldBeGreaterOrEqualThan(SystemDateTime.UtcNow);
                 register.Peer.Subscriptions.ShouldBeEquivalentTo(subscriptions);
+
                 _bus.GetRecipientPeer(register).Id.ShouldEqual(expectedRecipientId);
             }
         }

+ 2 - 6
src/Abc.Zebus.Tests/Persistence/PersistentTransportFixture.cs

@@ -25,8 +25,6 @@ namespace Abc.Zebus.Tests.Persistence
 
         protected PersistentTransport Transport { get; private set; }
         protected TestTransport InnerTransport { get; private set; }
-        protected PeerDescriptor AnotherPersistentPeerDescriptor { get; private set; }
-        protected PeerDescriptor AnotherNonPersistentPeerDescriptor { get; private set; }
         protected ConcurrentQueue<TransportMessage> MessagesForwardedToBus { get; private set; }
         protected StartMessageReplayCommand StartMessageReplayCommand { get; private set; }
         protected IEnumerable<Peer> StartMessageReplayCommandTargets { get; private set; }
@@ -38,8 +36,6 @@ namespace Abc.Zebus.Tests.Persistence
         [SetUp]
         public void Setup()
         {
-            AnotherPersistentPeerDescriptor = AnotherPersistentPeer.ToPeerDescriptor(true);
-            AnotherNonPersistentPeerDescriptor = AnotherNonPersistentPeer.ToPeerDescriptor(false);
             InnerTransport = new TestTransport(Self.EndPoint);
 
             var configuration = new Mock<IBusConfiguration>();
@@ -50,8 +46,8 @@ namespace Abc.Zebus.Tests.Persistence
             PeerDirectory.Setup(dir => dir.GetPeersHandlingMessage(It.IsAny<StartMessageReplayCommand>())).Returns(new[] { PersistencePeer });
             PeerDirectory.Setup(dir => dir.GetPeersHandlingMessage(It.IsAny<PersistMessageCommand>())).Returns(new[] { PersistencePeer });
             PeerDirectory.Setup(dir => dir.GetPeersHandlingMessage(It.IsAny<MessageHandled>())).Returns(new[] { PersistencePeer });
-            PeerDirectory.Setup(dir => dir.GetPeerDescriptor(AnotherPersistentPeer.Id)).Returns(AnotherPersistentPeerDescriptor);
-            PeerDirectory.Setup(dir => dir.GetPeerDescriptor(AnotherNonPersistentPeer.Id)).Returns(AnotherNonPersistentPeerDescriptor);
+            PeerDirectory.Setup(dir => dir.IsPersistent(AnotherPersistentPeer.Id)).Returns(true);
+            PeerDirectory.Setup(dir => dir.IsPersistent(AnotherNonPersistentPeer.Id)).Returns(false);
 
             Transport = new PersistentTransport(configuration.Object, InnerTransport, PeerDirectory.Object);
             Transport.Configure(Self.Id, "test");

+ 3 - 4
src/Abc.Zebus.Tests/SerializationTests.cs

@@ -1,12 +1,10 @@
 using System;
-using Abc.Zebus.Directory;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Transport;
 using Newtonsoft.Json;
 using NUnit.Framework;
-using NUnit.Framework.Constraints;
 
 namespace Abc.Zebus.Tests
 {
@@ -18,10 +16,11 @@ namespace Abc.Zebus.Tests
         {
             var prebuildObjectTypes = new object[]
             {
-                new MessageTypeId("X"),
                 MessageId.NextId(),
+                new MessageTypeId("X"),
                 new TransportMessage(new MessageTypeId("lol"), new byte[] { 1, 2, 3}, new PeerId("peer"), "endpoint", MessageId.NextId()),
-                BindingKey.Empty
+                new BindingKey("Abc", "123"),
+                new Peer(new PeerId("Abc.Testing.0"), "tcp://abctest:123", true, true),
             };
 
             MessageSerializationTester.CheckSerializationForTypesInSameAssemblyAs<IBus>(prebuildObjectTypes);

+ 3 - 0
src/Abc.Zebus/Directory/IPeerDirectory.cs

@@ -14,6 +14,9 @@ namespace Abc.Zebus.Directory
         IList<Peer> GetPeersHandlingMessage(IMessage message);
         IList<Peer> GetPeersHandlingMessage(MessageBinding messageBinding);
 
+        bool IsPersistent(PeerId peerId);
+
+        // TODO: move to a specific interface (IPeerDirectoryExplorer)
         PeerDescriptor GetPeerDescriptor(PeerId peerId);
         IEnumerable<PeerDescriptor> GetPeerDescriptors();
     }

+ 3 - 11
src/Abc.Zebus/Directory/PeerDescriptor.cs

@@ -25,11 +25,7 @@ namespace Abc.Zebus.Directory
 
         public PeerDescriptor(PeerId id, string endPoint, bool isPersistent, bool isUp, bool isResponding, DateTime timestampUtc, params Subscription[] subscriptions)
         {
-            Peer = new Peer(id, endPoint, isUp)
-            {
-                IsResponding = isResponding
-            };
-
+            Peer = new Peer(id, endPoint, isUp, isResponding);
             Subscriptions = subscriptions;
             IsPersistent = isPersistent;
             TimestampUtc = timestampUtc;
@@ -37,12 +33,8 @@ namespace Abc.Zebus.Directory
 
         internal PeerDescriptor(PeerDescriptor other)
         {
-            Peer = new Peer(other.Peer.Id, other.Peer.EndPoint, other.Peer.IsUp)
-            {
-                IsResponding = other.Peer.IsResponding
-            };
-
-            Subscriptions = other.Subscriptions != null ? (Subscription[])other.Subscriptions.Clone() : ArrayUtil.Empty<Subscription>();
+            Peer = new Peer(other.Peer);
+            Subscriptions = ArrayUtil.Copy(other.Subscriptions) ?? ArrayUtil.Empty<Subscription>();
             IsPersistent = other.IsPersistent;
             TimestampUtc = other.TimestampUtc;
             HasDebuggerAttached = other.HasDebuggerAttached;

+ 17 - 9
src/Abc.Zebus/Directory/PeerDirectoryClient.PeerEntry.cs

@@ -13,23 +13,31 @@ namespace Abc.Zebus.Directory
         {
             private readonly Dictionary<MessageTypeId, MessageTypeEntry> _messageSubscriptions = new Dictionary<MessageTypeId, MessageTypeEntry>();
             private readonly ConcurrentDictionary<MessageTypeId, PeerSubscriptionTree> _subscriptionsByMessageType;
-            private readonly PeerDescriptor _descriptor;
 
             public PeerEntry(PeerDescriptor descriptor, ConcurrentDictionary<MessageTypeId, PeerSubscriptionTree> subscriptionsByMessageType)
             {
-                _descriptor = descriptor;
+                Peer = new Peer(descriptor.Peer);
+                IsPersistent = descriptor.IsPersistent;
+                TimestampUtc = descriptor.TimestampUtc ?? DateTime.UtcNow;
+                HasDebuggerAttached = descriptor.HasDebuggerAttached;
+                
                 _subscriptionsByMessageType = subscriptionsByMessageType;
             }
 
-            public PeerDescriptor Descriptor { get { return _descriptor; } }
+            public Peer Peer { get; private set; }
+            public bool IsPersistent { get; set; }
+            public DateTime TimestampUtc { get; set; }
+            public bool HasDebuggerAttached { get; set; }
 
-            public Subscription[] GetSubscriptions()
+            public PeerDescriptor ToPeerDescriptor()
             {
                 lock (_messageSubscriptions)
                 {
-                    return _messageSubscriptions.SelectMany(x => x.Value.BindingKeys.Select(bk => new Subscription(x.Key, bk)))
-                                                .Distinct()
-                                                .ToArray();
+                    var subscriptions = _messageSubscriptions.SelectMany(x => x.Value.BindingKeys.Select(bk => new Subscription(x.Key, bk)))
+                                                             .Distinct()
+                                                             .ToArray();
+
+                    return new PeerDescriptor(Peer.Id, Peer.EndPoint, IsPersistent, Peer.IsUp, Peer.IsResponding, TimestampUtc, subscriptions);
                 }
             }
 
@@ -110,7 +118,7 @@ namespace Abc.Zebus.Directory
             private void AddToSubscriptionTree(MessageTypeId messageTypeId, BindingKey bindingKey)
             {
                 var subscriptionTree = _subscriptionsByMessageType.GetOrAdd(messageTypeId, _ => new PeerSubscriptionTree());
-                subscriptionTree.Add(Descriptor.Peer, bindingKey);
+                subscriptionTree.Add(Peer, bindingKey);
             }
 
             private void RemoveFromSubscriptionTree(MessageTypeId messageTypeId, BindingKey bindingKey)
@@ -119,7 +127,7 @@ namespace Abc.Zebus.Directory
                 if (subscriptionTree == null)
                     return;
 
-                subscriptionTree.Remove(Descriptor.Peer, bindingKey);
+                subscriptionTree.Remove(Peer, bindingKey);
 
                 if (subscriptionTree.IsEmpty)
                     _subscriptionsByMessageType.Remove(messageTypeId);

+ 20 - 20
src/Abc.Zebus/Directory/PeerDirectoryClient.cs

@@ -3,7 +3,6 @@ using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
-using System.Threading;
 using Abc.Zebus.Util;
 using Abc.Zebus.Util.Extensions;
 using log4net;
@@ -144,15 +143,21 @@ namespace Abc.Zebus.Directory
             return subscriptionList.GetPeers(messageBinding.RoutingKey);
         }
 
+        public bool IsPersistent(PeerId peerId)
+        {
+            var entry = _peers.GetValueOrDefault(peerId);
+            return entry != null && entry.IsPersistent;
+        }
+
         public PeerDescriptor GetPeerDescriptor(PeerId peerId)
         {
             var entry = _peers.GetValueOrDefault(peerId);
-            return entry != null ? entry.Descriptor : null;
+            return entry != null ? entry.ToPeerDescriptor() : null;
         }
 
         public IEnumerable<PeerDescriptor> GetPeerDescriptors()
         {
-            return _peers.Values.Select(x => x.Descriptor).ToList();
+            return _peers.Values.Select(x => x.ToPeerDescriptor()).ToList();
         }
 
         // Only internal for testing purposes
@@ -174,13 +179,12 @@ namespace Abc.Zebus.Directory
 
             var peerEntry = _peers.AddOrUpdate(peerDescriptor.PeerId, (key) => new PeerEntry(peerDescriptor, _subscriptionsByMessageType), (key, entry) =>
             {
-                entry.Descriptor.Peer.EndPoint = peerDescriptor.Peer.EndPoint;
-                entry.Descriptor.Peer.IsUp = peerDescriptor.Peer.IsUp;
-                entry.Descriptor.Peer.IsResponding = peerDescriptor.Peer.IsResponding;
-                entry.Descriptor.IsPersistent = peerDescriptor.IsPersistent;
-                entry.Descriptor.Subscriptions = subscriptions;
-                entry.Descriptor.TimestampUtc = peerDescriptor.TimestampUtc;
-                entry.Descriptor.HasDebuggerAttached = peerDescriptor.HasDebuggerAttached;
+                entry.Peer.EndPoint = peerDescriptor.Peer.EndPoint;
+                entry.Peer.IsUp = peerDescriptor.Peer.IsUp;
+                entry.Peer.IsResponding = peerDescriptor.Peer.IsResponding;
+                entry.IsPersistent = peerDescriptor.IsPersistent;
+                entry.TimestampUtc = peerDescriptor.TimestampUtc ?? DateTime.UtcNow;
+                entry.HasDebuggerAttached = peerDescriptor.HasDebuggerAttached;
 
                 return entry;
             });
@@ -231,9 +235,9 @@ namespace Abc.Zebus.Directory
             if (peer == null)
                 return;
 
-            peer.Descriptor.Peer.IsUp = false;
-            peer.Descriptor.Peer.IsResponding = false;
-            peer.Descriptor.TimestampUtc = message.TimestampUtc;
+            peer.Peer.IsUp = false;
+            peer.Peer.IsResponding = false;
+            peer.TimestampUtc = message.TimestampUtc ?? DateTime.UtcNow;
 
             PeerUpdated(message.PeerId, PeerUpdateAction.Stopped);
         }
@@ -262,9 +266,7 @@ namespace Abc.Zebus.Directory
                 return;
 
             peer.SetSubscriptions(message.PeerDescriptor.Subscriptions ?? Enumerable.Empty<Subscription>(), message.PeerDescriptor.TimestampUtc);
-
-            peer.Descriptor.Subscriptions = peer.GetSubscriptions();
-            peer.Descriptor.TimestampUtc = message.PeerDescriptor.TimestampUtc;
+            peer.TimestampUtc = message.PeerDescriptor.TimestampUtc ?? DateTime.UtcNow;
 
             PeerUpdated(message.PeerDescriptor.PeerId, PeerUpdateAction.Updated);
         }
@@ -279,8 +281,6 @@ namespace Abc.Zebus.Directory
                 return;
 
             peer.SetSubscriptionsForType(message.SubscriptionsForType, message.TimestampUtc);
-            
-            peer.Descriptor.Subscriptions = peer.GetSubscriptions();
 
             PeerUpdated(message.PeerId, PeerUpdateAction.Updated);
         }
@@ -301,7 +301,7 @@ namespace Abc.Zebus.Directory
             if (peer == null)
                 return;
 
-            peer.Descriptor.Peer.IsResponding = isResponding;
+            peer.Peer.IsResponding = isResponding;
 
             PeerUpdated(peerId, PeerUpdateAction.Updated);
         }
@@ -312,7 +312,7 @@ namespace Abc.Zebus.Directory
             if (peer == null)
                 return null;
 
-            if (peer.Descriptor.TimestampUtc > timestampUtc)
+            if (peer.TimestampUtc > timestampUtc)
             {
                 _logger.InfoFormat("Outdated message ignored");
                 return null;

+ 10 - 2
src/Abc.Zebus/Peer.cs

@@ -19,12 +19,20 @@ namespace Abc.Zebus
         [ProtoMember(4, IsRequired = false)]
         public bool IsResponding;
 
-        public Peer(PeerId id, string endPoint, bool isUp = true)
+        public Peer(PeerId id, string endPoint, bool isUp = true) : this(id, endPoint, isUp, isUp)
+        {
+        }
+
+        public Peer(Peer other) : this(other.Id, other.EndPoint, other.IsUp, other.IsResponding)
+        {
+        }
+
+        public Peer(PeerId id, string endPoint, bool isUp, bool isResponding)
         {
             Id = id;
             EndPoint = endPoint;
             IsUp = isUp;
-            IsResponding = isUp;
+            IsResponding = isResponding;
         }
 
         [UsedImplicitly]

+ 2 - 3
src/Abc.Zebus/Persistence/PersistentTransport.cs

@@ -155,9 +155,8 @@ namespace Abc.Zebus.Persistence
             if (!message.MessageTypeId.IsPersistent())
                 return;
 
-            var persistentPeerIds = peerList.Select(peer => _peerDirectory.GetPeerDescriptor(peer.Id))
-                                            .Where(pd => pd != null && pd.IsPersistent)
-                                            .Select(pd => pd.Peer.Id)
+            var persistentPeerIds = peerList.Where(x => _peerDirectory.IsPersistent(x.Id))
+                                            .Select(x => x.Id)
                                             .ToArray();
 
             if (!persistentPeerIds.Any())

+ 10 - 0
src/Abc.Zebus/Util/ArrayUtil.cs

@@ -19,6 +19,16 @@ namespace Abc.Zebus.Util
             return EmptyArray<T>.Value;
 		}
 
+	    public static T[] Copy<T>(T[] array)
+	    {
+	        if (array == null)
+	            return null;
+
+	        var clone = new T[array.Length];
+            array.CopyTo(clone, 0);
+	        return clone;
+	    }
+
         private class EmptyArray<T>
         {
             internal static readonly T[] Value = new T[0];