| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541 |
- using System;
- using System.Linq;
- using Abc.Zebus.Directory.Configuration;
- using Abc.Zebus.Directory.Handlers;
- using Abc.Zebus.Directory.Messages;
- using Abc.Zebus.Directory.Storage;
- using Abc.Zebus.Routing;
- using Abc.Zebus.Testing;
- using Abc.Zebus.Testing.Extensions;
- using Abc.Zebus.Transport;
- using Abc.Zebus.Util;
- using Moq;
- using NUnit.Framework;
namespace Abc.Zebus.Directory.Tests.Handlers
{
    /// <summary>
    /// Unit tests for <see cref="DirectoryCommandsHandler"/>.
    /// The handler is exercised against a <see cref="TestBus"/> and Moq mocks of
    /// <see cref="IPeerRepository"/>, <see cref="IDirectoryConfiguration"/> and
    /// <see cref="IDirectorySpeedReporter"/>; each test sends one command and then
    /// inspects the repository calls and/or the messages recorded by the bus.
    /// </summary>
    [TestFixture]
    public class DirectoryCommandsHandlerTests
    {
        // Peer used as the sender of every handled command (see the Context override in Setup).
        private readonly Peer _sender = new Peer(new PeerId("Abc.Sender.0"), "tcp://sender:123");

        private IDisposable _contextScope;
        private TestBus _bus;
        private Mock<IPeerRepository> _repositoryMock;
        private Mock<IDirectoryConfiguration> _configurationMock;
        private DirectoryCommandsHandler _handler;
        private Mock<IDirectorySpeedReporter> _speedReporter;

        /// <summary>
        /// Installs a test message context and builds a fresh handler with fresh mocks before each test.
        /// </summary>
        [SetUp]
        public void Setup()
        {
            _contextScope = MessageContext.SetCurrent(MessageContext.CreateTest());

            _configurationMock = new Mock<IDirectoryConfiguration>();
            // NOTE(review): the mixed casing looks deliberate - the blacklist test registers from
            // "BLACKLISTEDMACHINE" and still expects a rejection.
            _configurationMock.SetupGet(conf => conf.BlacklistedMachines).Returns(new[] { "ANOTHER_BLACKLISTEDMACHINE", "BLACKlistedMACHINE" });

            _repositoryMock = new Mock<IPeerRepository>();
            _bus = new TestBus();
            _speedReporter = new Mock<IDirectorySpeedReporter>();

            _handler = new DirectoryCommandsHandler(_bus, _repositoryMock.Object, _configurationMock.Object, _speedReporter.Object)
            {
                Context = MessageContext.CreateOverride(_sender.Id, _sender.EndPoint)
            };
        }

        /// <summary>
        /// Releases the message context installed by <see cref="Setup"/>.
        /// </summary>
        [TearDown]
        public virtual void Teardown()
        {
            _contextScope.Dispose();
        }

        // ---------------------------------------------------------------
        // RegisterPeerCommand
        // ---------------------------------------------------------------

        [Test]
        public void should_add_peer_to_repository()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var registerCommand = new RegisterPeerCommand(peerDescriptor);

            _handler.Handle(registerCommand);

            _repositoryMock.Verify(x => x.AddOrUpdatePeer(registerCommand.Peer));
        }

        [Test]
        public void should_set_registering_peer_up_and_responding()
        {
            // Start from a peer flagged down / not responding so the assertion cannot pass by default.
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            peerDescriptor.Peer.IsUp = false;
            peerDescriptor.Peer.IsResponding = false;
            var registerCommand = new RegisterPeerCommand(peerDescriptor);

            _handler.Handle(registerCommand);

            _repositoryMock.Verify(x => x.AddOrUpdatePeer(It.Is<PeerDescriptor>(p => p.Peer.IsUp && p.Peer.IsResponding)));
        }

        [Test]
        public void should_remove_existing_dynamic_subscriptions_on_register()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var registerCommand = new RegisterPeerCommand(peerDescriptor);

            _handler.Handle(registerCommand);

