DirectoryCommandsHandlerTests.cs 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. using System;
  2. using System.Linq;
  3. using Abc.Zebus.Directory.Configuration;
  4. using Abc.Zebus.Directory.Handlers;
  5. using Abc.Zebus.Directory.Messages;
  6. using Abc.Zebus.Directory.Storage;
  7. using Abc.Zebus.Routing;
  8. using Abc.Zebus.Testing;
  9. using Abc.Zebus.Testing.Extensions;
  10. using Abc.Zebus.Transport;
  11. using Abc.Zebus.Util;
  12. using Moq;
  13. using NUnit.Framework;
  14. namespace Abc.Zebus.Directory.Tests.Handlers
  15. {
  16. [TestFixture]
  17. public class DirectoryCommandsHandlerTests
  18. {
  19. private readonly Peer _sender = new Peer(new PeerId("Abc.Sender.0"), "tcp://sender:123");
  20. private IDisposable _contextScope;
  21. private TestBus _bus;
  22. private Mock<IPeerRepository> _repositoryMock;
  23. private Mock<IDirectoryConfiguration> _configurationMock;
  24. private DirectoryCommandsHandler _handler;
  25. private Mock<IDirectorySpeedReporter> _speedReporter;
  26. [SetUp]
  27. public void Setup()
  28. {
  29. _contextScope = MessageContext.SetCurrent(MessageContext.CreateTest());
  30. _configurationMock = new Mock<IDirectoryConfiguration>();
  31. _configurationMock.SetupGet(conf => conf.BlacklistedMachines).Returns(new[] { "ANOTHER_BLACKLISTEDMACHINE", "BLACKlistedMACHINE" });
  32. _repositoryMock = new Mock<IPeerRepository>();
  33. _bus = new TestBus();
  34. _speedReporter = new Mock<IDirectorySpeedReporter>();
  35. _handler = new DirectoryCommandsHandler(_bus, _repositoryMock.Object, _configurationMock.Object, _speedReporter.Object) { Context = MessageContext.CreateOverride(_sender.Id, _sender.EndPoint) };
  36. }
  37. [TearDown]
  38. public virtual void Teardown()
  39. {
  40. _contextScope.Dispose();
  41. }
  42. [Test]
  43. public void should_add_peer_to_repository()
  44. {
  45. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  46. var registerCommand = new RegisterPeerCommand(peerDescriptor);
  47. _handler.Handle(registerCommand);
  48. _repositoryMock.Verify(x => x.AddOrUpdatePeer(registerCommand.Peer));
  49. }
  50. [Test]
  51. public void should_set_registering_peer_up_and_responding()
  52. {
  53. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  54. peerDescriptor.Peer.IsUp = false;
  55. peerDescriptor.Peer.IsResponding = false;
  56. var registerCommand = new RegisterPeerCommand(peerDescriptor);
  57. _handler.Handle(registerCommand);
  58. _repositoryMock.Verify(x => x.AddOrUpdatePeer(It.Is<PeerDescriptor>(p => p.Peer.IsUp && p.Peer.IsResponding)));
  59. }
  60. [Test]
  61. public void should_remove_existing_dynamic_subscriptions_on_register()
  62. {
  63. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  64. var registerCommand = new RegisterPeerCommand(peerDescriptor);
  65. _handler.Handle(registerCommand);
  66. _repositoryMock.Verify(x => x.AddOrUpdatePeer(registerCommand.Peer));
  67. _repositoryMock.Verify(x => x.RemoveAllDynamicSubscriptionsForPeer(registerCommand.Peer.PeerId, It.Is<DateTime>(d => d == registerCommand.Peer.TimestampUtc.Value)));
  68. }
  69. [Test]
  70. public void should_specify_datetime_kind_when_removing_all_subscriptions_for_a_peer_during_register()
  71. {
  72. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  73. peerDescriptor.TimestampUtc = new DateTime(DateTime.Now.Ticks, DateTimeKind.Unspecified);
  74. var registerCommand = new RegisterPeerCommand(peerDescriptor);
  75. _handler.Handle(registerCommand);
  76. _repositoryMock.Verify(x => x.AddOrUpdatePeer(registerCommand.Peer));
  77. _repositoryMock.Verify(x => x.RemoveAllDynamicSubscriptionsForPeer(registerCommand.Peer.PeerId, It.Is<DateTime>(d => d.Kind == DateTimeKind.Utc)));
  78. }
  79. [Test]
  80. public void should_reply_with_registred_peers()
  81. {
  82. var registredPeerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:456", typeof(FakeCommand));
  83. _repositoryMock.Setup(x => x.GetPeers(It.Is<bool>(loadDynamicSubs => loadDynamicSubs))).Returns(new[] { registredPeerDescriptor });
  84. var command = new RegisterPeerCommand(TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand)));
  85. _handler.Handle(command);
  86. var response = (RegisterPeerResponse)_bus.LastReplyResponse;
  87. response.PeerDescriptors.Single().ShouldHaveSamePropertiesAs(registredPeerDescriptor);
  88. }
  89. [Test]
  90. public void should_publish_started_event()
  91. {
  92. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  93. var command = new RegisterPeerCommand(peerDescriptor);
  94. _handler.Handle(command);
  95. _bus.ExpectExactly(new PeerStarted(peerDescriptor));
  96. }
  97. [Test]
  98. public void should_throw_if_a_blacklisted_peer_tries_to_register()
  99. {
  100. var blacklistedPeer = TestData.PersistentPeerDescriptor("tcp://blacklistedpeer:123", typeof(FakeCommand));
  101. var registerCommand = new RegisterPeerCommand(blacklistedPeer);
  102. _handler.Context = MessageContext.CreateTest(new OriginatorInfo(blacklistedPeer.Peer.Id, blacklistedPeer.Peer.EndPoint, "BLACKLISTEDMACHINE", "initiator"));
  103. var exception = typeof(InvalidOperationException).ShouldBeThrownBy(() => _handler.Handle(registerCommand));
  104. exception.Message.ShouldEqual("Peer Abc.Testing.0 on host BLACKLISTEDMACHINE is not allowed to register on this directory");
  105. }
  106. [Test]
  107. public void should_not_throw_if_a_client_with_a_synchronised_clock_tries_to_register()
  108. {
  109. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  110. var registerCommand = new RegisterPeerCommand(peerDescriptor);
  111. registerCommand.Peer.TimestampUtc = SystemDateTime.UtcNow + 14.Minutes();
  112. _configurationMock.SetupGet(x => x.MaxAllowedClockDifferenceWhenRegistering).Returns(15.Minutes());
  113. Assert.DoesNotThrow(() => _handler.Handle(registerCommand));
  114. }
  115. [Test]
  116. public void should_throw_if_a_client_with_a_more_recent_clock_tries_to_register()
  117. {
  118. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  119. var registerCommand = new RegisterPeerCommand(peerDescriptor);
  120. registerCommand.Peer.TimestampUtc = SystemDateTime.UtcNow + 1.Hour();
  121. _configurationMock.SetupGet(x => x.MaxAllowedClockDifferenceWhenRegistering).Returns(15.Minutes());
  122. var exception = Assert.Throws<InvalidOperationException>(() => _handler.Handle(registerCommand));
  123. exception.Message.ShouldContain("is too far ahead of the the server's current time");
  124. }
  125. [Test]
  126. public void should_throw_if_an_existing_peer_tries_to_register()
  127. {
  128. var existingPeer = TestData.PersistentPeerDescriptor("tcp://existingpeer:123", typeof(FakeCommand));
  129. _repositoryMock.Setup(x => x.Get(existingPeer.PeerId)).Returns(existingPeer);
  130. var newPeer = TestData.PersistentPeerDescriptor("tcp://newpeer:123", typeof(FakeCommand));
  131. var command = new RegisterPeerCommand(newPeer);
  132. var exception = (MessageProcessingException)typeof(MessageProcessingException).ShouldBeThrownBy(() => _handler.Handle(command));
  133. exception.ErrorCode.ShouldEqual(DirectoryErrorCodes.PeerAlreadyExists);
  134. }
  135. [Test]
  136. public void should_not_throw_if_a_not_responding_peer_already_exists()
  137. {
  138. var existingPeer = TestData.PersistentPeerDescriptor("tcp://existingpeer:123", typeof(FakeCommand));
  139. existingPeer.Peer.IsResponding = false;
  140. _repositoryMock.Setup(x => x.GetPeers(It.IsAny<bool>())).Returns(new[] { existingPeer });
  141. var newPeer = TestData.PersistentPeerDescriptor("tcp://newpeer:123", typeof(FakeCommand));
  142. var command = new RegisterPeerCommand(newPeer);
  143. _handler.Handle(command);
  144. _repositoryMock.Verify(x => x.AddOrUpdatePeer(newPeer));
  145. }
  146. [Test]
  147. public void should_not_throw_if_an_existing_peer_is_on_the_same_host()
  148. {
  149. var existingPeer = TestData.PersistentPeerDescriptor("tcp://existingpeer:123", typeof(FakeCommand));
  150. existingPeer.Peer.IsResponding = false;
  151. _repositoryMock.Setup(x => x.GetPeers(It.IsAny<bool>())).Returns(new[] { existingPeer });
  152. var newPeer = TestData.PersistentPeerDescriptor("tcp://existingpeer:123", typeof(FakeCommand));
  153. var command = new RegisterPeerCommand(newPeer);
  154. _handler.Handle(command);
  155. _repositoryMock.Verify(x => x.AddOrUpdatePeer(newPeer));
  156. }
  157. [Test]
  158. public void should_unregister_persistent_peer_when_unregistering()
  159. {
  160. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  161. peerDescriptor.TimestampUtc = SystemDateTime.UtcNow.AddSeconds(-30);
  162. _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);
  163. var command = new UnregisterPeerCommand(peerDescriptor.Peer);
  164. _handler.Handle(command);
  165. _repositoryMock.Verify(x => x.AddOrUpdatePeer(It.Is<PeerDescriptor>(peer => peer.Peer.Id == peerDescriptor.Peer.Id && peer.Peer.IsUp == false && peer.TimestampUtc == command.TimestampUtc)));
  166. }
  167. [Test]
  168. public void should_remove_transient_peer_when_unregistering()
  169. {
  170. var peerDescriptor = TestData.TransientPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  171. _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);
  172. _handler.Handle(new UnregisterPeerCommand(peerDescriptor.Peer));
  173. _repositoryMock.Verify(x => x.RemovePeer(It.Is<PeerId>(peerId => peerId == peerDescriptor.Peer.Id)));
  174. }
  175. [Test]
  176. public void should_publish_stopped_event_when_unregistering_a_persistent_client()
  177. {
  178. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  179. peerDescriptor.TimestampUtc = SystemDateTime.UtcNow.AddSeconds(-30);
  180. _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);
  181. var command = new UnregisterPeerCommand(peerDescriptor.Peer, SystemDateTime.UtcNow.AddSeconds(-2));
  182. _handler.Handle(command);
  183. _bus.ExpectExactly(new PeerStopped(peerDescriptor.Peer, command.TimestampUtc));
  184. }
  185. [Test]
  186. public void should_publish_decommissioned_event_when_unregistering_a_transient_client()
  187. {
  188. var peerDescriptor = TestData.TransientPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  189. _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);
  190. _handler.Handle(new UnregisterPeerCommand(peerDescriptor.Peer));
  191. _bus.ExpectExactly(new PeerDecommissioned(peerDescriptor.Peer.Id));
  192. }
  193. [Test]
  194. public void should_publish_peer_decommissioned()
  195. {
  196. var peerId = new PeerId("Abc.Testing.0");
  197. _handler.Handle(new DecommissionPeerCommand(peerId));
  198. _bus.ExpectExactly(new PeerDecommissioned(peerId));
  199. }
  200. [Test]
  201. public void should_remove_peer_from_repository()
  202. {
  203. var peerId = new PeerId("Abc.Testing.0");
  204. _handler.Handle(new DecommissionPeerCommand(peerId));
  205. _repositoryMock.Verify(x => x.RemovePeer(peerId));
  206. }
  207. [Test]
  208. public void should_update_peer_handled_message_and_publish_event()
  209. {
  210. var originalPeerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  211. _repositoryMock.Setup(x => x.Get(originalPeerDescriptor.Peer.Id)).Returns(originalPeerDescriptor);
  212. PeerDescriptor updatedPeerDescriptor = null;
  213. _repositoryMock.Setup(x => x.AddOrUpdatePeer(It.IsAny<PeerDescriptor>())).Callback<PeerDescriptor>(peer => updatedPeerDescriptor = peer);
  214. var newSubscriptions = new[] { new Subscription(new MessageTypeId("Another.Handled.Type")) };
  215. _handler.Handle(new UpdatePeerSubscriptionsCommand(originalPeerDescriptor.Peer.Id, newSubscriptions, DateTime.UtcNow));
  216. updatedPeerDescriptor.Subscriptions.ShouldBeEquivalentTo(newSubscriptions);
  217. var handledMessageUpdateds = _bus.Messages.OfType<PeerSubscriptionsUpdated>().ToList();
  218. handledMessageUpdateds.Count.ShouldEqual(1);
  219. handledMessageUpdateds.Single().PeerDescriptor.ShouldHaveSamePropertiesAs(updatedPeerDescriptor);
  220. }
  221. [Test]
  222. public void should_update_peer_subscriptions_by_types_and_publish_event()
  223. {
  224. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  225. var subscriptionsForTypes = new[]
  226. {
  227. new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), BindingKey.Empty),
  228. new SubscriptionsForType(MessageUtil.GetTypeId(typeof(double)), new BindingKey("bla"))
  229. };
  230. var now = DateTime.UtcNow;
  231. _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, subscriptionsForTypes));
  232. _repositoryMock.Verify(repo => repo.AddDynamicSubscriptionsForTypes(peerDescriptor.PeerId, now, subscriptionsForTypes));
  233. _bus.ExpectExactly(new PeerSubscriptionsForTypesUpdated(peerDescriptor.PeerId, now, subscriptionsForTypes));
  234. }
  235. [Test]
  236. public void should_handle_null_timestamp_when_removing_all_subscriptions_for_a_peer()
  237. {
  238. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  239. peerDescriptor.TimestampUtc = null;
  240. var registerCommand = new RegisterPeerCommand(peerDescriptor);
  241. Assert.That(() => _handler.Handle(registerCommand), Throws.InstanceOf<InvalidOperationException>().With.Message.EqualTo("The TimestampUtc must be provided when registering"));
  242. }
  243. [Test]
  244. public void should_specify_datetime_kind_when_adding_subscriptions_for_a_type()
  245. {
  246. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  247. var subscriptionsForTypes = new[] { new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), BindingKey.Empty) };
  248. var unspecifiedNow = DateTime.SpecifyKind(DateTime.UtcNow, DateTimeKind.Unspecified);
  249. _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, unspecifiedNow, subscriptionsForTypes));
  250. _repositoryMock.Verify(repo => repo.AddDynamicSubscriptionsForTypes(peerDescriptor.PeerId, It.Is<DateTime>(date => date.Kind == DateTimeKind.Utc), subscriptionsForTypes));
  251. }
  252. [Test]
  253. public void should_specify_datetime_kind_when_removing_subscriptions_for_a_type()
  254. {
  255. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  256. var subscriptionsForTypes = new[] { new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), new BindingKey[0]) };
  257. var unspecifiedNow = DateTime.SpecifyKind(DateTime.UtcNow, DateTimeKind.Unspecified);
  258. _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, unspecifiedNow, subscriptionsForTypes));
  259. _repositoryMock.Verify(repo => repo.RemoveDynamicSubscriptionsForTypes(peerDescriptor.PeerId, It.Is<DateTime>(date => date.Kind == DateTimeKind.Utc), new[] { MessageUtil.GetTypeId(typeof(int)) }));
  260. }
  261. [Test]
  262. public void should_handle_null_bindingkeys_array_when_removing_subscriptions()
  263. {
  264. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  265. var subscriptionsForTypes = new[] { new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), null) };
  266. var now = DateTime.UtcNow;
  267. _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, subscriptionsForTypes));
  268. _repositoryMock.Verify(repo => repo.RemoveDynamicSubscriptionsForTypes(peerDescriptor.PeerId, now, new[] { MessageUtil.GetTypeId(typeof(int)) }));
  269. }
  270. [Test]
  271. public void should_handle_null_subscriptionsByType_array()
  272. {
  273. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  274. var now = DateTime.UtcNow;
  275. Assert.That(() => _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, null)),
  276. Throws.Nothing);
  277. _bus.ExpectNothing();
  278. }
  279. [Test]
  280. public void should_remove_peer_subscriptions_for_a_type_if_there_are_no_binding_keys()
  281. {
  282. var now = DateTime.UtcNow;
  283. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  284. SubscriptionsForType[] addedSubscriptions = null;
  285. MessageTypeId[] removedMessageTypeIds = null;
  286. _repositoryMock.Setup(repo => repo.AddDynamicSubscriptionsForTypes(peerDescriptor.PeerId, now, It.IsAny<SubscriptionsForType[]>()))
  287. .Callback((PeerId peerId, DateTime timestampUtc, SubscriptionsForType[] subs) => addedSubscriptions = subs);
  288. _repositoryMock.Setup(repo => repo.RemoveDynamicSubscriptionsForTypes(peerDescriptor.PeerId, now, It.IsAny<MessageTypeId[]>()))
  289. .Callback((PeerId peerId, DateTime timestampUtc, MessageTypeId[] ids) => removedMessageTypeIds = ids);
  290. var subscriptionsForTypes = new[]
  291. {
  292. new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int))),
  293. new SubscriptionsForType(MessageUtil.GetTypeId(typeof(double)), BindingKey.Empty)
  294. };
  295. _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, subscriptionsForTypes));
  296. var addedSubscription = addedSubscriptions.ExpectedSingle();
  297. addedSubscription.ShouldHaveSamePropertiesAs(new SubscriptionsForType(MessageUtil.GetTypeId(typeof(double)), BindingKey.Empty));
  298. var removedMessageTypeId = removedMessageTypeIds.ExpectedSingle();
  299. removedMessageTypeId.ShouldHaveSamePropertiesAs(MessageUtil.GetTypeId(typeof(int)));
  300. _bus.ExpectExactly(new PeerSubscriptionsForTypesUpdated(peerDescriptor.PeerId, now, subscriptionsForTypes));
  301. }
  302. [Test]
  303. public void should_throw_an_explicit_exception_when_updating_the_subscriptions_of_a_decommissioned_peer()
  304. {
  305. Assert.That(() => _handler.Handle(new UpdatePeerSubscriptionsCommand(new PeerId("Abc.NonExistingPeer.0"), new Subscription[0], DateTime.UtcNow)),
  306. Throws.InstanceOf<InvalidOperationException>().With.Property("Message").EqualTo("The specified Peer (Abc.NonExistingPeer.0) does not exist."));
  307. }
  308. [Test]
  309. public void should_ignore_old_peer_updates()
  310. {
  311. var originalPeerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  312. originalPeerDescriptor.TimestampUtc = SystemDateTime.UtcNow.AddMinutes(1);
  313. _repositoryMock.Setup(x => x.Get(originalPeerDescriptor.Peer.Id)).Returns(originalPeerDescriptor);
  314. var newSubscriptions = new[] { new Subscription(new MessageTypeId("Another.Handled.Type")) };
  315. _handler.Handle(new UpdatePeerSubscriptionsCommand(originalPeerDescriptor.Peer.Id, newSubscriptions, DateTime.UtcNow));
  316. _repositoryMock.Verify(x => x.AddOrUpdatePeer(It.IsAny<PeerDescriptor>()), Times.Never());
  317. _bus.ExpectNothing();
  318. }
  319. [Test]
  320. public void should_not_unregister_a_peer_that_started_after_timestamp()
  321. {
  322. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  323. _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
  324. var command = new UnregisterPeerCommand(peerDescriptor.Peer, peerDescriptor.TimestampUtc.Value.AddSeconds(-2));
  325. _handler.Handle(command);
  326. _repositoryMock.Verify(x => x.AddOrUpdatePeer(It.IsAny<PeerDescriptor>()), Times.Never());
  327. _bus.ExpectNothing();
  328. }
  329. [Test]
  330. public void should_report_registration_speed()
  331. {
  332. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  333. var registerCommand = new RegisterPeerCommand(peerDescriptor);
  334. _handler.Handle(registerCommand);
  335. _speedReporter.Verify(x => x.ReportRegistrationDuration(It.Is<TimeSpan>(t => t < 1.Second())));
  336. }
  337. [Test]
  338. public void should_report_unregistration_speed()
  339. {
  340. var peerDescriptor = TestData.TransientPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  341. _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);
  342. _handler.Handle(new UnregisterPeerCommand(peerDescriptor.Peer));
  343. _speedReporter.Verify(x => x.ReportUnregistrationDuration(It.Is<TimeSpan>(t => t < 1.Second())));
  344. }
  345. [Test]
  346. public void should_report_subscription_update_speed()
  347. {
  348. var originalPeerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  349. _repositoryMock.Setup(x => x.Get(originalPeerDescriptor.Peer.Id)).Returns(originalPeerDescriptor);
  350. PeerDescriptor updatedPeerDescriptor = null;
  351. _repositoryMock.Setup(x => x.AddOrUpdatePeer(It.IsAny<PeerDescriptor>())).Callback<PeerDescriptor>(peer => updatedPeerDescriptor = peer);
  352. var newSubscriptions = new[] { new Subscription(new MessageTypeId("Another.Handled.Type")) };
  353. _handler.Handle(new UpdatePeerSubscriptionsCommand(originalPeerDescriptor.Peer.Id, newSubscriptions, DateTime.UtcNow));
  354. _speedReporter.Verify(x => x.ReportSubscriptionUpdateDuration(It.Is<TimeSpan>(t => t < 1.Second())));
  355. }
  356. [Test]
  357. public void should_report_subscription_update_for_types_speed()
  358. {
  359. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  360. var subscriptionsForTypes = new[]
  361. {
  362. new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), BindingKey.Empty),
  363. new SubscriptionsForType(MessageUtil.GetTypeId(typeof(double)), new BindingKey("bla"))
  364. };
  365. var now = DateTime.UtcNow;
  366. _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, subscriptionsForTypes));
  367. _speedReporter.Verify(x => x.ReportSubscriptionUpdateForTypesDuration(It.Is<TimeSpan>(t => t < 1.Second())));
  368. }
  369. [Test]
  370. public void should_persist_not_responding_peer()
  371. {
  372. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123");
  373. _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
  374. _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, default));
  375. var expectedTimestampUtc = peerDescriptor.TimestampUtc.Value.AddMilliseconds(1);
  376. _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, false, expectedTimestampUtc));
  377. _bus.ExpectExactly(new PeerNotResponding(peerDescriptor.PeerId, expectedTimestampUtc));
  378. }
  379. [Test]
  380. public void should_persist_not_responding_peer_with_timestamp()
  381. {
  382. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123");
  383. _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
  384. var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(1);
  385. _handler.Handle(new MarkPeerAsNotRespondingInternalCommand(peerDescriptor.PeerId, timestampUtc));
  386. _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, false, timestampUtc));
  387. _bus.ExpectExactly(new PeerNotResponding(peerDescriptor.PeerId, timestampUtc));
  388. }
  389. [Test]
  390. public void should_persist_responding_peer()
  391. {
  392. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  393. _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
  394. _handler.Handle(new MarkPeerAsRespondingCommand(peerDescriptor.PeerId, default));
  395. var expectedTimestampUtc = peerDescriptor.TimestampUtc.Value.AddMilliseconds(1);
  396. _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, true, expectedTimestampUtc));
  397. _bus.ExpectExactly(new PeerResponding(peerDescriptor.PeerId, expectedTimestampUtc));
  398. }
  399. [Test]
  400. public void should_persist_responding_peer_with_timestamp()
  401. {
  402. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  403. _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns(peerDescriptor);
  404. var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(1);
  405. _handler.Handle(new MarkPeerAsRespondingInternalCommand(peerDescriptor.PeerId, timestampUtc));
  406. _repositoryMock.Verify(x => x.SetPeerResponding(peerDescriptor.PeerId, true, timestampUtc));
  407. _bus.ExpectExactly(new PeerResponding(peerDescriptor.PeerId, timestampUtc));
  408. }
  409. [Test]
  410. public void should_not_revive_decommissioned_peer()
  411. {
  412. var peerDescriptor = TestData.PersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
  413. _repositoryMock.Setup(x => x.Get(peerDescriptor.PeerId)).Returns((PeerDescriptor)null);
  414. var timestampUtc = peerDescriptor.TimestampUtc.Value.AddSeconds(-1);
  415. _handler.Handle(new MarkPeerAsNotRespondingCommand(peerDescriptor.PeerId, timestampUtc));
  416. _repositoryMock.Verify(repo => repo.SetPeerResponding(It.IsAny<PeerId>(), It.IsAny<bool>(), It.IsAny<DateTime>()), Times.Never());
  417. }
  418. }
  419. }