|
@@ -1,6 +1,7 @@
|
|
|
using System;
|
|
|
using System.Collections.Generic;
|
|
|
using System.Linq;
|
|
|
+using Abc.Zebus.Directory.Cassandra.Data;
|
|
|
using Abc.Zebus.Directory.Storage;
|
|
|
using Cassandra;
|
|
|
using Cassandra.Data.Linq;
|
|
@@ -18,9 +19,9 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
|
|
|
|
|
|
public bool? IsPersistent(PeerId peerId)
|
|
|
{
|
|
|
- return _dataContext.StoragePeers
|
|
|
+ return _dataContext.Peers
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
|
|
|
+ .Where(peer => peer.PeerId == peerId.ToString())
|
|
|
.Select(x => (bool?)x.IsPersistent)
|
|
|
.Execute()
|
|
|
.FirstOrDefault();
|
|
@@ -30,13 +31,13 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
|
|
|
{
|
|
|
var peerDynamicSubscriptions = _dataContext.DynamicSubscriptions
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString())
|
|
|
+ .Where(s => s.PeerId == peerId.ToString())
|
|
|
.Execute()
|
|
|
- .SelectMany(sub => sub.ToSubscriptionsForType().ToSubscriptions());
|
|
|
+ .SelectMany(s => s.ToSubscriptionsForType().ToSubscriptions());
|
|
|
|
|
|
- return _dataContext.StoragePeers
|
|
|
+ return _dataContext.Peers
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
|
|
|
+ .Where(peer => peer.PeerId == peerId.ToString())
|
|
|
.Execute()
|
|
|
.FirstOrDefault()
|
|
|
.ToPeerDescriptor(peerDynamicSubscriptions);
|
|
@@ -46,9 +47,8 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
|
|
|
{
|
|
|
if (!loadDynamicSubscriptions)
|
|
|
{
|
|
|
- return _dataContext.StoragePeers
|
|
|
+ return _dataContext.Peers
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(peer => peer.UselessKey == false)
|
|
|
.Execute()
|
|
|
.Select(peer => peer.ToPeerDescriptor()!)
|
|
|
.ToList();
|
|
@@ -56,15 +56,13 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
|
|
|
|
|
|
var dynamicSubscriptionsByPeer = _dataContext.DynamicSubscriptions
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(sub => sub.UselessKey == false)
|
|
|
.Execute()
|
|
|
.SelectMany(sub => sub.ToSubscriptionsForType().ToSubscriptions().Select(s => new { sub.PeerId, Subscription = s }))
|
|
|
.ToLookup(peerSub => peerSub.PeerId, peerSub=> peerSub.Subscription);
|
|
|
|
|
|
|
|
|
- return _dataContext.StoragePeers
|
|
|
+ return _dataContext.Peers
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(peer => peer.UselessKey == false)
|
|
|
.Execute()
|
|
|
.Select(peer => peer.ToPeerDescriptor(dynamicSubscriptionsByPeer[peer.PeerId]))
|
|
|
.Where(descriptor => descriptor != null)
|
|
@@ -73,26 +71,28 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
|
|
|
|
|
|
public void AddOrUpdatePeer(PeerDescriptor peerDescriptor)
|
|
|
{
|
|
|
- var storagePeer = peerDescriptor.ToStoragePeer();
|
|
|
- _dataContext.StoragePeers
|
|
|
- .Insert(storagePeer)
|
|
|
+ var cassandraPeer = peerDescriptor.ToCassandra();
|
|
|
+ _dataContext.Peers
|
|
|
+ .Insert(cassandraPeer)
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .SetTimestamp(storagePeer.TimestampUtc)
|
|
|
+ .SetTimestamp(cassandraPeer.TimestampUtc)
|
|
|
.Execute();
|
|
|
}
|
|
|
|
|
|
public void RemovePeer(PeerId peerId)
|
|
|
{
|
|
|
var now = DateTime.UtcNow;
|
|
|
- _dataContext.StoragePeers
|
|
|
+
|
|
|
+ _dataContext.Peers
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
|
|
|
+ .Where(peer => peer.PeerId == peerId.ToString())
|
|
|
.Delete()
|
|
|
.SetTimestamp(now)
|
|
|
.Execute();
|
|
|
+
|
|
|
_dataContext.DynamicSubscriptions
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString())
|
|
|
+ .Where(s => s.PeerId == peerId.ToString())
|
|
|
.Delete()
|
|
|
.SetTimestamp(now)
|
|
|
.Execute();
|
|
@@ -100,10 +100,10 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
|
|
|
|
|
|
public void SetPeerResponding(PeerId peerId, bool isResponding)
|
|
|
{
|
|
|
- _dataContext.StoragePeers
|
|
|
+ _dataContext.Peers
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
|
|
|
- .Select(peer => new StoragePeer { IsResponding = isResponding })
|
|
|
+ .Where(peer => peer.PeerId == peerId.ToString())
|
|
|
+ .Select(peer => new CassandraPeer { IsResponding = isResponding })
|
|
|
.Update()
|
|
|
.SetTimestamp(DateTime.UtcNow)
|
|
|
.Execute();
|
|
@@ -112,15 +112,13 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
|
|
|
|
|
|
public void AddDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, SubscriptionsForType[] subscriptionsForTypes)
|
|
|
{
|
|
|
- if (subscriptionsForTypes == null)
|
|
|
- return;
|
|
|
var batch = _dataContext.Session.CreateBatch();
|
|
|
batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum);
|
|
|
|
|
|
foreach (var subscription in subscriptionsForTypes)
|
|
|
{
|
|
|
batch.Append(_dataContext.DynamicSubscriptions
|
|
|
- .Insert(subscription.ToStorageSubscription(peerId))
|
|
|
+ .Insert(subscription.ToCassandra(peerId))
|
|
|
.SetTimestamp(timestampUtc));
|
|
|
}
|
|
|
batch.Execute();
|
|
@@ -128,15 +126,13 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
|
|
|
|
|
|
public void RemoveDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, MessageTypeId[] messageTypeIds)
|
|
|
{
|
|
|
- if (messageTypeIds == null)
|
|
|
- return;
|
|
|
var batch = _dataContext.Session.CreateBatch();
|
|
|
batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum);
|
|
|
|
|
|
foreach (var messageTypeId in messageTypeIds)
|
|
|
{
|
|
|
var deleteQuery = _dataContext.DynamicSubscriptions
|
|
|
- .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString() && sub.MessageTypeId == messageTypeId.FullName)
|
|
|
+ .Where(s => s.PeerId == peerId.ToString() && s.MessageTypeId == messageTypeId.FullName)
|
|
|
.Delete()
|
|
|
.SetTimestamp(timestampUtc);
|
|
|
batch.Append(deleteQuery);
|
|
@@ -149,7 +145,7 @@ namespace Abc.Zebus.Directory.Cassandra.Storage
|
|
|
{
|
|
|
_dataContext.DynamicSubscriptions
|
|
|
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
|
|
|
- .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString())
|
|
|
+ .Where(s => s.PeerId == peerId.ToString())
|
|
|
.Delete()
|
|
|
.SetTimestamp(timestampUtc)
|
|
|
.Execute();
|