Bläddra i källkod

Directory.Cassandra: Update schema of `Peers` and `DynamicSubscriptions` tables to add multiple partitions.

Previous schema was using a "fake" boolean key as a unique partition. This meant that all the peers and their
subscriptions were stored in a single partition.

Every time we needed to remove a peer and its associated subscriptions, we had to tombstone a cell from
a single partition, which was triggering a warning from Cassandra when scanning peers with too many
tombstones (cassandra.yaml `tombstone_warn_threshold`)

We now partition the data by PeerId which makes it possible to tombstone a whole partition instead of
individual cells in a row
Mathieu Stefani 3 år sedan
förälder
incheckning
d8b963d87e

+ 3 - 4
src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryPerformanceTests.cs → src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryPerformanceTests.cs

@@ -4,14 +4,13 @@ using System.Diagnostics;
 using System.Linq;
 using System.Linq;
 using Abc.Zebus.Directory.Cassandra.Cql;
 using Abc.Zebus.Directory.Cassandra.Cql;
 using Abc.Zebus.Directory.Cassandra.Storage;
 using Abc.Zebus.Directory.Cassandra.Storage;
-using Abc.Zebus.Directory.Cassandra.Tests.Cql;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Testing.Measurements;
 using Abc.Zebus.Testing.Measurements;
 using Cassandra;
 using Cassandra;
 using NUnit.Framework;
 using NUnit.Framework;
 
 
-namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
+namespace Abc.Zebus.Directory.Cassandra.Tests.Cql
 {
 {
     [TestFixture]
     [TestFixture]
     [Ignore("Performance tests")]
     [Ignore("Performance tests")]
@@ -27,7 +26,7 @@ namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
             const int numberOfPeersToInsert = 30;
             const int numberOfPeersToInsert = 30;
             var repo = new CqlPeerRepository(DataContext);
             var repo = new CqlPeerRepository(DataContext);
             var subscriptionForTypes = Get10MessageTypesWith800BindingKeysEach();
             var subscriptionForTypes = Get10MessageTypesWith800BindingKeysEach();
-            
+
             for (var i = 0; i < numberOfPeersToInsert; i++)
             for (var i = 0; i < numberOfPeersToInsert; i++)
             {
             {
                 var stopwatch = Stopwatch.StartNew();
                 var stopwatch = Stopwatch.StartNew();
@@ -50,7 +49,7 @@ namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
             Diagnostics.CassandraTraceSwitch.Level = TraceLevel.Info;
             Diagnostics.CassandraTraceSwitch.Level = TraceLevel.Info;
             var repo = new CqlPeerRepository(DataContext);
             var repo = new CqlPeerRepository(DataContext);
             var subscriptionForTypes = Get1MessageTypesWith100000BindingKeys();
             var subscriptionForTypes = Get1MessageTypesWith100000BindingKeys();
-            
+
 
 
             var stopwatch = Stopwatch.StartNew();
             var stopwatch = Stopwatch.StartNew();
             repo.AddOrUpdatePeer(new PeerDescriptor(new PeerId("Abc.Peer.0"), "tcp://toto:123", true, true, true, DateTime.UtcNow));
             repo.AddOrUpdatePeer(new PeerDescriptor(new PeerId("Abc.Peer.0"), "tcp://toto:123", true, true, true, DateTime.UtcNow));

+ 5 - 20
src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryTests.DynamicSubscriptions.cs → src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryTests.DynamicSubscriptions.cs

@@ -1,4 +1,5 @@
-using System.Linq;
+using System;
+using System.Linq;
 using Abc.Zebus.Directory.Tests;
 using Abc.Zebus.Directory.Tests;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Testing;
@@ -7,9 +8,8 @@ using Abc.Zebus.Util;
 using Cassandra;
 using Cassandra;
 using Cassandra.Data.Linq;
 using Cassandra.Data.Linq;
 using NUnit.Framework;
 using NUnit.Framework;
-using System;
 
 
-namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
+namespace Abc.Zebus.Directory.Cassandra.Tests.Cql
 {
 {
     public partial class CqlPeerRepositoryTests
     public partial class CqlPeerRepositoryTests
     {
     {
@@ -39,22 +39,6 @@ namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
             return new SubscriptionsForType(MessageUtil.GetTypeId(typeof(TMessage)), bindings.Any() ? bindings : new[] { BindingKey.Empty });
             return new SubscriptionsForType(MessageUtil.GetTypeId(typeof(TMessage)), bindings.Any() ? bindings : new[] { BindingKey.Empty });
         }
         }
 
 
-        [Test]
-        public void should_not_crash_when_passing_null_subscriptions_array_to_AddDynamicSubscriptions()
-        {
-            var peerDescriptor = _peer1.ToPeerDescriptorWithRoundedTime(true, typeof(FakeCommand));
-
-            Assert.DoesNotThrow(() => _repository.AddDynamicSubscriptionsForTypes(peerDescriptor.PeerId, peerDescriptor.TimestampUtc.Value, null));
-        }
-
-        [Test]
-        public void should_not_crash_when_passing_null_subscriptions_array_to_RemoveDynamicSubscriptions()
-        {
-            var peerDescriptor = _peer1.ToPeerDescriptorWithRoundedTime(true, typeof(FakeCommand));
-
-            Assert.DoesNotThrow(() => _repository.RemoveDynamicSubscriptionsForTypes(peerDescriptor.PeerId, peerDescriptor.TimestampUtc.Value, null));
-        }
-
         [Test]
         [Test]
         public void should_remove_dynamic_subscriptions()
         public void should_remove_dynamic_subscriptions()
         {
         {
@@ -78,9 +62,10 @@ namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
 			_repository.RemovePeer(peerDescriptor.PeerId);
 			_repository.RemovePeer(peerDescriptor.PeerId);
 
 
             _repository.Get(peerDescriptor.PeerId).ShouldBeNull();
             _repository.Get(peerDescriptor.PeerId).ShouldBeNull();
+
             var retrievedSubscriptions = DataContext.DynamicSubscriptions
             var retrievedSubscriptions = DataContext.DynamicSubscriptions
                                                     .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                                                     .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                                                    .Where(sub => sub.UselessKey == false && sub.PeerId == peerDescriptor.PeerId.ToString())
+                                                    .Where(s => s.PeerId == peerDescriptor.PeerId.ToString())
                                                     .Execute();
                                                     .Execute();
 			retrievedSubscriptions.ShouldBeEmpty();
 			retrievedSubscriptions.ShouldBeEmpty();
         }
         }

+ 14 - 14
src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryTests.cs → src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryTests.cs

@@ -1,6 +1,9 @@
-using Abc.Zebus.Directory.Cassandra.Cql;
+using System;
+using System.Linq;
+using System.Threading;
+using Abc.Zebus.Directory.Cassandra.Cql;
+using Abc.Zebus.Directory.Cassandra.Data;
 using Abc.Zebus.Directory.Cassandra.Storage;
 using Abc.Zebus.Directory.Cassandra.Storage;
-using Abc.Zebus.Directory.Cassandra.Tests.Cql;
 using Abc.Zebus.Directory.Tests;
 using Abc.Zebus.Directory.Tests;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Testing.Extensions;
@@ -8,11 +11,8 @@ using Abc.Zebus.Util;
 using Cassandra;
 using Cassandra;
 using Cassandra.Data.Linq;
 using Cassandra.Data.Linq;
 using NUnit.Framework;
 using NUnit.Framework;
-using System;
-using System.Linq;
-using System.Threading;
 
 
-namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
+namespace Abc.Zebus.Directory.Cassandra.Tests.Cql
 {
 {
     [TestFixture]
     [TestFixture]
     public partial class CqlPeerRepositoryTests : CqlTestFixture<DirectoryDataContext, ICassandraConfiguration>
     public partial class CqlPeerRepositoryTests : CqlTestFixture<DirectoryDataContext, ICassandraConfiguration>
@@ -159,11 +159,6 @@ namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
             fetched.ShouldNotBeNull();
             fetched.ShouldNotBeNull();
         }
         }
 
 
-        private static DateTime GetUnspecifiedKindUtcNow()
-        {
-            return new DateTime(SystemDateTime.UtcNow.RoundToMillisecond().Ticks, DateTimeKind.Unspecified);
-        }
-
         [Test]
         [Test]
         public void should_insert_a_peer_with_no_timestamp_that_was_previously_deleted()
         public void should_insert_a_peer_with_no_timestamp_that_was_previously_deleted()
         {
         {
@@ -206,10 +201,10 @@ namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
             descriptor.TimestampUtc = DateTime.UtcNow;
             descriptor.TimestampUtc = DateTime.UtcNow;
             _repository.AddOrUpdatePeer(descriptor);
             _repository.AddOrUpdatePeer(descriptor);
 
 
-            DataContext.StoragePeers
+            DataContext.Peers
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                        .Where(peer => peer.UselessKey == false && peer.PeerId == "Abc.DecommissionnedPeer.0")
-                        .Select(peer => new StoragePeer { StaticSubscriptionsBytes  = null, IsResponding = false, IsPersistent = false, HasDebuggerAttached = false, IsUp = false })
+                        .Where(peer => peer.PeerId == "Abc.DecommissionedPeer.0")
+                        .Select(peer => new CassandraPeer { StaticSubscriptionsBytes  = null, IsResponding = false, IsPersistent = false, HasDebuggerAttached = false, IsUp = false })
                         .Update()
                         .Update()
                         .SetTimestamp(DateTime.UtcNow)
                         .SetTimestamp(DateTime.UtcNow)
                         .Execute();
                         .Execute();
@@ -217,5 +212,10 @@ namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
             _repository.Get(_peer1.Id).Peer.IsResponding.ShouldBeTrue();
             _repository.Get(_peer1.Id).Peer.IsResponding.ShouldBeTrue();
             _repository.GetPeers().ExpectedSingle().PeerId.ShouldEqual(_peer1.Id);
             _repository.GetPeers().ExpectedSingle().PeerId.ShouldEqual(_peer1.Id);
         }
         }
+
+        private static DateTime GetUnspecifiedKindUtcNow()
+        {
+            return new DateTime(SystemDateTime.UtcNow.RoundToMillisecond().Ticks, DateTimeKind.Unspecified);
+        }
     }
     }
 }
 }

+ 8 - 10
src/Abc.Zebus.Directory.Cassandra.Tests/Storage/StorageConvertionExtensionsTests.cs → src/Abc.Zebus.Directory.Cassandra.Tests/Data/CassandraExtensionsTests.cs

@@ -1,11 +1,11 @@
 using System;
 using System;
-using Abc.Zebus.Directory.Cassandra.Storage;
+using Abc.Zebus.Directory.Cassandra.Data;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Testing.Extensions;
 using NUnit.Framework;
 using NUnit.Framework;
 
 
-namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
+namespace Abc.Zebus.Directory.Cassandra.Tests.Data
 {
 {
-    public class StorageConvertionExtensionsTests
+    public class CassandraExtensionsTests
     {
     {
         [Test]
         [Test]
         public void should_return_a_storage_peer_with_its_timestamp_kind_set_to_utc()
         public void should_return_a_storage_peer_with_its_timestamp_kind_set_to_utc()
@@ -13,20 +13,18 @@ namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
             var unspecifiedKindUtcNow = new DateTime(DateTime.UtcNow.Ticks, DateTimeKind.Unspecified);
             var unspecifiedKindUtcNow = new DateTime(DateTime.UtcNow.Ticks, DateTimeKind.Unspecified);
             var peerDescriptor = new PeerDescriptor(new PeerId("Abc.Titi.0"), "tcp://toto:123", false, true, true, unspecifiedKindUtcNow);
             var peerDescriptor = new PeerDescriptor(new PeerId("Abc.Titi.0"), "tcp://toto:123", false, true, true, unspecifiedKindUtcNow);
 
 
-            var storagePeer = peerDescriptor.ToStoragePeer();
-
-            storagePeer.TimestampUtc.Kind.ShouldEqual(DateTimeKind.Utc);
+            var peer = peerDescriptor.ToCassandra();
+            peer.TimestampUtc.Kind.ShouldEqual(DateTimeKind.Utc);
         }
         }
 
 
         [Test]
         [Test]
         public void should_return_a_peer_descriptor_with_its_timestamp_kind_set_to_utc()
         public void should_return_a_peer_descriptor_with_its_timestamp_kind_set_to_utc()
         {
         {
             var unspecifiedKindUtcNow = new DateTime(DateTime.UtcNow.Ticks, DateTimeKind.Unspecified);
             var unspecifiedKindUtcNow = new DateTime(DateTime.UtcNow.Ticks, DateTimeKind.Unspecified);
-            var storagePeer = new StoragePeer { TimestampUtc = unspecifiedKindUtcNow, StaticSubscriptionsBytes = new byte[0]};
-
-            var peerDescriptor = storagePeer.ToPeerDescriptor(new Subscription[0]);
+            var peer = new CassandraPeer { TimestampUtc = unspecifiedKindUtcNow, StaticSubscriptionsBytes = new byte[0]};
 
 
+            var peerDescriptor = peer.ToPeerDescriptor(new Subscription[0]);
             peerDescriptor.TimestampUtc.Value.Kind.ShouldEqual(DateTimeKind.Utc);
             peerDescriptor.TimestampUtc.Value.Kind.ShouldEqual(DateTimeKind.Utc);
         }
         }
     }
     }
-}
+}