            _repositoryMock.Verify(x => x.AddOrUpdatePeer(registerCommand.Peer));
            _repositoryMock.Verify(x => x.RemoveAllDynamicSubscriptionsForPeer(registerCommand.Peer.PeerId, It.Is<DateTime>(d => d == registerCommand.Peer.TimestampUtc.Value)));
        }

        [Test]
        public void should_specify_datetime_kind_when_removing_all_subscriptions_for_a_peer_during_register()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            // A UTC instant whose Kind has been stripped: only the Kind is under test here, so the
            // value itself must stay a genuine UTC time (not the local wall clock).
            peerDescriptor.TimestampUtc = DateTime.SpecifyKind(DateTime.UtcNow, DateTimeKind.Unspecified);
            var registerCommand = new RegisterPeerCommand(peerDescriptor);

            _handler.Handle(registerCommand);

            _repositoryMock.Verify(x => x.AddOrUpdatePeer(registerCommand.Peer));
            _repositoryMock.Verify(x => x.RemoveAllDynamicSubscriptionsForPeer(registerCommand.Peer.PeerId, It.Is<DateTime>(d => d.Kind == DateTimeKind.Utc)));
        }

        [Test]
        public void should_reply_with_registred_peers()
        {
            var registredPeerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:456", typeof(FakeCommand));
            // The stub only matches GetPeers(true), i.e. a call that asks for dynamic subscriptions.
            _repositoryMock.Setup(x => x.GetPeers(It.Is<bool>(loadDynamicSubs => loadDynamicSubs))).Returns(new[] { registredPeerDescriptor });
            var command = new RegisterPeerCommand(TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand)));

            _handler.Handle(command);

            var response = (RegisterPeerResponse)_bus.LastReplyResponse;
            response.PeerDescriptors.Single().ShouldHaveSamePropertiesAs(registredPeerDescriptor);
        }

        [Test]
        public void should_publish_started_event()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var command = new RegisterPeerCommand(peerDescriptor);

            _handler.Handle(command);

            _bus.ExpectExactly(new PeerStarted(peerDescriptor));
        }

        [Test]
        public void should_throw_if_a_blacklisted_peer_tries_to_register()
        {
            var blacklistedPeer = TestData.PersistentPeerDescriptor("tcp://blacklistedpeer:123", typeof(FakeCommand));
            var registerCommand = new RegisterPeerCommand(blacklistedPeer);
            // Replace the default context so the command appears to originate from a blacklisted machine.
            _handler.Context = MessageContext.CreateTest(new OriginatorInfo(blacklistedPeer.Peer.Id, blacklistedPeer.Peer.EndPoint, "BLACKLISTEDMACHINE", "initiator"));

            var exception = typeof(InvalidOperationException).ShouldBeThrownBy(() => _handler.Handle(registerCommand));

            exception.Message.ShouldEqual("Peer Abc.Testing.0 on host BLACKLISTEDMACHINE is not allowed to register on this directory");
        }

        [Test]
        public void should_not_throw_if_a_client_with_a_synchronised_clock_tries_to_register()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var registerCommand = new RegisterPeerCommand(peerDescriptor);
            // 14 minutes ahead, with a 15 minute tolerance: just inside the allowed window.
            registerCommand.Peer.TimestampUtc = SystemDateTime.UtcNow + 14.Minutes();
            _configurationMock.SetupGet(x => x.MaxAllowedClockDifferenceWhenRegistering).Returns(15.Minutes());

            Assert.DoesNotThrow(() => _handler.Handle(registerCommand));
        }

        [Test]
        public void should_throw_if_a_client_with_a_more_recent_clock_tries_to_register()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var registerCommand = new RegisterPeerCommand(peerDescriptor);
            // One hour ahead, with a 15 minute tolerance: well outside the allowed window.
            registerCommand.Peer.TimestampUtc = SystemDateTime.UtcNow + 1.Hour();
            _configurationMock.SetupGet(x => x.MaxAllowedClockDifferenceWhenRegistering).Returns(15.Minutes());

            var exception = Assert.Throws<InvalidOperationException>(() => _handler.Handle(registerCommand));

            // The doubled "the" is part of the expected message; do not "fix" it here.
            exception.Message.ShouldContain("is too far ahead of the the server's current time");
        }

        [Test]
        public void should_throw_if_an_existing_peer_tries_to_register()
        {
            var existingPeer = TestData.PersistentPeerDescriptor("tcp://existingpeer:123", typeof(FakeCommand));
            _repositoryMock.Setup(x => x.Get(existingPeer.PeerId)).Returns(existingPeer);
            // Same test peer, registering from a different endpoint.
            var newPeer = TestData.PersistentPeerDescriptor("tcp://newpeer:123", typeof(FakeCommand));
            var command = new RegisterPeerCommand(newPeer);

            var exception = (MessageProcessingException)typeof(MessageProcessingException).ShouldBeThrownBy(() => _handler.Handle(command));

            exception.ErrorCode.ShouldEqual(DirectoryErrorCodes.PeerAlreadyExists);
        }

        [Test]
        public void should_not_throw_if_a_not_responding_peer_already_exists()
        {
            var existingPeer = TestData.PersistentPeerDescriptor("tcp://existingpeer:123", typeof(FakeCommand));
            existingPeer.Peer.IsResponding = false;
            // NOTE(review): the conflict test above feeds the existing peer through Get(peerId), so the
            // same stub is installed here; otherwise the existing peer presumably never reaches the
            // conflict check. The GetPeers stub is kept for the registration reply.
            _repositoryMock.Setup(x => x.Get(existingPeer.PeerId)).Returns(existingPeer);
            _repositoryMock.Setup(x => x.GetPeers(It.IsAny<bool>())).Returns(new[] { existingPeer });
            var newPeer = TestData.PersistentPeerDescriptor("tcp://newpeer:123", typeof(FakeCommand));
            var command = new RegisterPeerCommand(newPeer);

            _handler.Handle(command);

            _repositoryMock.Verify(x => x.AddOrUpdatePeer(newPeer));
        }

        [Test]
        public void should_not_throw_if_an_existing_peer_is_on_the_same_host()
        {
            // The existing peer is left responding on purpose: the only thing that may allow this
            // registration is that both descriptors share the same host ("existingpeer").
            var existingPeer = TestData.PersistentPeerDescriptor("tcp://existingpeer:123", typeof(FakeCommand));
            // NOTE(review): stubbed through Get(peerId) like the conflict test; confirm against the handler.
            _repositoryMock.Setup(x => x.Get(existingPeer.PeerId)).Returns(existingPeer);
            _repositoryMock.Setup(x => x.GetPeers(It.IsAny<bool>())).Returns(new[] { existingPeer });
            var newPeer = TestData.PersistentPeerDescriptor("tcp://existingpeer:123", typeof(FakeCommand));
            var command = new RegisterPeerCommand(newPeer);

            _handler.Handle(command);

            _repositoryMock.Verify(x => x.AddOrUpdatePeer(newPeer));
        }

        [Test]
        public void should_handle_null_timestamp_when_removing_all_subscriptions_for_a_peer()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            peerDescriptor.TimestampUtc = null;
            var registerCommand = new RegisterPeerCommand(peerDescriptor);

            // "Handling" a missing timestamp means rejecting the registration with an explicit message.
            Assert.That(() => _handler.Handle(registerCommand), Throws.InstanceOf<InvalidOperationException>().With.Message.EqualTo("The TimestampUtc must be provided when registering"));
        }

        // ---------------------------------------------------------------
        // UnregisterPeerCommand / DecommissionPeerCommand
        // ---------------------------------------------------------------

        [Test]
        public void should_unregister_persistent_peer_when_unregistering()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            // Registered 30 seconds ago, so the unregistration is more recent than the stored state.
            peerDescriptor.TimestampUtc = SystemDateTime.UtcNow.AddSeconds(-30);
            _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);
            var command = new UnregisterPeerCommand(peerDescriptor.Peer);

            _handler.Handle(command);

            // A persistent peer is kept in the repository, flagged down, with the command's timestamp.
            _repositoryMock.Verify(x => x.AddOrUpdatePeer(It.Is<PeerDescriptor>(peer => peer.Peer.Id == peerDescriptor.Peer.Id && peer.Peer.IsUp == false && peer.TimestampUtc == command.TimestampUtc)));
        }

        [Test]
        public void should_remove_transient_peer_when_unregistering()
        {
            var peerDescriptor = TestData.TransientPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);

            _handler.Handle(new UnregisterPeerCommand(peerDescriptor.Peer));

            _repositoryMock.Verify(x => x.RemovePeer(It.Is<PeerId>(peerId => peerId == peerDescriptor.Peer.Id)));
        }

        [Test]
        public void should_publish_stopped_event_when_unregistering_a_persistent_client()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            peerDescriptor.TimestampUtc = SystemDateTime.UtcNow.AddSeconds(-30);
            _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);
            var command = new UnregisterPeerCommand(peerDescriptor.Peer, SystemDateTime.UtcNow.AddSeconds(-2));

            _handler.Handle(command);

            _bus.ExpectExactly(new PeerStopped(peerDescriptor.Peer, command.TimestampUtc));
        }

        [Test]
        public void should_publish_decommissioned_event_when_unregistering_a_transient_client()
        {
            var peerDescriptor = TestData.TransientPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);

            _handler.Handle(new UnregisterPeerCommand(peerDescriptor.Peer));

            _bus.ExpectExactly(new PeerDecommissioned(peerDescriptor.Peer.Id));
        }

        [Test]
        public void should_not_unregister_a_peer_that_started_after_timestamp()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
            // The unregistration is stamped 2 seconds before the stored descriptor: it is stale.
            var command = new UnregisterPeerCommand(peerDescriptor.Peer, peerDescriptor.TimestampUtc.Value.AddSeconds(-2));

            _handler.Handle(command);

            _repositoryMock.Verify(x => x.AddOrUpdatePeer(It.IsAny<PeerDescriptor>()), Times.Never());
            _bus.ExpectNothing();
        }

        [Test]
        public void should_publish_peer_decommissioned()
        {
            var peerId = new PeerId("Abc.Testing.0");

            _handler.Handle(new DecommissionPeerCommand(peerId));

            _bus.ExpectExactly(new PeerDecommissioned(peerId));
        }

        [Test]
        public void should_remove_peer_from_repository()
        {
            var peerId = new PeerId("Abc.Testing.0");

            _handler.Handle(new DecommissionPeerCommand(peerId));

            _repositoryMock.Verify(x => x.RemovePeer(peerId));
        }

        // ---------------------------------------------------------------
        // UpdatePeerSubscriptionsCommand
        // ---------------------------------------------------------------

        [Test]
        public void should_update_peer_handled_message_and_publish_event()
        {
            var originalPeerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            _repositoryMock.Setup(x => x.Get(originalPeerDescriptor.Peer.Id)).Returns(originalPeerDescriptor);
            // Capture whatever descriptor the handler stores so it can be compared with the published one.
            PeerDescriptor updatedPeerDescriptor = null;
            _repositoryMock.Setup(x => x.AddOrUpdatePeer(It.IsAny<PeerDescriptor>())).Callback<PeerDescriptor>(peer => updatedPeerDescriptor = peer);
            var newSubscriptions = new[] { new Subscription(new MessageTypeId("Another.Handled.Type")) };

            _handler.Handle(new UpdatePeerSubscriptionsCommand(originalPeerDescriptor.Peer.Id, newSubscriptions, DateTime.UtcNow));

            updatedPeerDescriptor.Subscriptions.ShouldBeEquivalentTo(newSubscriptions);
            var handledMessageUpdateds = _bus.Messages.OfType<PeerSubscriptionsUpdated>().ToList();
            handledMessageUpdateds.Count.ShouldEqual(1);
            handledMessageUpdateds.Single().PeerDescriptor.ShouldHaveSamePropertiesAs(updatedPeerDescriptor);
        }

        [Test]
        public void should_throw_an_explicit_exception_when_updating_the_subscriptions_of_a_decommissioned_peer()
        {
            // No Get() stub is installed, so the repository mock has no descriptor for this peer.
            Assert.That(() => _handler.Handle(new UpdatePeerSubscriptionsCommand(new PeerId("Abc.NonExistingPeer.0"), new Subscription[0], DateTime.UtcNow)),
                        Throws.InstanceOf<InvalidOperationException>().With.Message.EqualTo("The specified Peer (Abc.NonExistingPeer.0) does not exist."));
        }

        [Test]
        public void should_ignore_old_peer_updates()
        {
            var originalPeerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            // Stored state is one minute in the future, so an update stamped "now" is older than it.
            originalPeerDescriptor.TimestampUtc = SystemDateTime.UtcNow.AddMinutes(1);
            _repositoryMock.Setup(x => x.Get(originalPeerDescriptor.Peer.Id)).Returns(originalPeerDescriptor);
            var newSubscriptions = new[] { new Subscription(new MessageTypeId("Another.Handled.Type")) };

            _handler.Handle(new UpdatePeerSubscriptionsCommand(originalPeerDescriptor.Peer.Id, newSubscriptions, DateTime.UtcNow));

            _repositoryMock.Verify(x => x.AddOrUpdatePeer(It.IsAny<PeerDescriptor>()), Times.Never());
            _bus.ExpectNothing();
        }

        // ---------------------------------------------------------------
        // UpdatePeerSubscriptionsForTypesCommand
        // ---------------------------------------------------------------

        [Test]
        public void should_update_peer_subscriptions_by_types_and_publish_event()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var subscriptionsForTypes = new[]
            {
                new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), BindingKey.Empty),
                new SubscriptionsForType(MessageUtil.GetTypeId(typeof(double)), new BindingKey("bla"))
            };
            var now = DateTime.UtcNow;

            _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, subscriptionsForTypes));

            _repositoryMock.Verify(repo => repo.AddDynamicSubscriptionsForTypes(peerDescriptor.PeerId, now, subscriptionsForTypes));
            _bus.ExpectExactly(new PeerSubscriptionsForTypesUpdated(peerDescriptor.PeerId, now, subscriptionsForTypes));
        }

        [Test]
        public void should_specify_datetime_kind_when_adding_subscriptions_for_a_type()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var subscriptionsForTypes = new[] { new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), BindingKey.Empty) };
            var unspecifiedNow = DateTime.SpecifyKind(DateTime.UtcNow, DateTimeKind.Unspecified);

            _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, unspecifiedNow, subscriptionsForTypes));

            _repositoryMock.Verify(repo => repo.AddDynamicSubscriptionsForTypes(peerDescriptor.PeerId, It.Is<DateTime>(date => date.Kind == DateTimeKind.Utc), subscriptionsForTypes));
        }

        [Test]
        public void should_specify_datetime_kind_when_removing_subscriptions_for_a_type()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            // An empty binding key array is expected to end up in the "remove" call (see the Verify below).
            var subscriptionsForTypes = new[] { new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), new BindingKey[0]) };
            var unspecifiedNow = DateTime.SpecifyKind(DateTime.UtcNow, DateTimeKind.Unspecified);

            _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, unspecifiedNow, subscriptionsForTypes));

            _repositoryMock.Verify(repo => repo.RemoveDynamicSubscriptionsForTypes(peerDescriptor.PeerId, It.Is<DateTime>(date => date.Kind == DateTimeKind.Utc), new[] { MessageUtil.GetTypeId(typeof(int)) }));
        }

        [Test]
        public void should_handle_null_bindingkeys_array_when_removing_subscriptions()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            // A null binding key array is expected to be treated like "no binding keys", i.e. a removal.
            var subscriptionsForTypes = new[] { new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), null) };
            var now = DateTime.UtcNow;

            _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, subscriptionsForTypes));

            _repositoryMock.Verify(repo => repo.RemoveDynamicSubscriptionsForTypes(peerDescriptor.PeerId, now, new[] { MessageUtil.GetTypeId(typeof(int)) }));
        }

        [Test]
        public void should_handle_null_subscriptionsByType_array()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var now = DateTime.UtcNow;

            Assert.That(() => _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, null)),
                        Throws.Nothing);

            _bus.ExpectNothing();
        }

        [Test]
        public void should_remove_peer_subscriptions_for_a_type_if_there_are_no_binding_keys()
        {
            var now = DateTime.UtcNow;
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));

            // Capture the arguments of both repository calls so the add/remove split can be inspected.
            SubscriptionsForType[] addedSubscriptions = null;
            MessageTypeId[] removedMessageTypeIds = null;
            _repositoryMock.Setup(repo => repo.AddDynamicSubscriptionsForTypes(peerDescriptor.PeerId, now, It.IsAny<SubscriptionsForType[]>()))
                           .Callback((PeerId peerId, DateTime timestampUtc, SubscriptionsForType[] subs) => addedSubscriptions = subs);
            _repositoryMock.Setup(repo => repo.RemoveDynamicSubscriptionsForTypes(peerDescriptor.PeerId, now, It.IsAny<MessageTypeId[]>()))
                           .Callback((PeerId peerId, DateTime timestampUtc, MessageTypeId[] ids) => removedMessageTypeIds = ids);

            var subscriptionsForTypes = new[]
            {
                // No binding key at all for int, one (empty) binding key for double.
                new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int))),
                new SubscriptionsForType(MessageUtil.GetTypeId(typeof(double)), BindingKey.Empty)
            };

            _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, subscriptionsForTypes));

            var addedSubscription = addedSubscriptions.ExpectedSingle();
            addedSubscription.ShouldHaveSamePropertiesAs(new SubscriptionsForType(MessageUtil.GetTypeId(typeof(double)), BindingKey.Empty));
            var removedMessageTypeId = removedMessageTypeIds.ExpectedSingle();
            removedMessageTypeId.ShouldHaveSamePropertiesAs(MessageUtil.GetTypeId(typeof(int)));
            // The published event still carries the full, unsplit list.
            _bus.ExpectExactly(new PeerSubscriptionsForTypesUpdated(peerDescriptor.PeerId, now, subscriptionsForTypes));
        }

        // ---------------------------------------------------------------
        // Speed reporting
        // ---------------------------------------------------------------

        [Test]
        public void should_report_registration_speed()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var registerCommand = new RegisterPeerCommand(peerDescriptor);

            _handler.Handle(registerCommand);

            _speedReporter.Verify(x => x.ReportRegistrationDuration(It.Is<TimeSpan>(t => t < 1.Second())));
        }

        [Test]
        public void should_report_unregistration_speed()
        {
            var peerDescriptor = TestData.TransientPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);

            _handler.Handle(new UnregisterPeerCommand(peerDescriptor.Peer));

            _speedReporter.Verify(x => x.ReportUnregistrationDuration(It.Is<TimeSpan>(t => t < 1.Second())));
        }

        [Test]
        public void should_report_subscription_update_speed()
        {
            var originalPeerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            _repositoryMock.Setup(x => x.Get(originalPeerDescriptor.Peer.Id)).Returns(originalPeerDescriptor);
            var newSubscriptions = new[] { new Subscription(new MessageTypeId("Another.Handled.Type")) };

            _handler.Handle(new UpdatePeerSubscriptionsCommand(originalPeerDescriptor.Peer.Id, newSubscriptions, DateTime.UtcNow));

            _speedReporter.Verify(x => x.ReportSubscriptionUpdateDuration(It.Is<TimeSpan>(t => t < 1.Second())));
        }

        [Test]
        public void should_report_subscription_update_for_types_speed()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            var subscriptionsForTypes = new[]
            {
                new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), BindingKey.Empty),
                new SubscriptionsForType(MessageUtil.GetTypeId(typeof(double)), new BindingKey("bla"))
            };
            var now = DateTime.UtcNow;

            _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, subscriptionsForTypes));

            _speedReporter.Verify(x => x.ReportSubscriptionUpdateForTypesDuration(It.Is<TimeSpan>(t => t < 1.Second())));
        }

        // ---------------------------------------------------------------
        // MarkPeerAs(Not)Responding commands
        // ---------------------------------------------------------------

        [Test]
        public void should_persist_not_responding_peer()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123");
            _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);

            // The command carries a default timestamp ...
            _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, default));

            // ... and the stored/published one is expected to be the descriptor's timestamp plus 1 ms.
            var expectedTimestampUtc = peerDescriptor.TimestampUtc.Value.AddMilliseconds(1);
            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, false, expectedTimestampUtc));
            _bus.ExpectExactly(new PeerNotResponding(peerDescriptor.PeerId, expectedTimestampUtc));
        }

        [Test]
        public void should_persist_not_responding_peer_with_timestamp()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123");
            _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
            var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(1);

            // The internal command's explicit timestamp is expected to be used as-is.
            _handler.Handle(new MarkPeerAsNotRespondingInternalCommand(peerDescriptor.PeerId, timestampUtc));

            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, false, timestampUtc));
            _bus.ExpectExactly(new PeerNotResponding(peerDescriptor.PeerId, timestampUtc));
        }

        [Test]
        public void should_persist_responding_peer()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);

            // The command carries a default timestamp ...
            _handler.Handle(new MarkPeerAsRespondingCommand(peerDescriptor.PeerId, default));

            // ... and the stored/published one is expected to be the descriptor's timestamp plus 1 ms.
            var expectedTimestampUtc = peerDescriptor.TimestampUtc.Value.AddMilliseconds(1);
            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, true, expectedTimestampUtc));
            _bus.ExpectExactly(new PeerResponding(peerDescriptor.PeerId, expectedTimestampUtc));
        }

        [Test]
        public void should_persist_responding_peer_with_timestamp()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
            var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(1);

            // The internal command's explicit timestamp is expected to be used as-is.
            _handler.Handle(new MarkPeerAsRespondingInternalCommand(peerDescriptor.PeerId, timestampUtc));

            _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, true, timestampUtc));
            _bus.ExpectExactly(new PeerResponding(peerDescriptor.PeerId, timestampUtc));
        }

        [Test]
        public void should_not_revive_decommissioned_peer()
        {
            var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
            // The repository no longer knows the peer.
            _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns((PeerDescriptor)null);
            var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(-1);

            _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, timestampUtc));

            _repositoryMock.Verify(repo => repo.SetPeerResponding(It.IsAny<PeerId>(), It.IsAny<bool>(), It.IsAny<DateTime>()), Times.Never());
        }
    }
}
|