|
|
@@ -1,12 +1,14 @@
|
|
|
+using System;
|
|
|
using System.Collections.Generic;
|
|
|
using System.Linq;
|
|
|
using Abc.Zebus.Routing;
|
|
|
+using Abc.Zebus.Util.Extensions;
|
|
|
|
|
|
namespace Abc.Zebus.Directory
|
|
|
{
|
|
|
public class PeerSubscriptionTree
|
|
|
{
|
|
|
- private readonly SubscriptionNode _rootNode = new SubscriptionNode(0);
|
|
|
+ private readonly SubscriptionNode _rootNode = new SubscriptionNode(0, false);
|
|
|
private List<Peer> _peersMatchingAllMessages = new List<Peer>();
|
|
|
|
|
|
public bool IsEmpty
|
|
|
@@ -88,14 +90,18 @@ namespace Abc.Zebus.Directory
|
|
|
|
|
|
private class SubscriptionNode
|
|
|
{
|
|
|
+ private static readonly Action<SubscriptionNode, string> _removeNode = (x, part) => x.RemoveChildNode(part);
|
|
|
+ private static readonly Action<SubscriptionNode, string> _removeSharpNode = (x, _) => x._sharpNode = null;
|
|
|
+ private static readonly Action<SubscriptionNode, string> _removeStarNode = (x, _) => x._starNode = null;
|
|
|
private readonly int _nextPartIndex;
|
|
|
private readonly bool _matchesAll;
|
|
|
private Dictionary<string, SubscriptionNode> _childrenNodes = new Dictionary<string, SubscriptionNode>();
|
|
|
private List<Peer> _peers = new List<Peer>();
|
|
|
private SubscriptionNode _sharpNode;
|
|
|
private SubscriptionNode _starNode;
|
|
|
+ private int _peerCountIncludingChildren;
|
|
|
|
|
|
- public SubscriptionNode(int nextPartIndex, bool matchesAll = false)
|
|
|
+ public SubscriptionNode(int nextPartIndex, bool matchesAll)
|
|
|
{
|
|
|
_nextPartIndex = nextPartIndex;
|
|
|
_matchesAll = matchesAll;
|
|
|
@@ -103,7 +109,7 @@ namespace Abc.Zebus.Directory
|
|
|
|
|
|
public bool IsEmpty
|
|
|
{
|
|
|
- get { return _peers.Count == 0 && (_sharpNode == null || _sharpNode.IsEmpty) && (_starNode == null || _starNode.IsEmpty) && _childrenNodes.All(x => x.Value.IsEmpty); }
|
|
|
+ get { return _peerCountIncludingChildren == 0; }
|
|
|
}
|
|
|
|
|
|
public void Accept(PeerCollector peerCollector, BindingKey routingKey)
|
|
|
@@ -129,12 +135,14 @@ namespace Abc.Zebus.Directory
|
|
|
childNode.Accept(peerCollector, routingKey);
|
|
|
}
|
|
|
|
|
|
- public void Update(Peer peer, BindingKey subscription, UpdateAction action)
|
|
|
+ public int Update(Peer peer, BindingKey subscription, UpdateAction action)
|
|
|
{
|
|
|
if (IsLeaf(subscription))
|
|
|
{
|
|
|
- UpdateList(peer, action);
|
|
|
- return;
|
|
|
+ var update = UpdateList(peer, action);
|
|
|
+ _peerCountIncludingChildren += update;
|
|
|
+
|
|
|
+ return update;
|
|
|
}
|
|
|
|
|
|
var nextPart = subscription.GetPart(_nextPartIndex);
|
|
|
@@ -142,19 +150,28 @@ namespace Abc.Zebus.Directory
|
|
|
if (nextPart == "#" || nextPart == null)
|
|
|
{
|
|
|
var sharpNode = GetOrCreateSharpNode();
|
|
|
- sharpNode.Update(peer, subscription, action);
|
|
|
- return;
|
|
|
+ return UpdateChildNode(sharpNode, peer, subscription, action, null, _removeSharpNode);
|
|
|
}
|
|
|
|
|
|
if (nextPart == "*")
|
|
|
{
|
|
|
var starNode = GetOrCreateStarNode();
|
|
|
- starNode.Update(peer, subscription, action);
|
|
|
- return;
|
|
|
+ return UpdateChildNode(starNode, peer, subscription, action, null, _removeStarNode);
|
|
|
}
|
|
|
|
|
|
var childNode = GetOrAddChildNode(nextPart);
|
|
|
- childNode.Update(peer, subscription, action);
|
|
|
+ return UpdateChildNode(childNode, peer, subscription, action, nextPart, _removeNode);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int UpdateChildNode(SubscriptionNode childNode, Peer peer, BindingKey subscription, UpdateAction action, string childNodePart, Action<SubscriptionNode, string> remover)
|
|
|
+ {
|
|
|
+ var update = childNode.Update(peer, subscription, action);
|
|
|
+ _peerCountIncludingChildren += update;
|
|
|
+
|
|
|
+ if (childNode.IsEmpty)
|
|
|
+ remover(this, childNodePart);
|
|
|
+
|
|
|
+ return update;
|
|
|
}
|
|
|
|
|
|
private bool IsLeaf(BindingKey bindingKey)
|
|
|
@@ -168,37 +185,77 @@ namespace Abc.Zebus.Directory
|
|
|
return _nextPartIndex == bindingKey.PartCount;
|
|
|
}
|
|
|
|
|
|
- private SubscriptionNode CreateChildNode(bool matchesAll = false)
|
|
|
- {
|
|
|
- return new SubscriptionNode(_nextPartIndex + 1, matchesAll);
|
|
|
- }
|
|
|
-
|
|
|
private SubscriptionNode GetOrAddChildNode(string part)
|
|
|
{
|
|
|
SubscriptionNode child;
|
|
|
- if (!_childrenNodes.TryGetValue(part, out child))
|
|
|
- {
|
|
|
- child = CreateChildNode();
|
|
|
- var newChildren = _childrenNodes.ToDictionary(x => x.Key, x => x.Value);
|
|
|
- newChildren.Add(part, child);
|
|
|
- _childrenNodes = newChildren;
|
|
|
- }
|
|
|
+ if (_childrenNodes.TryGetValue(part, out child))
|
|
|
+ return child;
|
|
|
+
|
|
|
+ child = new SubscriptionNode(_nextPartIndex + 1, false);
|
|
|
+ var newChildren = new Dictionary<string, SubscriptionNode>(_childrenNodes.Count + 1);
|
|
|
+ newChildren.AddRange(_childrenNodes);
|
|
|
+ newChildren.Add(part, child);
|
|
|
+ _childrenNodes = newChildren;
|
|
|
+
|
|
|
return child;
|
|
|
}
|
|
|
|
|
|
+ private void RemoveChildNode(string part)
|
|
|
+ {
|
|
|
+ var newChildren = new Dictionary<string, SubscriptionNode>(_childrenNodes);
|
|
|
+ newChildren.Remove(part);
|
|
|
+
|
|
|
+ _childrenNodes = newChildren;
|
|
|
+ }
|
|
|
+
|
|
|
private SubscriptionNode GetOrCreateSharpNode()
|
|
|
{
|
|
|
- return _sharpNode ?? (_sharpNode = CreateChildNode(true));
|
|
|
+ return _sharpNode ?? (_sharpNode = new SubscriptionNode(_nextPartIndex + 1, true));
|
|
|
}
|
|
|
|
|
|
private SubscriptionNode GetOrCreateStarNode()
|
|
|
{
|
|
|
- return _starNode ?? (_starNode = CreateChildNode());
|
|
|
+ return _starNode ?? (_starNode = new SubscriptionNode(_nextPartIndex + 1, false));
|
|
|
+ }
|
|
|
+
|
|
|
+ private int UpdateList(Peer peer, UpdateAction action)
|
|
|
+ {
|
|
|
+ return action == UpdateAction.Add ? AddToList(peer) : RemoveFromList(peer);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int AddToList(Peer peerToAdd)
|
|
|
+ {
|
|
|
+ var removed = false;
|
|
|
+ var newPeers = new List<Peer>(_peers.Capacity);
|
|
|
+ foreach (var peer in _peers)
|
|
|
+ {
|
|
|
+ if (peer.Id == peerToAdd.Id)
|
|
|
+ removed = true;
|
|
|
+ else
|
|
|
+ newPeers.Add(peer);
|
|
|
+ }
|
|
|
+ newPeers.Add(peerToAdd);
|
|
|
+
|
|
|
+ _peers = newPeers;
|
|
|
+
|
|
|
+ return removed ? 0 : 1;
|
|
|
}
|
|
|
|
|
|
- private void UpdateList(Peer peer, UpdateAction action)
|
|
|
+ private int RemoveFromList(Peer peerToRemove)
|
|
|
{
|
|
|
- PeerSubscriptionTree.UpdateList(ref _peers, peer, action);
|
|
|
+ var removed = false;
|
|
|
+ var newPeers = new List<Peer>(_peers.Capacity);
|
|
|
+ foreach (var peer in _peers)
|
|
|
+ {
|
|
|
+ if (peer.Id == peerToRemove.Id)
|
|
|
+ removed = true;
|
|
|
+ else
|
|
|
+ newPeers.Add(peer);
|
|
|
+ }
|
|
|
+
|
|
|
+ _peers = newPeers;
|
|
|
+
|
|
|
+ return removed ? -1 : 0;
|
|
|
}
|
|
|
}
|
|
|
|