浏览代码

Directory: reject peers who try to register with a timestamp that is too far in the future as this will cause them to be flagged as not registered at the current time
- Sometimes windows will restart with a time that is not synchronised and may be an hour in the future, when this happens an clients will not be correctly registered

Mendel Monteiro-Beckerman 7 年之前
父节点
当前提交
af1d093912

+ 32 - 30
src/Abc.Zebus.Directory.Tests/Configuration/ConfigurationTests.cs

@@ -1,4 +1,4 @@
-using System.Configuration;
+using System.Collections.Specialized;
 using Abc.Zebus.Directory.Configuration;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Util;
@@ -9,22 +9,28 @@ namespace Abc.Zebus.Directory.Tests.Configuration
     [TestFixture]
     public class ConfigurationTests
     {
+        private NameValueCollection _appSettings;
+
+        [SetUp]
+        public void SetUp()
+        {
+            _appSettings = new NameValueCollection();
+        }
+
         [Test]
         public void should_read_registration_timeout()
         {
             SetAppSettingsKey("Bus.Directory.RegistrationTimeout", "00:00:42");
 
-            var configuration = new AppSettingsBusConfiguration();
-            configuration.RegistrationTimeout.ShouldEqual(42.Seconds());
+            var busConfiguration = new AppSettingsBusConfiguration(new AppSettings(_appSettings));
+            busConfiguration.RegistrationTimeout.ShouldEqual(42.Seconds());
         }
 
         [Test]
         public void should_read_default_registration_timeout()
         {
-            RemoveAppSettingsKey("Bus.Directory.RegistrationTimeout");
-
-            var configuration = new AppSettingsBusConfiguration();
-            configuration.RegistrationTimeout.ShouldEqual(30.Seconds());
+            var busConfiguration = new AppSettingsBusConfiguration(new AppSettings(_appSettings));
+            busConfiguration.RegistrationTimeout.ShouldEqual(30.Seconds());
         }
         
         [Test]
@@ -32,40 +38,36 @@ namespace Abc.Zebus.Directory.Tests.Configuration
         {
             SetAppSettingsKey("Directory.PingPeers.Interval", "00:02");
 
-            var configuration = new AppSettingsDirectoryConfiguration();
-            configuration.PeerPingInterval.ShouldEqual(2.Minutes());
+            var directoryConfiguration = new AppSettingsDirectoryConfiguration(new AppSettings(_appSettings));
+            directoryConfiguration.PeerPingInterval.ShouldEqual(2.Minutes());
         }
 
         [Test]
-        public void should_read_default_peer_ping_interval()
+        public void should_read_default_max_allowed_clock_diff()
         {
-            RemoveAppSettingsKey("Directory.PingPeers.Interval");
-
-            var configuration = new AppSettingsDirectoryConfiguration();
-            configuration.PeerPingInterval.ShouldEqual(1.Minute());
+            var directoryConfiguration = new AppSettingsDirectoryConfiguration(new AppSettings(_appSettings));
+            directoryConfiguration.MaxAllowedClockDifferenceWhenRegistering.ShouldEqual(null);
         }
 
-        private static void SetAppSettingsKey(string key, string value)
+        [Test]
+        public void should_read_max_allowed_clock_diff()
         {
-            var appConfig = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
-            var element = appConfig.AppSettings.Settings[key];
-            if (element == null)
-                appConfig.AppSettings.Settings.Add(key, value);
-            else
-                element.Value = value;
-
-            appConfig.Save();
+            SetAppSettingsKey("Directory.MaxAllowedClockDifferenceWhenRegistering", "00:02");
 
-            ConfigurationManager.RefreshSection("appSettings");
+            var directoryConfiguration = new AppSettingsDirectoryConfiguration(new AppSettings(_appSettings));
+            directoryConfiguration.MaxAllowedClockDifferenceWhenRegistering.ShouldEqual(2.Minutes());
         }
 
-        private static void RemoveAppSettingsKey(string key)
+        [Test]
+        public void should_read_default_peer_ping_interval()
         {
-            var appConfig = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
-            appConfig.AppSettings.Settings.Remove(key);
-            appConfig.Save();
+            var directoryConfiguration = new AppSettingsDirectoryConfiguration(new AppSettings(_appSettings));
+            directoryConfiguration.PeerPingInterval.ShouldEqual(1.Minute());
+        }
 
-            ConfigurationManager.RefreshSection("appSettings");
+        private void SetAppSettingsKey(string key, string value)
+        {
+            _appSettings[key] = value;
         }
     }
