Browse Source

Add speed reporting capability to main handlers

Introduced IDirectorySpeedReporter to allow DirectoryCommandsHandler to
report method calls duration. The default NOOP implementation is
configured in the IoC to prevent any breaking change.
Kevin Lovato 9 years ago
parent
commit
f104b21ee8

+ 2 - 0
.gitignore

@@ -29,3 +29,5 @@ _ReSharper*/
 output/**
 
 src/packages/**
+*.ncrunchproject
+*.ncrunchsolution

+ 1 - 0
src/Abc.Zebus.Directory.Tests/Abc.Zebus.Directory.Tests.csproj

@@ -93,6 +93,7 @@
     <Compile Include="Configuration\ConfigurationTests.cs" />
     <Compile Include="DeadPeerDetection\DeadPeerDetectorEntryTests.cs" />
     <Compile Include="DeadPeerDetection\DeadPeerDetectorTests.cs" />
+    <Compile Include="DirectorySpeedReporterTests.cs" />
     <Compile Include="PeerDirectoryServerTests.cs" />
     <Compile Include="FakeCommand.cs" />
     <Compile Include="FakeRoutableCommand.cs" />

+ 52 - 0
src/Abc.Zebus.Directory.Tests/DirectorySpeedReporterTests.cs

@@ -0,0 +1,52 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Abc.Zebus.Testing.Extensions;
+using Abc.Zebus.Util;
+using NUnit.Framework;
+
+namespace Abc.Zebus.Directory.Tests
+{
+    public class DirectorySpeedReporterTests
+    {
+        private DirectorySpeedReporter _speedReporter;
+
+        [SetUp]
+        public void SetUp()
+        {
+            _speedReporter = new DirectorySpeedReporter();
+        }
+
+        public readonly ReportingMethod[] ReportingMethods =
+        {
+            new ReportingMethod { Name = "Registration", EnqueueMethod = (x, value) => x.ReportRegistrationDuration(value), RetrievalMethod = x => x.GetAndResetRegistrationDurations() },
+            new ReportingMethod { Name = "Unregistration", EnqueueMethod = (x, value) => x.ReportUnregistrationDuration(value), RetrievalMethod = x => x.GetAndResetUnregistrationDurations() },
+            new ReportingMethod { Name = "Subscription update", EnqueueMethod = (x, value) => x.ReportSubscriptionUpdateDuration(value), RetrievalMethod = x => x.GetAndResetSubscriptionUpdateDurations() },
+            new ReportingMethod { Name = "Subscription update for types", EnqueueMethod = (x, value) => x.ReportSubscriptionUpdateForTypesDuration(value), RetrievalMethod = x => x.GetAndResetSubscriptionUpdateForTypesDurations() },
+        };
+
+        [Test]
+        [TestCaseSource("ReportingMethods")]
+        public void should_report_speed_for_given_method(ReportingMethod reportingMethod)
+        {
+            reportingMethod.EnqueueMethod(_speedReporter, 1.Second());
+            reportingMethod.EnqueueMethod(_speedReporter, 2.Second());
+            reportingMethod.EnqueueMethod(_speedReporter, 3.Second());
+
+            reportingMethod.RetrievalMethod(_speedReporter).SequenceEqual(new[] { 1.Second(), 2.Second(), 3.Second() }).ShouldBeTrue();
+            reportingMethod.RetrievalMethod(_speedReporter).ShouldBeEmpty();
+        }
+
+        public class ReportingMethod
+        {
+            public string Name { get; set; }
+            public Action<IDirectorySpeedReporter, TimeSpan> EnqueueMethod { get; set; }
+            public Func<IDirectorySpeedReporter, IList<TimeSpan>> RetrievalMethod { get; set; }
+
+            public override string ToString()
+            {
+                return Name;
+            }
+        }
+    }
+}

+ 55 - 1
src/Abc.Zebus.Directory.Tests/Handlers/DirectoryCommandsHandlerTests.cs

@@ -22,6 +22,7 @@ namespace Abc.Zebus.Directory.Tests.Handlers
         private Mock<IPeerRepository> _repositoryMock;
         private Mock<IDirectoryConfiguration> _configurationMock;
         private DirectoryCommandsHandler _handler;
+        private Mock<IDirectorySpeedReporter> _speedReporter;
 
         [SetUp]
         public void Setup()
@@ -32,7 +33,8 @@ namespace Abc.Zebus.Directory.Tests.Handlers
             _configurationMock.SetupGet(conf => conf.BlacklistedMachines).Returns(new[] { "ANOTHER_BLACKLISTEDMACHINE", "BLACKlistedMACHINE" });
             _repositoryMock = new Mock<IPeerRepository>();
             _bus = new TestBus();
-            _handler = new DirectoryCommandsHandler(_bus, _repositoryMock.Object, _configurationMock.Object) { Context = MessageContext.CreateOverride(_sender.Id, _sender.EndPoint) };
+            _speedReporter = new Mock<IDirectorySpeedReporter>();
+            _handler = new DirectoryCommandsHandler(_bus, _repositoryMock.Object, _configurationMock.Object, _speedReporter.Object) { Context = MessageContext.CreateOverride(_sender.Id, _sender.EndPoint) };
         }
 
         [TearDown]
@@ -388,5 +390,57 @@ namespace Abc.Zebus.Directory.Tests.Handlers
             _repositoryMock.Verify(x => x.AddOrUpdatePeer(It.IsAny<PeerDescriptor>()), Times.Never());
             _bus.ExpectNothing();
         }
+
+        [Test]
+        public void should_report_registration_speed()
+        {
+            var peerDescriptor = TestDataBuilder.CreatePersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
+            var registerCommand = new RegisterPeerCommand(peerDescriptor);
+
+            _handler.Handle(registerCommand);
+
+            _speedReporter.Verify(x => x.ReportRegistrationDuration(It.Is<TimeSpan>(t => t < 1.Second())));
+        }
+
+        [Test]
+        public void should_report_unregistration_speed()
+        {
+            var peerDescriptor = TestDataBuilder.CreateTransientPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
+            _repositoryMock.Setup(x => x.Get(peerDescriptor.Peer.Id)).Returns(peerDescriptor);
+
+            _handler.Handle(new UnregisterPeerCommand(peerDescriptor.Peer));
+
+            _speedReporter.Verify(x => x.ReportUnregistrationDuration(It.Is<TimeSpan>(t => t < 1.Second())));
+        }
+
+        [Test]
+        public void should_report_subscription_update_speed()
+        {
+            var originalPeerDescriptor = TestDataBuilder.CreatePersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
+            _repositoryMock.Setup(x => x.Get(originalPeerDescriptor.Peer.Id)).Returns(originalPeerDescriptor);
+            PeerDescriptor updatedPeerDescriptor = null;
+            _repositoryMock.Setup(x => x.AddOrUpdatePeer(It.IsAny<PeerDescriptor>())).Callback<PeerDescriptor>(peer => updatedPeerDescriptor = peer);
+            var newSubscriptions = new[] { new Subscription(new MessageTypeId("Another.Handled.Type")) };
+
+            _handler.Handle(new UpdatePeerSubscriptionsCommand(originalPeerDescriptor.Peer.Id, newSubscriptions, DateTime.UtcNow));
+
+            _speedReporter.Verify(x => x.ReportSubscriptionUpdateDuration(It.Is<TimeSpan>(t => t < 1.Second())));
+        }
+
+        [Test]
+        public void should_report_subscription_update_for_types_speed()
+        {
+            var peerDescriptor = TestDataBuilder.CreatePersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
+            var subscriptionsForTypes = new[]
+            {
+                new SubscriptionsForType(MessageUtil.GetTypeId(typeof(int)), BindingKey.Empty),
+                new SubscriptionsForType(MessageUtil.GetTypeId(typeof(double)), new BindingKey("bla"))
+            };
+            var now = DateTime.UtcNow;
+
+            _handler.Handle(new UpdatePeerSubscriptionsForTypesCommand(peerDescriptor.PeerId, now, subscriptionsForTypes));
+
+            _speedReporter.Verify(x => x.ReportSubscriptionUpdateForTypesDuration(It.Is<TimeSpan>(t => t < 1.Second())));
+        }
     }
 }

+ 4 - 0
src/Abc.Zebus.Directory/Abc.Zebus.Directory.csproj

@@ -74,8 +74,12 @@
     <Compile Include="DeadPeerDetection\DeadPeerDetectorEntry.cs" />
     <Compile Include="DeadPeerDetection\DeadPeerStatus.cs" />
     <Compile Include="DeadPeerDetection\IDeadPeerDetector.cs" />
+    <Compile Include="DirectorySpeedReporter.cs" />
     <Compile Include="Handlers\DirectoryCommandsHandler.cs" />
     <Compile Include="Configuration\IDirectoryConfiguration.cs" />
+    <Compile Include="IDirectorySpeedReporter.cs" />
+    <Compile Include="Initialization\DirectoryRegistry.cs" />
+    <Compile Include="NoopDirectorySpeedReporter.cs" />
     <Compile Include="PeerDirectoryServer.cs" />
     <Compile Include="Program.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />

+ 62 - 0
src/Abc.Zebus.Directory/DirectorySpeedReporter.cs

@@ -0,0 +1,62 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Abc.Zebus.Directory
+{
+    public class DirectorySpeedReporter : IDirectorySpeedReporter
+    {
+        private readonly ConcurrentStack<TimeSpan> _registrationDurations = new ConcurrentStack<TimeSpan>();
+        private readonly ConcurrentStack<TimeSpan> _unregistrationDurations = new ConcurrentStack<TimeSpan>();
+        private readonly ConcurrentStack<TimeSpan> _subscriptionUpdateDurations = new ConcurrentStack<TimeSpan>();
+        private readonly ConcurrentStack<TimeSpan> _subscriptionUpdateForTypesDurations = new ConcurrentStack<TimeSpan>();
+
+        public void ReportRegistrationDuration(TimeSpan elapsed)
+        {
+            _registrationDurations.Push(elapsed);
+        }
+
+        public void ReportUnregistrationDuration(TimeSpan elapsed)
+        {
+            _unregistrationDurations.Push(elapsed);
+        }
+
+        public void ReportSubscriptionUpdateDuration(TimeSpan elaped)
+        {
+            _subscriptionUpdateDurations.Push(elaped);
+        }
+
+        public void ReportSubscriptionUpdateForTypesDuration(TimeSpan elapsed)
+        {
+            _subscriptionUpdateForTypesDurations.Push(elapsed);
+        }
+
+        public IList<TimeSpan> GetAndResetRegistrationDurations()
+        {
+            return GetDurationsAndReset(_registrationDurations);
+        }
+
+        public IList<TimeSpan> GetAndResetUnregistrationDurations()
+        {
+            return GetDurationsAndReset(_unregistrationDurations);
+        }
+
+        public IList<TimeSpan> GetAndResetSubscriptionUpdateDurations()
+        {
+            return GetDurationsAndReset(_subscriptionUpdateDurations);
+        }
+
+        public IList<TimeSpan> GetAndResetSubscriptionUpdateForTypesDurations()
+        {
+            return GetDurationsAndReset(_subscriptionUpdateForTypesDurations);
+        }
+
+        private IList<TimeSpan> GetDurationsAndReset(ConcurrentStack<TimeSpan> durations)
+        {
+            var poppedItems = new TimeSpan[Math.Max(durations.Count, 10)];
+            var poppedItemsCount = durations.TryPopRange(poppedItems, 0, poppedItems.Length);
+            return poppedItems.Take(poppedItemsCount).Reverse().ToList();
+        }
+    }
+}

+ 14 - 1
src/Abc.Zebus.Directory/Handlers/DirectoryCommandsHandler.cs

@@ -1,5 +1,6 @@
 using System;
 using System.Collections.Generic;
+using System.Diagnostics;
 using System.Linq;
 using Abc.Zebus.Directory.Configuration;
 using Abc.Zebus.Directory.Storage;
@@ -18,11 +19,13 @@ namespace Abc.Zebus.Directory.Handlers
         private readonly HashSet<string> _blacklistedMachines;
         private readonly IBus _bus;
         private readonly IPeerRepository _peerRepository;
+        private readonly IDirectorySpeedReporter _speedReporter;
 
-        public DirectoryCommandsHandler(IBus bus, IPeerRepository peerRepository, IDirectoryConfiguration configuration)
+        public DirectoryCommandsHandler(IBus bus, IPeerRepository peerRepository, IDirectoryConfiguration configuration, IDirectorySpeedReporter speedReporter)
         {
             _bus = bus;
             _peerRepository = peerRepository;
+            _speedReporter = speedReporter;
             _blacklistedMachines = configuration.BlacklistedMachines.ToHashSet(StringComparer.OrdinalIgnoreCase);
         }
 
@@ -41,6 +44,7 @@ namespace Abc.Zebus.Directory.Handlers
             if (!message.Peer.TimestampUtc.HasValue)
                 throw new InvalidOperationException("The TimestampUtc must be provided when registering");
 
+            var stopwatch = Stopwatch.StartNew();
             var peerDescriptor = message.Peer;
             peerDescriptor.Peer.IsUp = true;
             peerDescriptor.Peer.IsResponding = true;
@@ -55,10 +59,12 @@ namespace Abc.Zebus.Directory.Handlers
 
             var registredPeerDescriptors = _peerRepository.GetPeers(loadDynamicSubscriptions: true);
             _bus.Reply(new RegisterPeerResponse(registredPeerDescriptors.ToArray()));
+            _speedReporter.ReportRegistrationDuration(stopwatch.Elapsed);
         }
 
         public void Handle(UnregisterPeerCommand message)
         {
+            var stopwatch = Stopwatch.StartNew();
             var peer = _peerRepository.Get(message.PeerId);
             if (peer == null || peer.TimestampUtc > message.TimestampUtc)
                 return;
@@ -67,13 +73,16 @@ namespace Abc.Zebus.Directory.Handlers
                 StopPeer(message, peer);
             else
                 RemovePeer(message.PeerId);
+            _speedReporter.ReportUnregistrationDuration(stopwatch.Elapsed);
         }
 
         public void Handle(UpdatePeerSubscriptionsCommand message)
         {
+            var stopwatch = Stopwatch.StartNew();
             var peerDescriptor = _peerRepository.UpdatePeerSubscriptions(message.PeerId, message.Subscriptions, message.TimestampUtc);
             if (peerDescriptor != null)
                 _bus.Publish(new PeerSubscriptionsUpdated(peerDescriptor));
+            _speedReporter.ReportSubscriptionUpdateDuration(stopwatch.Elapsed);
         }
 
         private bool IsPeerInConflict(PeerDescriptor existingPeer, PeerDescriptor peerToAdd)
@@ -105,6 +114,8 @@ namespace Abc.Zebus.Directory.Handlers
             if (message.SubscriptionsForTypes == null || message.SubscriptionsForTypes.Length == 0)
                 return;
 
+            var stopwatch = Stopwatch.StartNew();
+
             var subscriptionsToAdd = message.SubscriptionsForTypes.Where(sub => sub.BindingKeys != null && sub.BindingKeys.Any()).ToArray();
             var subscriptionsToRemove = message.SubscriptionsForTypes.Where(sub => sub.BindingKeys == null || !sub.BindingKeys.Any()).ToList();
 
@@ -113,6 +124,8 @@ namespace Abc.Zebus.Directory.Handlers
             if (subscriptionsToRemove.Any())
                 _peerRepository.RemoveDynamicSubscriptionsForTypes(message.PeerId, DateTime.SpecifyKind(message.TimestampUtc, DateTimeKind.Utc), subscriptionsToRemove.Select(sub => sub.MessageTypeId).ToArray());
             _bus.Publish(new PeerSubscriptionsForTypesUpdated(message.PeerId, message.TimestampUtc, message.SubscriptionsForTypes));
+
+            _speedReporter.ReportSubscriptionUpdateForTypesDuration(stopwatch.Elapsed);
         }
     }
 }

+ 19 - 0
src/Abc.Zebus.Directory/IDirectorySpeedReporter.cs

@@ -0,0 +1,19 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+
+namespace Abc.Zebus.Directory
+{
+    public interface IDirectorySpeedReporter
+    {
+        void ReportRegistrationDuration(TimeSpan elapsed);
+        void ReportUnregistrationDuration(TimeSpan elapsed);
+        void ReportSubscriptionUpdateDuration(TimeSpan elaped);
+        void ReportSubscriptionUpdateForTypesDuration(TimeSpan elapsed);
+
+        IList<TimeSpan> GetAndResetRegistrationDurations();
+        IList<TimeSpan> GetAndResetUnregistrationDurations();
+        IList<TimeSpan> GetAndResetSubscriptionUpdateDurations();
+        IList<TimeSpan> GetAndResetSubscriptionUpdateForTypesDurations();
+    }
+}

+ 12 - 0
src/Abc.Zebus.Directory/Initialization/DirectoryRegistry.cs

@@ -0,0 +1,12 @@
+using StructureMap.Configuration.DSL;
+
+namespace Abc.Zebus.Directory.Initialization
+{
+    public class DirectoryRegistry : Registry
+    {
+        public DirectoryRegistry()
+        {
+            ForSingletonOf<IDirectorySpeedReporter>().UseIfNone<NoopDirectorySpeedReporter>();
+        }
+    }
+}

+ 45 - 0
src/Abc.Zebus.Directory/NoopDirectorySpeedReporter.cs

@@ -0,0 +1,45 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Abc.Zebus.Directory
+{
+    public class NoopDirectorySpeedReporter : IDirectorySpeedReporter
+    {
+        public void ReportRegistrationDuration(TimeSpan elapsed)
+        { 
+        }
+
+        public void ReportUnregistrationDuration(TimeSpan elapsed)
+        {
+        }
+
+        public void ReportSubscriptionUpdateDuration(TimeSpan elaped)
+        {
+        }
+
+        public void ReportSubscriptionUpdateForTypesDuration(TimeSpan elapsed)
+        {
+        }
+
+        public IList<TimeSpan> GetAndResetRegistrationDurations()
+        {
+            return Enumerable.Empty<TimeSpan>().ToList();
+        }
+
+        public IList<TimeSpan> GetAndResetUnregistrationDurations()
+        {
+            return Enumerable.Empty<TimeSpan>().ToList();
+        }
+
+        public IList<TimeSpan> GetAndResetSubscriptionUpdateDurations()
+        {
+            return Enumerable.Empty<TimeSpan>().ToList();
+        }
+
+        public IList<TimeSpan> GetAndResetSubscriptionUpdateForTypesDurations()
+        {
+            return Enumerable.Empty<TimeSpan>().ToList();
+        }
+    }
+}