+ 24 - 28
src/Abc.Zebus.Directory.Cassandra/Storage/CqlPeerRepository.cs → src/Abc.Zebus.Directory.Cassandra/Cql/CqlPeerRepository.cs

@@ -1,6 +1,7 @@
 using System;
 using System;
 using System.Collections.Generic;
 using System.Collections.Generic;
 using System.Linq;
 using System.Linq;
+using Abc.Zebus.Directory.Cassandra.Data;
 using Abc.Zebus.Directory.Storage;
 using Abc.Zebus.Directory.Storage;
 using Cassandra;
 using Cassandra;
 using Cassandra.Data.Linq;
 using Cassandra.Data.Linq;
@@ -18,9 +19,9 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
 
 
         public bool? IsPersistent(PeerId peerId)
         public bool? IsPersistent(PeerId peerId)
         {
         {
-            return _dataContext.StoragePeers
+            return _dataContext.Peers
                                .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                                .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                               .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
+                               .Where(peer => peer.PeerId == peerId.ToString())
                                .Select(x => (bool?)x.IsPersistent)
                                .Select(x => (bool?)x.IsPersistent)
                                .Execute()
                                .Execute()
                                .FirstOrDefault();
                                .FirstOrDefault();
@@ -30,13 +31,13 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
         {
         {
             var peerDynamicSubscriptions = _dataContext.DynamicSubscriptions
             var peerDynamicSubscriptions = _dataContext.DynamicSubscriptions
                                                        .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                                                        .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                                                       .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString())
+                                                       .Where(s => s.PeerId == peerId.ToString())
                                                        .Execute()
                                                        .Execute()
-                                                       .SelectMany(sub => sub.ToSubscriptionsForType().ToSubscriptions());
+                                                       .SelectMany(s => s.ToSubscriptionsForType().ToSubscriptions());
 
 
-            return _dataContext.StoragePeers
+            return _dataContext.Peers
                                .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                                .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                               .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
+                               .Where(peer => peer.PeerId == peerId.ToString())
                                .Execute()
                                .Execute()
                                .FirstOrDefault()
                                .FirstOrDefault()
                                .ToPeerDescriptor(peerDynamicSubscriptions);
                                .ToPeerDescriptor(peerDynamicSubscriptions);
@@ -46,9 +47,8 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
         {
         {
             if (!loadDynamicSubscriptions)
             if (!loadDynamicSubscriptions)
             {
             {
-                return _dataContext.StoragePeers
+                return _dataContext.Peers
                    .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                    .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                   .Where(peer => peer.UselessKey == false)
                    .Execute()
                    .Execute()
                    .Select(peer => peer.ToPeerDescriptor()!)
                    .Select(peer => peer.ToPeerDescriptor()!)
                    .ToList();
                    .ToList();
@@ -56,15 +56,13 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
 
 
             var dynamicSubscriptionsByPeer = _dataContext.DynamicSubscriptions
             var dynamicSubscriptionsByPeer = _dataContext.DynamicSubscriptions
                                                          .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                                                          .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                                                         .Where(sub => sub.UselessKey == false)
                                                          .Execute()
                                                          .Execute()
                                                          .SelectMany(sub => sub.ToSubscriptionsForType().ToSubscriptions().Select(s => new { sub.PeerId, Subscription = s }))
                                                          .SelectMany(sub => sub.ToSubscriptionsForType().ToSubscriptions().Select(s => new { sub.PeerId, Subscription = s }))
                                                          .ToLookup(peerSub => peerSub.PeerId, peerSub=> peerSub.Subscription);
                                                          .ToLookup(peerSub => peerSub.PeerId, peerSub=> peerSub.Subscription);
 
 
 
 
-            return _dataContext.StoragePeers
+            return _dataContext.Peers
                                .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                                .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                               .Where(peer => peer.UselessKey == false)
                                .Execute()
                                .Execute()
                                .Select(peer => peer.ToPeerDescriptor(dynamicSubscriptionsByPeer[peer.PeerId]))
                                .Select(peer => peer.ToPeerDescriptor(dynamicSubscriptionsByPeer[peer.PeerId]))
                                .Where(descriptor => descriptor != null)
                                .Where(descriptor => descriptor != null)
@@ -73,26 +71,28 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
 
 
         public void AddOrUpdatePeer(PeerDescriptor peerDescriptor)
         public void AddOrUpdatePeer(PeerDescriptor peerDescriptor)
         {
         {
-            var storagePeer = peerDescriptor.ToStoragePeer();
-            _dataContext.StoragePeers
-                        .Insert(storagePeer)
+            var cassandraPeer = peerDescriptor.ToCassandra();
+            _dataContext.Peers
+                        .Insert(cassandraPeer)
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                        .SetTimestamp(storagePeer.TimestampUtc)
+                        .SetTimestamp(cassandraPeer.TimestampUtc)
                         .Execute();
                         .Execute();
         }
         }
 
 
         public void RemovePeer(PeerId peerId)
         public void RemovePeer(PeerId peerId)
         {
         {
             var now = DateTime.UtcNow;
             var now = DateTime.UtcNow;
-            _dataContext.StoragePeers
+
+            _dataContext.Peers
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                        .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
+                        .Where(peer => peer.PeerId == peerId.ToString())
                         .Delete()
                         .Delete()
                         .SetTimestamp(now)
                         .SetTimestamp(now)
                         .Execute();
                         .Execute();
+
             _dataContext.DynamicSubscriptions
             _dataContext.DynamicSubscriptions
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                        .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString())
+                        .Where(s => s.PeerId == peerId.ToString())
                         .Delete()
                         .Delete()
                         .SetTimestamp(now)
                         .SetTimestamp(now)
                         .Execute();
                         .Execute();
@@ -100,10 +100,10 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
 
 
         public void SetPeerResponding(PeerId peerId, bool isResponding)
         public void SetPeerResponding(PeerId peerId, bool isResponding)
         {
         {
-            _dataContext.StoragePeers
+            _dataContext.Peers
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                        .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
-                        .Select(peer =>  new StoragePeer { IsResponding = isResponding })
+                        .Where(peer => peer.PeerId == peerId.ToString())
+                        .Select(peer =>  new CassandraPeer { IsResponding = isResponding })
                         .Update()
                         .Update()
                         .SetTimestamp(DateTime.UtcNow)
                         .SetTimestamp(DateTime.UtcNow)
                         .Execute();
                         .Execute();
@@ -112,15 +112,13 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
 
 
         public void AddDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, SubscriptionsForType[] subscriptionsForTypes)
         public void AddDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, SubscriptionsForType[] subscriptionsForTypes)
         {
         {
-            if (subscriptionsForTypes == null)
-                return;
             var batch = _dataContext.Session.CreateBatch();
             var batch = _dataContext.Session.CreateBatch();
             batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum);
             batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum);
 
 
             foreach (var subscription in subscriptionsForTypes)
             foreach (var subscription in subscriptionsForTypes)
             {
             {
                 batch.Append(_dataContext.DynamicSubscriptions
                 batch.Append(_dataContext.DynamicSubscriptions
-                                         .Insert(subscription.ToStorageSubscription(peerId))
+                                         .Insert(subscription.ToCassandra(peerId))
                                          .SetTimestamp(timestampUtc));
                                          .SetTimestamp(timestampUtc));
             }
             }
             batch.Execute();
             batch.Execute();
@@ -128,15 +126,13 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
 
 
         public void RemoveDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, MessageTypeId[] messageTypeIds)
         public void RemoveDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, MessageTypeId[] messageTypeIds)
         {
         {
-            if (messageTypeIds == null)
-                return;
             var batch = _dataContext.Session.CreateBatch();
             var batch = _dataContext.Session.CreateBatch();
             batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum);
             batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum);
 
 
             foreach (var messageTypeId in messageTypeIds)
             foreach (var messageTypeId in messageTypeIds)
             {
             {
                 var deleteQuery = _dataContext.DynamicSubscriptions
                 var deleteQuery = _dataContext.DynamicSubscriptions
-                                              .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString() && sub.MessageTypeId == messageTypeId.FullName)
+                                              .Where(s => s.PeerId == peerId.ToString() && s.MessageTypeId == messageTypeId.FullName)
                                               .Delete()
                                               .Delete()
                                               .SetTimestamp(timestampUtc);
                                               .SetTimestamp(timestampUtc);
                 batch.Append(deleteQuery);
                 batch.Append(deleteQuery);
@@ -149,7 +145,7 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
         {
         {
             _dataContext.DynamicSubscriptions
             _dataContext.DynamicSubscriptions
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
                         .SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
-                        .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString())
+                        .Where(s => s.PeerId == peerId.ToString())
                         .Delete()
                         .Delete()
                         .SetTimestamp(timestampUtc)
                         .SetTimestamp(timestampUtc)
                         .Execute();
                         .Execute();

+ 3 - 2
src/Abc.Zebus.Directory.Cassandra/Storage/DirectoryDataContext.cs → src/Abc.Zebus.Directory.Cassandra/Cql/DirectoryDataContext.cs

@@ -1,4 +1,5 @@
 using Abc.Zebus.Directory.Cassandra.Cql;
 using Abc.Zebus.Directory.Cassandra.Cql;
+using Abc.Zebus.Directory.Cassandra.Data;
 using Cassandra.Data.Linq;
 using Cassandra.Data.Linq;
 
 
 namespace Abc.Zebus.Directory.Cassandra.Storage
 namespace Abc.Zebus.Directory.Cassandra.Storage
@@ -10,7 +11,7 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
         {
         {
         }
         }
 
 
-        public Table<StorageSubscription> DynamicSubscriptions => new Table<StorageSubscription>(Session);
-        public Table<StoragePeer> StoragePeers => new Table<StoragePeer>(Session);
+        public Table<CassandraSubscription> DynamicSubscriptions => new(Session);
+        public Table<CassandraPeer> Peers => new (Session);
     }
     }
 }
 }

+ 32 - 28
src/Abc.Zebus.Directory.Cassandra/Storage/StorageConvertionExtensions.cs → src/Abc.Zebus.Directory.Cassandra/Data/CassandraExtensions.cs

@@ -2,17 +2,18 @@
 using System.Collections.Generic;
 using System.Collections.Generic;
 using System.IO;
 using System.IO;
 using System.Linq;
 using System.Linq;
+using Abc.Zebus.Directory.Cassandra.Storage;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Routing;
 using ProtoBuf;
 using ProtoBuf;
 
 
-namespace Abc.Zebus.Directory.Cassandra.Storage
+namespace Abc.Zebus.Directory.Cassandra.Data
 {
 {
-    public static class StorageConversionExtensions
+    public static class CassandraExtensions
     {
     {
-        public static StoragePeer ToStoragePeer(this PeerDescriptor peerDescriptor)
+        public static CassandraPeer ToCassandra(this PeerDescriptor peerDescriptor)
         {
         {
             var timestamp = peerDescriptor.TimestampUtc.HasValue ? new DateTime(peerDescriptor.TimestampUtc.Value.Ticks, DateTimeKind.Utc) : DateTime.UtcNow;
             var timestamp = peerDescriptor.TimestampUtc.HasValue ? new DateTime(peerDescriptor.TimestampUtc.Value.Ticks, DateTimeKind.Utc) : DateTime.UtcNow;
-            return new StoragePeer
+            return new CassandraPeer
             {
             {
                 PeerId = peerDescriptor.PeerId.ToString(),
                 PeerId = peerDescriptor.PeerId.ToString(),
                 EndPoint = peerDescriptor.Peer.EndPoint,
                 EndPoint = peerDescriptor.Peer.EndPoint,
@@ -25,9 +26,9 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
             };
             };
         }
         }
 
 
-        public static StorageSubscription ToStorageSubscription(this SubscriptionsForType subscriptionFortype, PeerId peerId)
+        public static CassandraSubscription ToCassandra(this SubscriptionsForType subscriptionFortype, PeerId peerId)
         {
         {
-            return new StorageSubscription
+            return new CassandraSubscription
             {
             {
                 PeerId = peerId.ToString(),
                 PeerId = peerId.ToString(),
                 MessageTypeId = subscriptionFortype.MessageTypeId.FullName!,
                 MessageTypeId = subscriptionFortype.MessageTypeId.FullName!,
@@ -35,40 +36,43 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
             };
             };
         }
         }
 
 
-        public static SubscriptionsForType ToSubscriptionsForType(this StorageSubscription storageSubscription)
+        public static SubscriptionsForType ToSubscriptionsForType(this CassandraSubscription subscription)
         {
         {
-            return new SubscriptionsForType(new MessageTypeId(storageSubscription.MessageTypeId), DeserializeBindingKeys(storageSubscription.SubscriptionBindings));
+            return new SubscriptionsForType(
+                new MessageTypeId(subscription.MessageTypeId),
+                DeserializeBindingKeys(subscription.SubscriptionBindings)
+            );
         }
         }
 
 
-        public static PeerDescriptor? ToPeerDescriptor(this StoragePeer? storagePeer, IEnumerable<Subscription> peerDynamicSubscriptions)
+        public static PeerDescriptor? ToPeerDescriptor(this CassandraPeer? peer, IEnumerable<Subscription> peerDynamicSubscriptions)
         {
         {
-            if (storagePeer?.StaticSubscriptionsBytes == null)
+            if (peer?.StaticSubscriptionsBytes == null)
                 return null;
                 return null;
 
 
-            var staticSubscriptions = DeserializeSubscriptions(storagePeer.StaticSubscriptionsBytes);
+            var staticSubscriptions = DeserializeSubscriptions(peer.StaticSubscriptionsBytes);
             var allSubscriptions = staticSubscriptions.Concat(peerDynamicSubscriptions).Distinct().ToArray();
             var allSubscriptions = staticSubscriptions.Concat(peerDynamicSubscriptions).Distinct().ToArray();
-            return new PeerDescriptor(new PeerId(storagePeer.PeerId),
-                                      storagePeer.EndPoint,
-                                      storagePeer.IsPersistent,
-                                      storagePeer.IsUp,
-                                      storagePeer.IsResponding,
-                                      new DateTime(storagePeer.TimestampUtc.Ticks, DateTimeKind.Utc),
-                                      allSubscriptions) { HasDebuggerAttached = storagePeer.HasDebuggerAttached };
+            return new PeerDescriptor(new PeerId(peer.PeerId),
+                                      peer.EndPoint,
+                                      peer.IsPersistent,
+                                      peer.IsUp,
+                                      peer.IsResponding,
+                                      new DateTime(peer.TimestampUtc.Ticks, DateTimeKind.Utc),
+                                      allSubscriptions) { HasDebuggerAttached = peer.HasDebuggerAttached };
         }
         }
 
 
-        public static PeerDescriptor? ToPeerDescriptor(this StoragePeer? storagePeer)
+        public static PeerDescriptor? ToPeerDescriptor(this CassandraPeer? peer)
         {
         {
-            if (storagePeer == null)
+            if (peer == null)
                 return null;
                 return null;
 
 
-            var staticSubscriptions = DeserializeSubscriptions(storagePeer.StaticSubscriptionsBytes);
-            return new PeerDescriptor(new PeerId(storagePeer.PeerId),
-                                      storagePeer.EndPoint,
-                                      storagePeer.IsPersistent,
-                                      storagePeer.IsUp,
-                                      storagePeer.IsResponding,
-                                      new DateTime(storagePeer.TimestampUtc.Ticks, DateTimeKind.Utc),
-                                      staticSubscriptions) { HasDebuggerAttached = storagePeer.HasDebuggerAttached };
+            var staticSubscriptions = DeserializeSubscriptions(peer.StaticSubscriptionsBytes);
+            return new PeerDescriptor(new PeerId(peer.PeerId),
+                                      peer.EndPoint,
+                                      peer.IsPersistent,
+                                      peer.IsUp,
+                                      peer.IsResponding,
+                                      new DateTime(peer.TimestampUtc.Ticks, DateTimeKind.Utc),
+                                      staticSubscriptions) { HasDebuggerAttached = peer.HasDebuggerAttached };
         }
         }
 
 
         private static byte[] SerializeSubscriptions(Subscription[] subscriptions)
         private static byte[] SerializeSubscriptions(Subscription[] subscriptions)

+ 5 - 8
src/Abc.Zebus.Directory.Cassandra/Storage/StoragePeer.cs → src/Abc.Zebus.Directory.Cassandra/Data/CassandraPeer.cs

@@ -1,16 +1,12 @@
 using System;
 using System;
 using Cassandra.Mapping.Attributes;
 using Cassandra.Mapping.Attributes;
 
 
-namespace Abc.Zebus.Directory.Cassandra.Storage
+namespace Abc.Zebus.Directory.Cassandra.Data
 {
 {
-    [Table("Peers", CaseSensitive = true)]
-    public class StoragePeer
+    [Table("Peers_2", CaseSensitive = true)]
+    public class CassandraPeer
     {
     {
-        [PartitionKey]
-        public bool UselessKey { get; set; }
-
-        [ClusteringKey(0)]
-        [Column("PeerId")]
+        [PartitionKey, Column("PeerId")]
         public string PeerId { get; set; } = default!;
         public string PeerId { get; set; } = default!;
 
 
         [Column("EndPoint")]
         [Column("EndPoint")]
@@ -33,5 +29,6 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
 
 
         [Column("StaticSubscriptions")]
         [Column("StaticSubscriptions")]
         public byte[] StaticSubscriptionsBytes { get; set; } = default!;
         public byte[] StaticSubscriptionsBytes { get; set; } = default!;
+
     }
     }
 }
 }

+ 17 - 0
src/Abc.Zebus.Directory.Cassandra/Data/CassandraSubscription.cs

@@ -0,0 +1,17 @@
+using Cassandra.Mapping.Attributes;
+
+namespace Abc.Zebus.Directory.Cassandra.Data
+{
+    [Table("DynamicSubscriptions_2", CaseSensitive = true)]
+    public class CassandraSubscription
+    {
+        [PartitionKey, Column("PeerId")]
+        public string PeerId { get; set; } = default!;
+
+        [ClusteringKey(0), Column("MessageTypeId")]
+        public string MessageTypeId { get; set; } = default!;
+
+        [Column("SubscriptionBindings")]
+        public byte[] SubscriptionBindings { get; set; } = default!;
+    }
+}

+ 0 - 22
src/Abc.Zebus.Directory.Cassandra/Storage/StorageSubscription.cs

@@ -1,22 +0,0 @@
-using Cassandra.Mapping.Attributes;
-
-namespace Abc.Zebus.Directory.Cassandra.Storage
-{
-    [Table("DynamicSubscriptions", CaseSensitive = true)]
-    public class StorageSubscription
-    {
-        [PartitionKey]
-        public bool UselessKey { get; set; }
-
-        [ClusteringKey(0)]
-        [Column("PeerId")]
-        public string PeerId { get; set; } = default!;
-
-        [ClusteringKey(1)]
-        [Column("MessageTypeId")]
-        public string MessageTypeId { get; set; } = default!;
-
-        [Column("SubscriptionBindings")]
-        public byte[] SubscriptionBindings { get; set; } = default!;
-    }
-}

+ 6 - 8
src/Abc.Zebus.Directory.Cassandra/schema_creation.cql

@@ -1,7 +1,6 @@
 -- First create your Keyspace with the replication factor you want
 -- First create your Keyspace with the replication factor you want
 
 
-create table IF NOT EXISTS "Peers" (
-	"UselessKey" boolean,
+create table IF NOT EXISTS "Peers_2" (
 	"PeerId" text,
 	"PeerId" text,
 	"EndPoint" text,
 	"EndPoint" text,
 	"IsUp" boolean,
 	"IsUp" boolean,
@@ -10,13 +9,12 @@ create table IF NOT EXISTS "Peers" (
 	"TimestampUtc" timestamp,
 	"TimestampUtc" timestamp,
 	"HasDebuggerAttached" boolean,
 	"HasDebuggerAttached" boolean,
 	"StaticSubscriptions" blob,
 	"StaticSubscriptions" blob,
-	PRIMARY KEY("UselessKey", "PeerId")
+	PRIMARY KEY("PeerId")
 );
 );
 
 
-create table IF NOT EXISTS "DynamicSubscriptions" (
-	"UselessKey" boolean,
+create table IF NOT EXISTS "DynamicSubscriptions_2" (
 	"PeerId" text,
 	"PeerId" text,
-	"MessageTypeId" text,	
+	"MessageTypeId" text,
 	"SubscriptionBindings" blob,
 	"SubscriptionBindings" blob,
-	PRIMARY KEY("UselessKey", "PeerId", "MessageTypeId")
-);
+	PRIMARY KEY("PeerId", "MessageTypeId")
+);