-}
+}

+ 24 - 0
src/Abc.Zebus.Directory.Tests/Handlers/DirectoryCommandsHandlerTests.cs

@@ -129,6 +129,30 @@ namespace Abc.Zebus.Directory.Tests.Handlers
             exception.Message.ShouldEqual("Peer Abc.Testing.0 on host BLACKLISTEDMACHINE is not allowed to register on this directory");
         }
 
+        [Test]
+        public void should_not_throw_if_a_client_with_a_synchronised_clock_tries_to_register()
+        {
+            var peerDescriptor = TestDataBuilder.CreatePersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
+            var registerCommand = new RegisterPeerCommand(peerDescriptor);
+            registerCommand.Peer.TimestampUtc = SystemDateTime.UtcNow + 14.Minutes();
+            _configurationMock.SetupGet(x => x.MaxAllowedClockDifferenceWhenRegistering).Returns(15.Minutes());
+
+            Assert.DoesNotThrow(() => _handler.Handle(registerCommand));
+        }
+
+        [Test]
+        public void should_throw_if_a_client_with_a_more_recent_clock_tries_to_register()
+        {
+            var peerDescriptor = TestDataBuilder.CreatePersistentPeerDescriptor("tcp://abctest:123", typeof(FakeCommand));
+            var registerCommand = new RegisterPeerCommand(peerDescriptor);
+            registerCommand.Peer.TimestampUtc = SystemDateTime.UtcNow + 1.Hour();
+            _configurationMock.SetupGet(x => x.MaxAllowedClockDifferenceWhenRegistering).Returns(15.Minutes());
+
+            var exception = Assert.Throws<InvalidOperationException>(() => _handler.Handle(registerCommand));
+
+            exception.Message.ShouldContain("is too far ahead of the the server's current time");
+        }
+
         [Test]
         public void should_throw_if_an_existing_peer_tries_to_register()
         {

+ 25 - 8
src/Abc.Zebus.Directory/Configuration/AppSettings.cs

@@ -1,23 +1,36 @@
 using System;
+using System.Collections.Specialized;
 using System.Configuration;
 using System.Globalization;
 
 namespace Abc.Zebus.Directory.Configuration
 {
-    internal static class AppSettings
+    internal class AppSettings
     {
-        public static T Get<T>(string key, T defaultValue)
+        private readonly NameValueCollection _appSettings;
+
+        public AppSettings()
+            :this(ConfigurationManager.AppSettings)
+        {
+        }
+
+        public AppSettings(NameValueCollection appSettings)
         {
-            var value = ConfigurationManager.AppSettings[key];
+            _appSettings = appSettings;
+        }
+
+        public T Get<T>(string key, T defaultValue)
+        {
+            var value = _appSettings[key];
             if (value == null)
                 return defaultValue;
 
             return Parser<T>.Parse(value);
         }
 
-        public static string[] GetArray(string key)
+        public string[] GetArray(string key)
         {
-            var value = ConfigurationManager.AppSettings[key];
+            var value = _appSettings[key];
             if (value == null)
                 return new string[0];
 
@@ -35,11 +48,15 @@ namespace Abc.Zebus.Directory.Configuration
 
             static Parser()
             {
-                if (typeof(T) == typeof(TimeSpan))
+                var conversionType = typeof(T);
+                if (conversionType.IsGenericType && conversionType.GetGenericTypeDefinition() == typeof(Nullable<>))
+                    conversionType = conversionType.GenericTypeArguments[0];
+
+                if (conversionType == typeof(TimeSpan))
                     _value = s => TimeSpan.Parse(s, CultureInfo.InvariantCulture);
                 else
-                    _value = s => Convert.ChangeType(s, typeof(T));
+                    _value = s => Convert.ChangeType(s, conversionType);
             }
         }
     }
-}
+}

