浏览代码

Directory: add option to use Cassandra in Directory Runner

Mendel Monteiro-Beckerman 6 年之前
父节点
当前提交
8633ecb829

+ 6 - 0
src/Abc.Zebus.Directory.Runner/App.config

@@ -4,5 +4,11 @@
     <add key="Endpoint" value="tcp://*:129"/>
     <add key="Environment" value="Demo"/>
     <add key="PeerId" value="Directory.0"/>
+    <add key="Storage" value="InMemory" />
+
+    <add key="Cassandra.Hosts" value="" />
+    <add key="Cassandra.KeySpace" value="" />
+    <add key="Cassandra.LocalDataCenter" value="" />
+
   </appSettings>
 </configuration>

+ 45 - 0
src/Abc.Zebus.Directory.Runner/AppSettings.cs

@@ -0,0 +1,45 @@
+using System;
+using System.Configuration;
+using System.Globalization;
+
+namespace Abc.Zebus.Directory.Runner
+{
+    internal static class AppSettings
+    {
+        public static T Get<T>(string key, T defaultValue)
+        {
+            var value = ConfigurationManager.AppSettings[key];
+            if (value == null)
+                return defaultValue;
+
+            return Parser<T>.Parse(value);
+        }
+
+        public static string[] GetArray(string key)
+        {
+            var value = ConfigurationManager.AppSettings[key];
+            if (value == null)
+                return new string[0];
+
+            return value.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
+        }
+
+        private static class Parser<T>
+        {
+            private static readonly Func<string, object> _value;
+
+            public static T Parse(string s)
+            {
+                return (T)_value(s);
+            }
+
+            static Parser()
+            {
+                if (typeof(T) == typeof(TimeSpan))
+                    _value = s => TimeSpan.Parse(s, CultureInfo.InvariantCulture);
+                else
+                    _value = s => Convert.ChangeType(s, typeof(T));
+            }
+        }
+    }
+}

+ 22 - 0
src/Abc.Zebus.Directory.Runner/CassandraAppSettingsConfiguration.cs

@@ -0,0 +1,22 @@
+using System;
+using Abc.Zebus.Directory.Cassandra.Cql;
+using Abc.Zebus.Util;
+
+namespace Abc.Zebus.Directory.Runner
+{
+    class CassandraAppSettingsConfiguration : ICassandraConfiguration
+    {
+        public CassandraAppSettingsConfiguration()
+        {
+            Hosts = AppSettings.Get("Cassandra.Hosts", "");
+            KeySpace = AppSettings.Get("Cassandra.KeySpace", "");
+            QueryTimeout = AppSettings.Get("Cassandra.QueryTimeout", 5.Seconds());
+            LocalDataCenter = AppSettings.Get("Cassandra.LocalDataCenter", "");
+        }
+
+        public string Hosts { get; }
+        public string KeySpace { get; }
+        public TimeSpan QueryTimeout { get; }
+        public string LocalDataCenter { get; }
+    }
+}

+ 16 - 5
src/Abc.Zebus.Directory.Runner/Program.cs

@@ -3,6 +3,8 @@ using System.Configuration;
 using System.IO;
 using System.Threading;
 using Abc.Zebus.Core;
+using Abc.Zebus.Directory.Cassandra.Cql;
+using Abc.Zebus.Directory.Cassandra.Storage;
 using Abc.Zebus.Directory.Configuration;
 using Abc.Zebus.Directory.DeadPeerDetection;
 using Abc.Zebus.Directory.Initialization;
@@ -29,10 +31,11 @@ namespace Abc.Zebus.Directory.Runner
             };
 
             XmlConfigurator.ConfigureAndWatch(LogManager.GetRepository(typeof(Program).Assembly), new FileInfo(PathUtil.InBaseDirectory("log4net.config")));
-            _log.Info("Starting in memory directory");
+            var storageType = ConfigurationManager.AppSettings["Storage"];
+            _log.Info($"Starting in directory with storage type '{storageType}'");
 
             var busFactory = new BusFactory();
-            InjectDirectoryServiceSpecificConfiguration(busFactory);
+            InjectDirectoryServiceSpecificConfiguration(busFactory, storageType);
 
             busFactory
                 .WithConfiguration(new AppSettingsBusConfiguration(), ConfigurationManager.AppSettings["Environment"])
@@ -42,7 +45,7 @@ namespace Abc.Zebus.Directory.Runner
 
             using (busFactory.CreateAndStartBus())
             {
-                _log.Info("In memory directory started");
+                _log.Info("Directory started");
 
                 _log.Info("Starting dead peer detector");
                 var deadPeerDetector = busFactory.Container.GetInstance<IDeadPeerDetector>();
@@ -55,15 +58,16 @@ namespace Abc.Zebus.Directory.Runner
             }
         }
 
-        private static void InjectDirectoryServiceSpecificConfiguration(BusFactory busFactory)
+        private static void InjectDirectoryServiceSpecificConfiguration(BusFactory busFactory, string storageType)
         {
+            var useCassandraStorage = string.Equals(storageType, "Cassandra", StringComparison.OrdinalIgnoreCase);
             busFactory.ConfigureContainer(c =>
             {
                 c.AddRegistry<DirectoryRegistry>();
                 c.ForSingletonOf<IDirectoryConfiguration>().Use<AppSettingsDirectoryConfiguration>();
 
                 c.For<IDeadPeerDetector>().Use<DeadPeerDetector>();
-                c.ForSingletonOf<IPeerRepository>().Use<MemoryPeerRepository>();
+                c.ForSingletonOf<IPeerRepository>().Use(ctx => useCassandraStorage ? (IPeerRepository)ctx.GetInstance<CqlPeerRepository>() : ctx.GetInstance<MemoryPeerRepository>());
                 c.ForSingletonOf<PeerDirectoryServer>().Use<PeerDirectoryServer>();
                 c.ForSingletonOf<IPeerDirectory>().Use(ctx => ctx.GetInstance<PeerDirectoryServer>());
 
@@ -74,6 +78,13 @@ namespace Abc.Zebus.Directory.Runner
 
                     return dispatcher;
                 });
+
+                // Cassandra specific
+                if (useCassandraStorage)
+                {
+                    c.ForSingletonOf<CassandraCqlSessionManager>().Use(() => new CassandraCqlSessionManager());
+                    c.ForSingletonOf<ICassandraConfiguration>().Use<CassandraAppSettingsConfiguration>();
+                }
             });
         }
     }