Browse Source

Core: move PeerUpdate back to after the peer domain model has been updated

  - if consumers of the event call GetPeersHandlingMessages() during the event the started peer will be returned
  - this caused a regression where the PersistentTransport would not replay messages to the Persistence peer when it started
Mendel Monteiro-Beckerman 5 years ago
parent
commit
8cd79cd326

+ 61 - 0
src/Abc.Zebus.Tests/Directory/PeerDirectoryClientTests.cs

@@ -994,6 +994,67 @@ namespace Abc.Zebus.Tests.Directory
             subscriptions.Count.ShouldEqual(1);
         }
 
+        [Test]
+        public void should_be_able_to_query_peers_during_peer_updated_event()
+        {
+            // Arrange
+            IList<Peer> peersHandlingMessage = Array.Empty<Peer>();
+            _directory.PeerUpdated += (id, action) => peersHandlingMessage = _directory.GetPeersHandlingMessage(new FakeEvent(1));
+
+            // Act
+            _directory.Handle(new PeerStarted(_otherPeer.ToPeerDescriptor(false, typeof(FakeEvent))));
+
+            // Assert
+            peersHandlingMessage.ExpectedSingle().Id.ShouldEqual(_otherPeer.Id);
+        }
+
+        [Test]
+        public void should_be_able_to_query_peers_during_peer_subscriptions_updated_event_on_start()
+        {
+            // Arrange
+            IList<Peer> peersHandlingMessage = Array.Empty<Peer>();
+            _directory.PeerSubscriptionsUpdated += (id, action) => peersHandlingMessage = _directory.GetPeersHandlingMessage(new FakeEvent(1));
+            _directory.EnableSubscriptionsUpdatedFor(new[] { typeof(FakeEvent) });
+
+            // Act
+            _directory.Handle(new PeerStarted(_otherPeer.ToPeerDescriptor(false, typeof(FakeEvent))));
+
+            // Assert
+            peersHandlingMessage.ExpectedSingle().Id.ShouldEqual(_otherPeer.Id);
+        }
+
+        [Test]
+        public void should_be_able_to_query_peers_during_peer_subscriptions_updated_event_on_subscriptions_updated()
+        {
+            // Arrange
+            IList<Peer> peersHandlingMessage = Array.Empty<Peer>();
+            _directory.PeerSubscriptionsUpdated += (id, action) => peersHandlingMessage = _directory.GetPeersHandlingMessage(new FakeEvent(1));
+            _directory.EnableSubscriptionsUpdatedFor(new[] { typeof(FakeEvent) });
+            _directory.Handle(new PeerStarted(_otherPeer.ToPeerDescriptor(false)));
+
+            // Act
+            _directory.Handle(new PeerSubscriptionsUpdated(_otherPeer.ToPeerDescriptor(false, new MessageTypeId(typeof(FakeEvent)))));
+
+            // Assert
+            peersHandlingMessage.ExpectedSingle().Id.ShouldEqual(_otherPeer.Id);
+        }
+
+        [Test]
+        public void should_be_able_to_query_peers_during_peer_subscriptions_updated_event_on_subscriptions_for_types_updated()
+        {
+            // Arrange
+            IList<Peer> peersHandlingMessage = Array.Empty<Peer>();
+            _directory.PeerSubscriptionsUpdated += (id, action) => peersHandlingMessage = _directory.GetPeersHandlingMessage(new FakeEvent(1));
+            _directory.EnableSubscriptionsUpdatedFor(new[] { typeof(FakeEvent) });
+            _directory.Handle(new PeerStarted(_otherPeer.ToPeerDescriptor(false)));
+
+            // Act
+            _directory.Handle(new PeerSubscriptionsForTypesUpdated(_otherPeer.Id, DateTime.UtcNow, SubscriptionsForType.Create<FakeEvent>(new BindingKey(BindingKeyPart.SharpToken))));
+
+            // Assert
+            peersHandlingMessage.ExpectedSingle().Id.ShouldEqual(_otherPeer.Id);
+        }
+
         private class OtherFakeEvent1 : IEvent
         {
         }

+ 12 - 12
src/Abc.Zebus/Directory/PeerDirectoryClient.cs

@@ -293,9 +293,9 @@ namespace Abc.Zebus.Directory
             if (EnqueueIfRegistering(message))
                 return;
 
-            PeerUpdated?.Invoke(message.PeerDescriptor.Peer.Id, PeerUpdateAction.Started);
-
             AddOrUpdatePeerEntry(message.PeerDescriptor);
+
+            PeerUpdated?.Invoke(message.PeerDescriptor.Peer.Id, PeerUpdateAction.Started);
         }
 
         private bool EnqueueIfRegistering(IEvent message)
@@ -408,26 +408,26 @@ namespace Abc.Zebus.Directory
 
             PeerUpdated?.Invoke(message.PeerId, PeerUpdateAction.Updated);
 
-            var observedSubscriptions = GetObservedSubscriptions();
+            var observedSubscriptions = GetObservedSubscriptions(subscriptionsForTypes);
             if (observedSubscriptions.Count > 0)
                 PeerSubscriptionsUpdated?.Invoke(message.PeerId, observedSubscriptions);
 
-            IReadOnlyList<Subscription> GetObservedSubscriptions()
+            IReadOnlyList<Subscription> GetObservedSubscriptions(SubscriptionsForType[] subsForType)
             {
-                if (subscriptionsForTypes.Length == 0)
+                if (subsForType.Length == 0)
                     return Array.Empty<Subscription>();
 
                 var observedSubscriptionMessageTypes = _observedSubscriptionMessageTypes;
                 if (observedSubscriptionMessageTypes.Count == 0)
                     return Array.Empty<Subscription>();
 
-                return subscriptionsForTypes.Where(x =>
-                                            {
-                                                var messageType = x.MessageTypeId.GetMessageType();
-                                                return messageType != null && observedSubscriptionMessageTypes.Contains(messageType);
-                                            })
-                                            .SelectMany(x => x.ToSubscriptions())
-                                            .ToList();
+                return subsForType.Where(x =>
+                                  {
+                                      var messageType = x.MessageTypeId.GetMessageType();
+                                      return messageType != null && observedSubscriptionMessageTypes.Contains(messageType);
+                                  })
+                                  .SelectMany(x => x.ToSubscriptions())
+                                  .ToList();
             }
         }