+ 11 - 6
src/Abc.Zebus.Directory/Configuration/AppSettingsBusConfiguration.cs

@@ -6,12 +6,17 @@ namespace Abc.Zebus.Directory.Configuration
     public class AppSettingsBusConfiguration : IBusConfiguration
     {
         public AppSettingsBusConfiguration()
+            : this(new AppSettings())
         {
-            RegistrationTimeout = AppSettings.Get("Bus.Directory.RegistrationTimeout", 30.Seconds());
-            StartReplayTimeout = AppSettings.Get("Bus.Persistence.StartReplayTimeout", 30.Seconds());
-            IsDirectoryPickedRandomly = AppSettings.Get("Bus.Directory.PickRandom", true);
-            IsErrorPublicationEnabled = AppSettings.Get("Bus.IsErrorPublicationEnabled", true);
-            MessagesBatchSize = AppSettings.Get("Bus.MessagesBatchSize", 100);
+        }
+
+        internal AppSettingsBusConfiguration(AppSettings appSettings)
+        {
+            RegistrationTimeout = appSettings.Get("Bus.Directory.RegistrationTimeout", 30.Seconds());
+            StartReplayTimeout = appSettings.Get("Bus.Persistence.StartReplayTimeout", 30.Seconds());
+            IsDirectoryPickedRandomly = appSettings.Get("Bus.Directory.PickRandom", true);
+            IsErrorPublicationEnabled = appSettings.Get("Bus.IsErrorPublicationEnabled", true);
+            MessagesBatchSize = appSettings.Get("Bus.MessagesBatchSize", 100);
         }
 
         public string[] DirectoryServiceEndPoints { get { return new string[0]; } }
@@ -23,4 +28,4 @@ namespace Abc.Zebus.Directory.Configuration
         public bool IsErrorPublicationEnabled { get; private set; }
         public int MessagesBatchSize { get; private set; }
     }
-}
+}

+ 14 - 7
src/Abc.Zebus.Directory/Configuration/AppSettingsDirectoryConfiguration.cs

@@ -6,14 +6,20 @@ namespace Abc.Zebus.Directory.Configuration
     public class AppSettingsDirectoryConfiguration : IDirectoryConfiguration
     {
         public AppSettingsDirectoryConfiguration()
+            : this(new AppSettings())
         {
-            PeerPingInterval = AppSettings.Get("Directory.PingPeers.Interval", 1.Minute());
-            TransientPeerPingTimeout = AppSettings.Get("Directory.TransientPeers.PingTimeout", 5.Minutes());
-            PersistentPeerPingTimeout = AppSettings.Get("Directory.PersistentPeers.PingTimeout", 5.Minutes());
-            DebugPeerPingTimeout = AppSettings.Get("Directory.DebugPeers.PingTimeout", 10.Minutes());
-            BlacklistedMachines = AppSettings.GetArray("Directory.BlacklistedMachines");
-            DisableDynamicSubscriptionsForDirectoryOutgoingMessages = AppSettings.Get("Directory.DisableDynamicSubscriptionsForDirectoryOutgoingMessages", false);
+        }
+
+        internal AppSettingsDirectoryConfiguration(AppSettings appSettings)
+        {
+            PeerPingInterval = appSettings.Get("Directory.PingPeers.Interval", 1.Minute());
+            TransientPeerPingTimeout = appSettings.Get("Directory.TransientPeers.PingTimeout", 5.Minutes());
+            PersistentPeerPingTimeout = appSettings.Get("Directory.PersistentPeers.PingTimeout", 5.Minutes());
+            DebugPeerPingTimeout = appSettings.Get("Directory.DebugPeers.PingTimeout", 10.Minutes());
+            BlacklistedMachines = appSettings.GetArray("Directory.BlacklistedMachines");
+            DisableDynamicSubscriptionsForDirectoryOutgoingMessages = appSettings.Get("Directory.DisableDynamicSubscriptionsForDirectoryOutgoingMessages", false);
             WildcardsForPeersNotToDecommissionOnTimeout = new string[0];
+            MaxAllowedClockDifferenceWhenRegistering = appSettings.Get<TimeSpan?>("Directory.MaxAllowedClockDifferenceWhenRegistering", null);
         }
 
         public TimeSpan PeerPingInterval { get; private set; }
@@ -23,5 +29,6 @@ namespace Abc.Zebus.Directory.Configuration
         public string[] BlacklistedMachines { get; private set; }
         public string[] WildcardsForPeersNotToDecommissionOnTimeout { get; private set; }
         public bool DisableDynamicSubscriptionsForDirectoryOutgoingMessages { get; private set; }
+        public TimeSpan? MaxAllowedClockDifferenceWhenRegistering { get; }
     }
