Просмотр исходного кода

Cleanup TestPeerDirectory and add tests

Olivier Coanet 4 лет назад
Родитель
Сommit
c969454a6e

+ 54 - 10
src/Abc.Zebus.Testing/Directory/TestPeerDirectory.cs

@@ -11,21 +11,32 @@ namespace Abc.Zebus.Testing.Directory
 {
     public class TestPeerDirectory : IPeerDirectory
     {
-        public readonly ConcurrentDictionary<PeerId, PeerDescriptor> Peers = new ConcurrentDictionary<PeerId, PeerDescriptor>();
-        public Peer Self = default!;
-        private readonly Peer _remote = new Peer(new PeerId("remote"), "endpoint");
-        private HashSet<Type> _typesToObserve = new HashSet<Type>();
+        private Subscription[] _initialSubscriptions = Array.Empty<Subscription>();
+        private readonly Dictionary<MessageTypeId, SubscriptionsForType> _dynamicSubscriptions = new Dictionary<MessageTypeId, SubscriptionsForType>();
+
+        public TestPeerDirectory()
+        {
+        }
 
         public event Action Registered = delegate { };
         public event Action<PeerId, PeerUpdateAction> PeerUpdated = delegate { };
         public event Action<PeerId, IReadOnlyList<Subscription>> PeerSubscriptionsUpdated = delegate { };
 
+        public ConcurrentDictionary<PeerId, PeerDescriptor> Peers { get; } = new ConcurrentDictionary<PeerId, PeerDescriptor>();
+        public Peer? Self { get; private set; }
+
         public TimeSpan TimeSinceLastPing => TimeSpan.Zero;
 
         public Task RegisterAsync(IBus bus, Peer self, IEnumerable<Subscription> subscriptions)
         {
+            var subscriptionArray = subscriptions.ToArray();
+
             Self = self;
-            Peers[self.Id] = self.ToPeerDescriptor(true, subscriptions);
+            Self.IsResponding = true;
+            Self.IsUp = true;
+            Peers[self.Id] = self.ToPeerDescriptor(true, subscriptionArray);
+
+            _initialSubscriptions = subscriptionArray;
 
             Registered();
             return Task.CompletedTask;
@@ -33,21 +44,31 @@ namespace Abc.Zebus.Testing.Directory
 
         public Task UpdateSubscriptionsAsync(IBus bus, IEnumerable<SubscriptionsForType> subscriptionsForTypes)
         {
-            var newSubscriptions = SubscriptionsForType.CreateDictionary(Peers[Self.Id].Subscriptions);
             foreach (var subscriptionsForType in subscriptionsForTypes)
-                newSubscriptions[subscriptionsForType.MessageTypeId] = subscriptionsForType;
+            {
+                _dynamicSubscriptions[subscriptionsForType.MessageTypeId] = subscriptionsForType;
+            }
+
+            var newSubscriptions = _initialSubscriptions.Concat(_dynamicSubscriptions.SelectMany(x => x.Value.ToSubscriptions()));
 
-            Peers[Self.Id] = Self.ToPeerDescriptor(true, newSubscriptions.Values.SelectMany(subForType => subForType.ToSubscriptions()));
+            Peers[Self!.Id] = Self.ToPeerDescriptor(true, newSubscriptions);
             PeerUpdated(Self.Id, PeerUpdateAction.Updated);
             return Task.CompletedTask;
         }
 
         public Task UnregisterAsync(IBus bus)
         {
-            PeerUpdated(Self.Id, PeerUpdateAction.Stopped);
+            _initialSubscriptions = Array.Empty<Subscription>();
+            _dynamicSubscriptions.Clear();
+
+            Peers[Self!.Id] = Self.ToPeerDescriptor(true);
+            PeerUpdated(Self!.Id, PeerUpdateAction.Stopped);
             return Task.CompletedTask;
         }
 
+        private readonly Peer _remote = new Peer(new PeerId("remote"), "endpoint");
+
+        [Obsolete("Use SetupPeer(new PeerId(\"Abc.Remote.0\"), Subscription.Any<TMessage>()) instead")]
         public void RegisterRemoteListener<TMessage>()
             where TMessage : IMessage
         {
@@ -82,7 +103,6 @@ namespace Abc.Zebus.Testing.Directory
 
         public void EnableSubscriptionsUpdatedFor(IEnumerable<Type> types)
         {
-            _typesToObserve = new HashSet<Type>(types);
         }
 
         public PeerDescriptor? GetPeerDescriptor(PeerId peerId)
@@ -96,5 +116,29 @@ namespace Abc.Zebus.Testing.Directory
         {
             return Peers.Values;
         }
+
+        public IEnumerable<Subscription> GetSelfSubscriptions()
+        {
+            return Self != null && GetPeerDescriptor(Self.Id) is { } descriptor
+                ? descriptor.Subscriptions
+                : Array.Empty<Subscription>();
+        }
+
+        public void SetupPeer(PeerId peerId, params Subscription[] subscriptions)
+        {
+            var peerNumber = Peers.Count;
+            var randomPort = 10000 + Peers.Count;
+            SetupPeer(new Peer(peerId, $"tcp://testing-peer-{peerNumber}:{randomPort}"), subscriptions);
+        }
+
+        public void SetupPeer(Peer peer, params Subscription[] subscriptions)
+        {
+            var descriptor = Peers.GetOrAdd(peer.Id, _ => peer.ToPeerDescriptor(true));
+            descriptor.Peer.IsResponding = peer.IsResponding;
+            descriptor.Peer.IsUp = peer.IsUp;
+            descriptor.Peer.EndPoint = peer.EndPoint;
+            descriptor.TimestampUtc = DateTime.UtcNow;
+            descriptor.Subscriptions = subscriptions;
+        }
     }
 }

+ 1 - 1
src/Abc.Zebus.Testing/TestBus.cs

@@ -231,7 +231,7 @@ namespace Abc.Zebus.Testing
             LastReplyResponse = response;
         }
 
-        public void Configure(PeerId peerId, string environment = "test")
+        public void Configure(PeerId peerId, string environment = "Test")
         {
             PeerId = peerId;
             Environment = environment;

+ 123 - 0
src/Abc.Zebus.Tests/Testing/TestPeerDirectoryTests.cs

@@ -0,0 +1,123 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Abc.Zebus.Directory;
+using Abc.Zebus.Routing;
+using Abc.Zebus.Testing;
+using Abc.Zebus.Testing.Directory;
+using Abc.Zebus.Testing.Extensions;
+using NUnit.Framework;
+
+namespace Abc.Zebus.Tests.Testing
+{
+    [TestFixture]
+    public class TestPeerDirectoryTests
+    {
+        private TestPeerDirectory _peerDirectory;
+        private Peer _self;
+        private TestBus _bus;
+
+        [SetUp]
+        public void SetUp()
+        {
+            _peerDirectory = new TestPeerDirectory();
+            _self = new Peer(new PeerId("Abc.Testing.Self.0"), "tcp://myself:123");
+            _bus = new TestBus();
+
+            _bus.Configure(_self.Id);
+        }
+
+        [Test]
+        public async Task should_add_new_subscriptions()
+        {
+            // Arrange
+            await _peerDirectory.RegisterAsync(_bus, _self, new List<Subscription>());
+
+            // Act
+            var subscriptionsForTypes = new List<SubscriptionsForType>
+            {
+                new(MessageUtil.TypeId<Message1>(), BindingKey.Empty),
+            };
+
+            await _peerDirectory.UpdateSubscriptionsAsync(_bus, subscriptionsForTypes);
+
+            // Assert
+            var subscriptions = _peerDirectory.GetSelfSubscriptions();
+            subscriptions.ShouldBeEquivalentTo(new []
+            {
+                Subscription.Any<Message1>(),
+            });
+        }
+
+        [Test]
+        public async Task should_add_and_merge_new_subscriptions()
+        {
+            // Arrange
+            await _peerDirectory.RegisterAsync(_bus, _self, new List<Subscription>
+            {
+                new(MessageUtil.TypeId<Message1>(), BindingKey.Empty),
+                new(MessageUtil.TypeId<Message2>(), new BindingKey("1")),
+            });
+
+            // Act
+            var subscriptionsForTypes = new List<SubscriptionsForType>
+            {
+                new(MessageUtil.TypeId<Message2>(), new BindingKey("2")),
+            };
+
+            await _peerDirectory.UpdateSubscriptionsAsync(_bus, subscriptionsForTypes);
+
+            // Assert
+            var subscriptions = _peerDirectory.GetSelfSubscriptions();
+            subscriptions.ShouldBeEquivalentTo(new Subscription[]
+            {
+                new(MessageUtil.TypeId<Message1>(), BindingKey.Empty),
+                new(MessageUtil.TypeId<Message2>(), new BindingKey("1")),
+                new(MessageUtil.TypeId<Message2>(), new BindingKey("2")),
+            });
+        }
+
+        [Test]
+        public async Task should_update_and_merge_new_subscriptions()
+        {
+            // Arrange
+            await _peerDirectory.RegisterAsync(_bus, _self, new List<Subscription>
+            {
+                new(MessageUtil.TypeId<Message2>(), new BindingKey("1")),
+            });
+
+            var update1 = new List<SubscriptionsForType>
+            {
+                new(MessageUtil.TypeId<Message2>(), new BindingKey("2")),
+            };
+
+            await _peerDirectory.UpdateSubscriptionsAsync(_bus, update1);
+
+            // Act
+            var update2 = new List<SubscriptionsForType>
+            {
+                new(MessageUtil.TypeId<Message2>(), new BindingKey("3")),
+            };
+
+            await _peerDirectory.UpdateSubscriptionsAsync(_bus, update2);
+
+            // Assert
+            var subscriptions = _peerDirectory.GetSelfSubscriptions();
+            subscriptions.ShouldBeEquivalentTo(new Subscription[]
+            {
+                new(MessageUtil.TypeId<Message2>(), new BindingKey("1")),
+                new(MessageUtil.TypeId<Message2>(), new BindingKey("3")),
+            });
+        }
+
+        private class Message1 : IEvent
+        {
+        }
+
+        [Routable]
+        private class Message2 : IEvent
+        {
+            [RoutingPosition(1)]
+            public int Key { get; set; }
+        }
+    }
+}