-}
+}

+ 6 - 1
src/Abc.Zebus.Directory/Configuration/IDirectoryConfiguration.cs

@@ -47,5 +47,10 @@ namespace Abc.Zebus.Directory.Configuration
         /// with massive (> 50 000) amounts of dynamic subscriptions (which is not recommended anyway)
         /// </summary>
         bool DisableDynamicSubscriptionsForDirectoryOutgoingMessages { get; }
+
+        /// <summary>
+        /// Used to evaluate whether to reject register peer commands when the client's clock is ahead of the server's clock.
+        /// </summary>
+        TimeSpan? MaxAllowedClockDifferenceWhenRegistering { get; }
     }
-}
+}

+ 9 - 1
src/Abc.Zebus.Directory/Handlers/DirectoryCommandsHandler.cs

@@ -1,6 +1,7 @@
 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.IO;
 using System.Linq;
 using Abc.Zebus.Directory.Configuration;
 using Abc.Zebus.Directory.Storage;
@@ -23,12 +24,14 @@ namespace Abc.Zebus.Directory.Handlers
         private readonly HashSet<string> _blacklistedMachines;
         private readonly IBus _bus;
         private readonly IPeerRepository _peerRepository;
+        private readonly IDirectoryConfiguration _configuration;
         private readonly IDirectorySpeedReporter _speedReporter;
 
         public DirectoryCommandsHandler(IBus bus, IPeerRepository peerRepository, IDirectoryConfiguration configuration, IDirectorySpeedReporter speedReporter)
         {
             _bus = bus;
             _peerRepository = peerRepository;
+            _configuration = configuration;
             _speedReporter = speedReporter;
             _blacklistedMachines = configuration.BlacklistedMachines.ToHashSet(StringComparer.OrdinalIgnoreCase);
         }
@@ -45,9 +48,14 @@ namespace Abc.Zebus.Directory.Handlers
             if (_blacklistedMachines.Contains(Context.Originator.SenderMachineName))
                 throw new InvalidOperationException($"Peer {Context.SenderId} on host {Context.Originator.SenderMachineName} is not allowed to register on this directory");
 
-            if (!message.Peer.TimestampUtc.HasValue)
+            var peerTimestampUtc = message.Peer.TimestampUtc;
+            if (!peerTimestampUtc.HasValue)
                 throw new InvalidOperationException("The TimestampUtc must be provided when registering");
 
+            var utcNow = SystemDateTime.UtcNow;
+            if (_configuration.MaxAllowedClockDifferenceWhenRegistering != null && peerTimestampUtc.Value > utcNow + _configuration.MaxAllowedClockDifferenceWhenRegistering)
+                throw new InvalidOperationException($"The client provided timestamp [{peerTimestampUtc}] is too far ahead of the the server's current time [{utcNow}]");
+
             var stopwatch = Stopwatch.StartNew();
             var peerDescriptor = message.Peer;
             peerDescriptor.Peer.IsUp = true;

+ 1 - 1
src/Abc.Zebus.Testing/Extensions/NUnitExtensions.cs

@@ -313,7 +313,7 @@ namespace Abc.Zebus.Testing.Extensions
         {
             Exception exception = method.GetException();
 
-            Assert.IsNotNull(exception);
+            Assert.IsNotNull(exception, $"{exceptionType.Name} was not thrown");
             Assert.AreEqual(exceptionType, exception.GetType());
 
             return exception;