|
@@ -17,912 +17,911 @@ using Abc.Zebus.Util.Extensions;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Logging;
|
|
using Newtonsoft.Json;
|
|
using Newtonsoft.Json;
|
|
|
|
|
|
-namespace Abc.Zebus.Core
|
|
|
|
|
|
+namespace Abc.Zebus.Core;
|
|
|
|
+
|
|
|
|
+public class Bus : IInternalBus, IMessageDispatchFactory
|
|
{
|
|
{
|
|
- public class Bus : IInternalBus, IMessageDispatchFactory
|
|
|
|
|
|
+ private static readonly BusMessageLogger _messageLogger = new(typeof(Bus));
|
|
|
|
+ private static readonly ILogger _logger = ZebusLogManager.GetLogger(typeof(Bus));
|
|
|
|
+
|
|
|
|
+ private readonly ConcurrentDictionary<MessageId, TaskCompletionSource<CommandResult>> _messageIdToTaskCompletionSources = new();
|
|
|
|
+ private readonly UniqueTimestampProvider _deserializationFailureTimestampProvider = new();
|
|
|
|
+ private readonly Dictionary<Subscription, SubscriptionStatus> _subscriptions = new();
|
|
|
|
+ private readonly List<IMessageHandlerInvoker> _startupInvokers = new();
|
|
|
|
+ private readonly HashSet<MessageTypeId> _pendingUnsubscriptions = new();
|
|
|
|
+ private readonly ITransport _transport;
|
|
|
|
+ private readonly IPeerDirectory _directory;
|
|
|
|
+ private readonly IMessageSerializer _serializer;
|
|
|
|
+ private readonly IMessageDispatcher _messageDispatcher;
|
|
|
|
+ private readonly IMessageSendingStrategy _messageSendingStrategy;
|
|
|
|
+ private readonly IStoppingStrategy _stoppingStrategy;
|
|
|
|
+ private readonly IBusConfiguration _configuration;
|
|
|
|
+
|
|
|
|
+ private Task? _processPendingUnsubscriptionsTask;
|
|
|
|
+
|
|
|
|
+ private int _subscriptionsVersion;
|
|
|
|
+ private int _status;
|
|
|
|
+
|
|
|
|
+ public Bus(ITransport transport,
|
|
|
|
+ IPeerDirectory directory,
|
|
|
|
+ IMessageSerializer serializer,
|
|
|
|
+ IMessageDispatcher messageDispatcher,
|
|
|
|
+ IMessageSendingStrategy messageSendingStrategy,
|
|
|
|
+ IStoppingStrategy stoppingStrategy,
|
|
|
|
+ IBusConfiguration configuration)
|
|
{
|
|
{
|
|
- private static readonly BusMessageLogger _messageLogger = new(typeof(Bus));
|
|
|
|
- private static readonly ILogger _logger = ZebusLogManager.GetLogger(typeof(Bus));
|
|
|
|
-
|
|
|
|
- private readonly ConcurrentDictionary<MessageId, TaskCompletionSource<CommandResult>> _messageIdToTaskCompletionSources = new();
|
|
|
|
- private readonly UniqueTimestampProvider _deserializationFailureTimestampProvider = new();
|
|
|
|
- private readonly Dictionary<Subscription, SubscriptionStatus> _subscriptions = new();
|
|
|
|
- private readonly List<IMessageHandlerInvoker> _startupInvokers = new();
|
|
|
|
- private readonly HashSet<MessageTypeId> _pendingUnsubscriptions = new();
|
|
|
|
- private readonly ITransport _transport;
|
|
|
|
- private readonly IPeerDirectory _directory;
|
|
|
|
- private readonly IMessageSerializer _serializer;
|
|
|
|
- private readonly IMessageDispatcher _messageDispatcher;
|
|
|
|
- private readonly IMessageSendingStrategy _messageSendingStrategy;
|
|
|
|
- private readonly IStoppingStrategy _stoppingStrategy;
|
|
|
|
- private readonly IBusConfiguration _configuration;
|
|
|
|
|
|
+ _transport = transport;
|
|
|
|
+ _transport.MessageReceived += OnTransportMessageReceived;
|
|
|
|
+ _directory = directory;
|
|
|
|
+ _directory.PeerUpdated += OnPeerUpdated;
|
|
|
|
+ _directory.PeerSubscriptionsUpdated += DispatchSubscriptionsUpdatedMessages;
|
|
|
|
+ _serializer = serializer;
|
|
|
|
+ _messageDispatcher = messageDispatcher;
|
|
|
|
+ _messageDispatcher.MessageHandlerInvokersUpdated += MessageDispatcherOnMessageHandlerInvokersUpdated;
|
|
|
|
+ _messageSendingStrategy = messageSendingStrategy;
|
|
|
|
+ _stoppingStrategy = stoppingStrategy;
|
|
|
|
+ _configuration = configuration;
|
|
|
|
+ }
|
|
|
|
|
|
- private Task? _processPendingUnsubscriptionsTask;
|
|
|
|
|
|
+ public event Action? Starting;
|
|
|
|
+ public event Action? Started;
|
|
|
|
+ public event Action? Stopping;
|
|
|
|
+ public event Action? Stopped;
|
|
|
|
|
|
- private int _subscriptionsVersion;
|
|
|
|
- private int _status;
|
|
|
|
|
|
+ public PeerId PeerId { get; private set; }
|
|
|
|
+ public string Environment { get; private set; } = string.Empty;
|
|
|
|
+ public bool IsRunning => Status is BusStatus.Started or BusStatus.Stopping;
|
|
|
|
+ public string EndPoint => _transport.InboundEndPoint;
|
|
|
|
+ public string? DeserializationFailureDumpDirectoryPath { get; set; }
|
|
|
|
|
|
- public Bus(ITransport transport,
|
|
|
|
- IPeerDirectory directory,
|
|
|
|
- IMessageSerializer serializer,
|
|
|
|
- IMessageDispatcher messageDispatcher,
|
|
|
|
- IMessageSendingStrategy messageSendingStrategy,
|
|
|
|
- IStoppingStrategy stoppingStrategy,
|
|
|
|
- IBusConfiguration configuration)
|
|
|
|
- {
|
|
|
|
- _transport = transport;
|
|
|
|
- _transport.MessageReceived += OnTransportMessageReceived;
|
|
|
|
- _directory = directory;
|
|
|
|
- _directory.PeerUpdated += OnPeerUpdated;
|
|
|
|
- _directory.PeerSubscriptionsUpdated += DispatchSubscriptionsUpdatedMessages;
|
|
|
|
- _serializer = serializer;
|
|
|
|
- _messageDispatcher = messageDispatcher;
|
|
|
|
- _messageDispatcher.MessageHandlerInvokersUpdated += MessageDispatcherOnMessageHandlerInvokersUpdated;
|
|
|
|
- _messageSendingStrategy = messageSendingStrategy;
|
|
|
|
- _stoppingStrategy = stoppingStrategy;
|
|
|
|
- _configuration = configuration;
|
|
|
|
- }
|
|
|
|
|
|
+ private BusStatus Status
|
|
|
|
+ {
|
|
|
|
+ get => (BusStatus)_status;
|
|
|
|
+ set => _status = (int)value;
|
|
|
|
+ }
|
|
|
|
|
|
- public event Action? Starting;
|
|
|
|
- public event Action? Started;
|
|
|
|
- public event Action? Stopping;
|
|
|
|
- public event Action? Stopped;
|
|
|
|
|
|
+ public void Configure(PeerId peerId, string environment)
|
|
|
|
+ {
|
|
|
|
+ PeerId = peerId;
|
|
|
|
+ Environment = environment;
|
|
|
|
+ DeserializationFailureDumpDirectoryPath = Path.Combine(Path.GetTempPath(), $"zebus_deserialization_dumps_{peerId}");
|
|
|
|
+ _transport.Configure(peerId, environment);
|
|
|
|
+ }
|
|
|
|
|
|
- public PeerId PeerId { get; private set; }
|
|
|
|
- public string Environment { get; private set; } = string.Empty;
|
|
|
|
- public bool IsRunning => Status == BusStatus.Started || Status == BusStatus.Stopping;
|
|
|
|
- public string EndPoint => _transport.InboundEndPoint;
|
|
|
|
- public string? DeserializationFailureDumpDirectoryPath { get; set; }
|
|
|
|
|
|
+ public virtual void Start()
|
|
|
|
+ {
|
|
|
|
+ if (Interlocked.CompareExchange(ref _status, (int)BusStatus.Starting, (int)BusStatus.Stopped) != (int)BusStatus.Stopped)
|
|
|
|
+ throw new InvalidOperationException("Unable to start, the bus is already running");
|
|
|
|
|
|
- private BusStatus Status
|
|
|
|
|
|
+ try
|
|
{
|
|
{
|
|
- get => (BusStatus)_status;
|
|
|
|
- set => _status = (int)value;
|
|
|
|
|
|
+ Starting?.Invoke();
|
|
}
|
|
}
|
|
-
|
|
|
|
- public void Configure(PeerId peerId, string environment)
|
|
|
|
|
|
+ catch
|
|
{
|
|
{
|
|
- PeerId = peerId;
|
|
|
|
- Environment = environment;
|
|
|
|
- DeserializationFailureDumpDirectoryPath = Path.Combine(Path.GetTempPath(), $"zebus_deserialization_dumps_{peerId}");
|
|
|
|
- _transport.Configure(peerId, environment);
|
|
|
|
|
|
+ Status = BusStatus.Stopped;
|
|
|
|
+ throw;
|
|
}
|
|
}
|
|
|
|
|
|
- public virtual void Start()
|
|
|
|
|
|
+ var registered = false;
|
|
|
|
+ try
|
|
{
|
|
{
|
|
- if (Interlocked.CompareExchange(ref _status, (int)BusStatus.Starting, (int)BusStatus.Stopped) != (int)BusStatus.Stopped)
|
|
|
|
- throw new InvalidOperationException("Unable to start, the bus is already running");
|
|
|
|
|
|
+ _logger.LogDebug("Loading invokers...");
|
|
|
|
+ _messageDispatcher.LoadMessageHandlerInvokers();
|
|
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- Starting?.Invoke();
|
|
|
|
- }
|
|
|
|
- catch
|
|
|
|
- {
|
|
|
|
- Status = BusStatus.Stopped;
|
|
|
|
- throw;
|
|
|
|
- }
|
|
|
|
|
|
+ _logger.LogDebug("Performing startup subscribe...");
|
|
|
|
+ PerformStartupSubscribe();
|
|
|
|
|
|
- var registered = false;
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- _logger.LogDebug("Loading invokers...");
|
|
|
|
- _messageDispatcher.LoadMessageHandlerInvokers();
|
|
|
|
|
|
+ _logger.LogDebug("Starting transport...");
|
|
|
|
+ _transport.Start();
|
|
|
|
|
|
- _logger.LogDebug("Performing startup subscribe...");
|
|
|
|
- PerformStartupSubscribe();
|
|
|
|
|
|
+ Status = BusStatus.Started;
|
|
|
|
|
|
- _logger.LogDebug("Starting transport...");
|
|
|
|
- _transport.Start();
|
|
|
|
|
|
+ _logger.LogDebug("Registering on directory...");
|
|
|
|
+ var self = new Peer(PeerId, EndPoint);
|
|
|
|
+ _directory.RegisterAsync(this, self, GetSubscriptions()).Wait();
|
|
|
|
+ registered = true;
|
|
|
|
|
|
- Status = BusStatus.Started;
|
|
|
|
|
|
+ _transport.OnRegistered();
|
|
|
|
|
|
- _logger.LogDebug("Registering on directory...");
|
|
|
|
- var self = new Peer(PeerId, EndPoint);
|
|
|
|
- _directory.RegisterAsync(this, self, GetSubscriptions()).Wait();
|
|
|
|
- registered = true;
|
|
|
|
|
|
+ _logger.LogDebug("Starting message dispatcher...");
|
|
|
|
+ _messageDispatcher.Start();
|
|
|
|
+ }
|
|
|
|
+ catch
|
|
|
|
+ {
|
|
|
|
+ InternalStop(registered);
|
|
|
|
+ Status = BusStatus.Stopped;
|
|
|
|
+ throw;
|
|
|
|
+ }
|
|
|
|
|
|
- _transport.OnRegistered();
|
|
|
|
|
|
+ Started?.Invoke();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void PerformStartupSubscribe()
|
|
|
|
+ {
|
|
|
|
+ var startupSubscriptions = _messageDispatcher.GetMessageHandlerInvokers()
|
|
|
|
+ .SelectMany(x => x.GetStartupSubscriptions())
|
|
|
|
+ .ToList();
|
|
|
|
|
|
- _logger.LogDebug("Starting message dispatcher...");
|
|
|
|
- _messageDispatcher.Start();
|
|
|
|
|
|
+ lock (_subscriptions)
|
|
|
|
+ {
|
|
|
|
+ foreach (var subscription in startupSubscriptions)
|
|
|
|
+ {
|
|
|
|
+ var status = GetOrAddSubscriptionStatus(subscription);
|
|
|
|
+ status.CurrentSubscriptionCount++;
|
|
}
|
|
}
|
|
- catch
|
|
|
|
|
|
+
|
|
|
|
+ foreach (var status in _subscriptions.Values)
|
|
{
|
|
{
|
|
- InternalStop(registered);
|
|
|
|
- Status = BusStatus.Stopped;
|
|
|
|
- throw;
|
|
|
|
|
|
+ status.CurrentSubscriptionCount += status.StartupSubscriptionCount;
|
|
}
|
|
}
|
|
|
|
|
|
- Started?.Invoke();
|
|
|
|
|
|
+ foreach (var invoker in _startupInvokers)
|
|
|
|
+ {
|
|
|
|
+ _messageDispatcher.AddInvoker(invoker);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- private void PerformStartupSubscribe()
|
|
|
|
|
|
+ protected virtual IEnumerable<Subscription> GetSubscriptions()
|
|
|
|
+ {
|
|
|
|
+ lock (_subscriptions)
|
|
{
|
|
{
|
|
- var startupSubscriptions = _messageDispatcher.GetMessageHandlerInvokers()
|
|
|
|
- .SelectMany(x => x.GetStartupSubscriptions())
|
|
|
|
- .ToList();
|
|
|
|
|
|
+ return _subscriptions.Where(i => i.Value.CurrentSubscriptionCount > 0)
|
|
|
|
+ .Select(i => i.Key)
|
|
|
|
+ .ToList();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- lock (_subscriptions)
|
|
|
|
- {
|
|
|
|
- foreach (var subscription in startupSubscriptions)
|
|
|
|
- {
|
|
|
|
- var status = GetOrAddSubscriptionStatus(subscription);
|
|
|
|
- status.CurrentSubscriptionCount++;
|
|
|
|
- }
|
|
|
|
|
|
+ private SubscriptionStatus GetOrAddSubscriptionStatus(Subscription subscription)
|
|
|
|
+ {
|
|
|
|
+ if (!_subscriptions.TryGetValue(subscription, out var status))
|
|
|
|
+ {
|
|
|
|
+ status = new SubscriptionStatus();
|
|
|
|
+ _subscriptions.Add(subscription, status);
|
|
|
|
+ }
|
|
|
|
|
|
- foreach (var status in _subscriptions.Values)
|
|
|
|
- {
|
|
|
|
- status.CurrentSubscriptionCount += status.StartupSubscriptionCount;
|
|
|
|
- }
|
|
|
|
|
|
+ return status;
|
|
|
|
+ }
|
|
|
|
|
|
- foreach (var invoker in _startupInvokers)
|
|
|
|
- {
|
|
|
|
- _messageDispatcher.AddInvoker(invoker);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ public virtual void Stop()
|
|
|
|
+ {
|
|
|
|
+ if (Interlocked.CompareExchange(ref _status, (int)BusStatus.Stopping, (int)BusStatus.Started) != (int)BusStatus.Started)
|
|
|
|
+ throw new InvalidOperationException("Unable to stop, the bus is not running");
|
|
|
|
|
|
- protected virtual IEnumerable<Subscription> GetSubscriptions()
|
|
|
|
|
|
+ try
|
|
{
|
|
{
|
|
- lock (_subscriptions)
|
|
|
|
- {
|
|
|
|
- return _subscriptions.Where(i => i.Value.CurrentSubscriptionCount > 0)
|
|
|
|
- .Select(i => i.Key)
|
|
|
|
- .ToList();
|
|
|
|
- }
|
|
|
|
|
|
+ Stopping?.Invoke();
|
|
}
|
|
}
|
|
-
|
|
|
|
- private SubscriptionStatus GetOrAddSubscriptionStatus(Subscription subscription)
|
|
|
|
|
|
+ catch
|
|
{
|
|
{
|
|
- if (!_subscriptions.TryGetValue(subscription, out var status))
|
|
|
|
- {
|
|
|
|
- status = new SubscriptionStatus();
|
|
|
|
- _subscriptions.Add(subscription, status);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return status;
|
|
|
|
|
|
+ Status = BusStatus.Started;
|
|
|
|
+ throw;
|
|
}
|
|
}
|
|
|
|
|
|
- public virtual void Stop()
|
|
|
|
- {
|
|
|
|
- if (Interlocked.CompareExchange(ref _status, (int)BusStatus.Stopping, (int)BusStatus.Started) != (int)BusStatus.Started)
|
|
|
|
- throw new InvalidOperationException("Unable to stop, the bus is not running");
|
|
|
|
|
|
+ InternalStop(true);
|
|
|
|
+
|
|
|
|
+ Stopped?.Invoke();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void InternalStop(bool unregister)
|
|
|
|
+ {
|
|
|
|
+ Status = BusStatus.Stopping;
|
|
|
|
|
|
|
|
+ if (unregister)
|
|
|
|
+ {
|
|
try
|
|
try
|
|
{
|
|
{
|
|
- Stopping?.Invoke();
|
|
|
|
|
|
+ _directory.UnregisterAsync(this).Wait();
|
|
}
|
|
}
|
|
- catch
|
|
|
|
|
|
+ catch (Exception ex)
|
|
{
|
|
{
|
|
- Status = BusStatus.Started;
|
|
|
|
- throw;
|
|
|
|
|
|
+ _logger.LogError(ex, "Error during stop");
|
|
}
|
|
}
|
|
-
|
|
|
|
- InternalStop(true);
|
|
|
|
-
|
|
|
|
- Stopped?.Invoke();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- private void InternalStop(bool unregister)
|
|
|
|
|
|
+ lock (_subscriptions)
|
|
{
|
|
{
|
|
- Status = BusStatus.Stopping;
|
|
|
|
-
|
|
|
|
- if (unregister)
|
|
|
|
|
|
+ foreach (var (subscription, status) in _subscriptions.ToList())
|
|
{
|
|
{
|
|
- try
|
|
|
|
- {
|
|
|
|
- _directory.UnregisterAsync(this).Wait();
|
|
|
|
- }
|
|
|
|
- catch (Exception ex)
|
|
|
|
- {
|
|
|
|
- _logger.LogError(ex, "Error during stop");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ status.CurrentSubscriptionCount = 0;
|
|
|
|
|
|
- lock (_subscriptions)
|
|
|
|
- {
|
|
|
|
- foreach (var (subscription, status) in _subscriptions.ToList())
|
|
|
|
- {
|
|
|
|
- status.CurrentSubscriptionCount = 0;
|
|
|
|
-
|
|
|
|
- if (status.IsEmpty)
|
|
|
|
- _subscriptions.Remove(subscription);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- _pendingUnsubscriptions.Clear();
|
|
|
|
- _processPendingUnsubscriptionsTask = null;
|
|
|
|
-
|
|
|
|
- unchecked
|
|
|
|
- {
|
|
|
|
- ++_subscriptionsVersion;
|
|
|
|
- }
|
|
|
|
|
|
+ if (status.IsEmpty)
|
|
|
|
+ _subscriptions.Remove(subscription);
|
|
}
|
|
}
|
|
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- _stoppingStrategy.Stop(_transport, _messageDispatcher);
|
|
|
|
- }
|
|
|
|
- finally
|
|
|
|
|
|
+ _pendingUnsubscriptions.Clear();
|
|
|
|
+ _processPendingUnsubscriptionsTask = null;
|
|
|
|
+
|
|
|
|
+ unchecked
|
|
{
|
|
{
|
|
- Status = BusStatus.Stopped;
|
|
|
|
|
|
+ ++_subscriptionsVersion;
|
|
}
|
|
}
|
|
-
|
|
|
|
- _messageIdToTaskCompletionSources.Clear();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- public void Publish(IEvent message)
|
|
|
|
|
|
+ try
|
|
{
|
|
{
|
|
- if (!IsRunning)
|
|
|
|
- throw new InvalidOperationException("Unable to publish message, the bus is not running");
|
|
|
|
|
|
+ _stoppingStrategy.Stop(_transport, _messageDispatcher);
|
|
|
|
+ }
|
|
|
|
+ finally
|
|
|
|
+ {
|
|
|
|
+ Status = BusStatus.Stopped;
|
|
|
|
+ }
|
|
|
|
|
|
- var peersHandlingMessage = _directory.GetPeersHandlingMessage(message);
|
|
|
|
|
|
+ _messageIdToTaskCompletionSources.Clear();
|
|
|
|
+ }
|
|
|
|
|
|
- PublishImpl(message, peersHandlingMessage);
|
|
|
|
- }
|
|
|
|
|
|
+ public void Publish(IEvent message)
|
|
|
|
+ {
|
|
|
|
+ if (!IsRunning)
|
|
|
|
+ throw new InvalidOperationException("Unable to publish message, the bus is not running");
|
|
|
|
|
|
- public void Publish(IEvent message, PeerId targetPeerId)
|
|
|
|
- {
|
|
|
|
- if (!IsRunning)
|
|
|
|
- throw new InvalidOperationException("Unable to publish message, the bus is not running");
|
|
|
|
|
|
+ var peersHandlingMessage = _directory.GetPeersHandlingMessage(message);
|
|
|
|
|
|
- var peer = _directory.GetPeer(targetPeerId);
|
|
|
|
- if (peer != null)
|
|
|
|
- PublishImpl(message, new List<Peer> { peer });
|
|
|
|
- }
|
|
|
|
|
|
+ PublishImpl(message, peersHandlingMessage);
|
|
|
|
+ }
|
|
|
|
|
|
- private void PublishImpl(IEvent message, IList<Peer> peers)
|
|
|
|
- {
|
|
|
|
- var localDispatchEnabled = LocalDispatch.Enabled;
|
|
|
|
- var shouldBeHandledLocally = localDispatchEnabled && peers.Any(x => x.Id == PeerId);
|
|
|
|
- if (shouldBeHandledLocally)
|
|
|
|
- HandleLocalMessage(message, null);
|
|
|
|
|
|
+ public void Publish(IEvent message, PeerId targetPeerId)
|
|
|
|
+ {
|
|
|
|
+ if (!IsRunning)
|
|
|
|
+ throw new InvalidOperationException("Unable to publish message, the bus is not running");
|
|
|
|
|
|
- var targetPeers = shouldBeHandledLocally ? peers.Where(x => x.Id != PeerId).ToList() : peers;
|
|
|
|
- LogAndSendMessage(message, targetPeers, true, shouldBeHandledLocally);
|
|
|
|
- }
|
|
|
|
|
|
+ var peer = _directory.GetPeer(targetPeerId);
|
|
|
|
+ if (peer != null)
|
|
|
|
+ PublishImpl(message, new List<Peer> { peer });
|
|
|
|
+ }
|
|
|
|
|
|
- public Task<CommandResult> Send(ICommand message)
|
|
|
|
- {
|
|
|
|
- if (!IsRunning)
|
|
|
|
- throw new InvalidOperationException("Unable to send message, the bus is not running");
|
|
|
|
|
|
+ private void PublishImpl(IEvent message, IList<Peer> peers)
|
|
|
|
+ {
|
|
|
|
+ var localDispatchEnabled = LocalDispatch.Enabled;
|
|
|
|
+ var shouldBeHandledLocally = localDispatchEnabled && peers.Any(x => x.Id == PeerId);
|
|
|
|
+ if (shouldBeHandledLocally)
|
|
|
|
+ HandleLocalMessage(message, null);
|
|
|
|
+
|
|
|
|
+ var targetPeers = shouldBeHandledLocally ? peers.Where(x => x.Id != PeerId).ToList() : peers;
|
|
|
|
+ LogAndSendMessage(message, targetPeers, true, shouldBeHandledLocally);
|
|
|
|
+ }
|
|
|
|
|
|
- var peers = _directory.GetPeersHandlingMessage(message);
|
|
|
|
- if (peers.Count == 0)
|
|
|
|
- throw new InvalidOperationException("Unable to find peer for specified command, " + BusMessageLogger.ToString(message) + ". Did you change the namespace?");
|
|
|
|
|
|
+ public Task<CommandResult> Send(ICommand message)
|
|
|
|
+ {
|
|
|
|
+ if (!IsRunning)
|
|
|
|
+ throw new InvalidOperationException("Unable to send message, the bus is not running");
|
|
|
|
|
|
- var self = peers.FirstOrDefault(x => x.Id == PeerId);
|
|
|
|
|
|
+ var peers = _directory.GetPeersHandlingMessage(message);
|
|
|
|
+ if (peers.Count == 0)
|
|
|
|
+ throw new InvalidOperationException("Unable to find peer for specified command, " + BusMessageLogger.ToString(message) + ". Did you change the namespace?");
|
|
|
|
|
|
- if (self != null)
|
|
|
|
- return SendImpl(message, self);
|
|
|
|
|
|
+ var self = peers.FirstOrDefault(x => x.Id == PeerId);
|
|
|
|
|
|
- if (peers.Count > 1)
|
|
|
|
- {
|
|
|
|
- var exceptionMessage = $"{peers.Count} peers are handling {BusMessageLogger.ToString(message)}. Peers: {string.Join(", ", peers.Select(p => p.ToString()))}.";
|
|
|
|
- throw new InvalidOperationException(exceptionMessage);
|
|
|
|
- }
|
|
|
|
|
|
+ if (self != null)
|
|
|
|
+ return SendImpl(message, self);
|
|
|
|
|
|
- return SendImpl(message, peers[0]);
|
|
|
|
|
|
+ if (peers.Count > 1)
|
|
|
|
+ {
|
|
|
|
+ var exceptionMessage = $"{peers.Count} peers are handling {BusMessageLogger.ToString(message)}. Peers: {string.Join(", ", peers.Select(p => p.ToString()))}.";
|
|
|
|
+ throw new InvalidOperationException(exceptionMessage);
|
|
}
|
|
}
|
|
|
|
|
|
- public Task<CommandResult> Send(ICommand message, Peer peer)
|
|
|
|
- {
|
|
|
|
- if (peer == null)
|
|
|
|
- throw new ArgumentNullException(nameof(peer));
|
|
|
|
|
|
+ return SendImpl(message, peers[0]);
|
|
|
|
+ }
|
|
|
|
|
|
- if (!IsRunning)
|
|
|
|
- throw new InvalidOperationException("Unable to send message, the bus is not running");
|
|
|
|
|
|
+ public Task<CommandResult> Send(ICommand message, Peer peer)
|
|
|
|
+ {
|
|
|
|
+ if (peer == null)
|
|
|
|
+ throw new ArgumentNullException(nameof(peer));
|
|
|
|
|
|
- return SendImpl(message, peer);
|
|
|
|
- }
|
|
|
|
|
|
+ if (!IsRunning)
|
|
|
|
+ throw new InvalidOperationException("Unable to send message, the bus is not running");
|
|
|
|
|
|
- private Task<CommandResult> SendImpl(ICommand message, Peer peer)
|
|
|
|
- {
|
|
|
|
- var taskCompletionSource = new TaskCompletionSource<CommandResult>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
|
|
|
|
+ return SendImpl(message, peer);
|
|
|
|
+ }
|
|
|
|
|
|
- if (LocalDispatch.Enabled && peer.Id == PeerId)
|
|
|
|
- {
|
|
|
|
- HandleLocalMessage(message, taskCompletionSource);
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- var transportMessage = ToTransportMessage(message);
|
|
|
|
|
|
+ private Task<CommandResult> SendImpl(ICommand message, Peer peer)
|
|
|
|
+ {
|
|
|
|
+ var taskCompletionSource = new TaskCompletionSource<CommandResult>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
|
|
|
|
- if (!peer.IsResponding && !_messageSendingStrategy.IsMessagePersistent(transportMessage) && !message.TypeId().IsInfrastructure())
|
|
|
|
- throw new InvalidOperationException($"Unable to send this transient message {BusMessageLogger.ToString(message)} while peer {peer.Id} is not responding.");
|
|
|
|
|
|
+ if (LocalDispatch.Enabled && peer.Id == PeerId)
|
|
|
|
+ {
|
|
|
|
+ HandleLocalMessage(message, taskCompletionSource);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ var transportMessage = ToTransportMessage(message);
|
|
|
|
|
|
- _messageIdToTaskCompletionSources.TryAdd(transportMessage.Id, taskCompletionSource);
|
|
|
|
|
|
+ if (!peer.IsResponding && !_messageSendingStrategy.IsMessagePersistent(transportMessage) && !message.TypeId().IsInfrastructure())
|
|
|
|
+ throw new InvalidOperationException($"Unable to send this transient message {BusMessageLogger.ToString(message)} while peer {peer.Id} is not responding.");
|
|
|
|
|
|
- var peers = new[] { peer };
|
|
|
|
- _messageLogger.LogSendMessage(message, peers, transportMessage);
|
|
|
|
- SendTransportMessage(transportMessage, peers);
|
|
|
|
- }
|
|
|
|
|
|
+ _messageIdToTaskCompletionSources.TryAdd(transportMessage.Id, taskCompletionSource);
|
|
|
|
|
|
- return taskCompletionSource.Task;
|
|
|
|
|
|
+ var peers = new[] { peer };
|
|
|
|
+ _messageLogger.LogSendMessage(message, peers, transportMessage);
|
|
|
|
+ SendTransportMessage(transportMessage, peers);
|
|
}
|
|
}
|
|
|
|
|
|
- public async Task<IDisposable> SubscribeAsync(SubscriptionRequest request)
|
|
|
|
- {
|
|
|
|
- if (request == null)
|
|
|
|
- throw new ArgumentNullException(nameof(request));
|
|
|
|
|
|
+ return taskCompletionSource.Task;
|
|
|
|
+ }
|
|
|
|
|
|
- if (IsRunning && !request.ThereIsNoHandlerButIKnowWhatIAmDoing)
|
|
|
|
- EnsureMessageHandlerInvokerExists(request.Subscriptions);
|
|
|
|
|
|
+ public async Task<IDisposable> SubscribeAsync(SubscriptionRequest request)
|
|
|
|
+ {
|
|
|
|
+ if (request == null)
|
|
|
|
+ throw new ArgumentNullException(nameof(request));
|
|
|
|
|
|
- request.MarkAsSubmitted(_subscriptionsVersion, IsRunning);
|
|
|
|
|
|
+ if (IsRunning && !request.ThereIsNoHandlerButIKnowWhatIAmDoing)
|
|
|
|
+ EnsureMessageHandlerInvokerExists(request.Subscriptions);
|
|
|
|
|
|
- if (!request.IsStartupRequest && request.Batch != null)
|
|
|
|
- await request.Batch.WhenSubmittedAsync().ConfigureAwait(false);
|
|
|
|
|
|
+ request.MarkAsSubmitted(_subscriptionsVersion, IsRunning);
|
|
|
|
|
|
- await AddSubscriptionsAsync(request).ConfigureAwait(false);
|
|
|
|
|
|
+ if (!request.IsStartupRequest && request.Batch != null)
|
|
|
|
+ await request.Batch.WhenSubmittedAsync().ConfigureAwait(false);
|
|
|
|
|
|
- return new DisposableAction(() => RemoveSubscriptions(request));
|
|
|
|
- }
|
|
|
|
|
|
+ await AddSubscriptionsAsync(request).ConfigureAwait(false);
|
|
|
|
|
|
- public async Task<IDisposable> SubscribeAsync(SubscriptionRequest request, Action<IMessage> handler)
|
|
|
|
- {
|
|
|
|
- if (request == null)
|
|
|
|
- throw new ArgumentNullException(nameof(request));
|
|
|
|
|
|
+ return new DisposableAction(() => RemoveSubscriptions(request));
|
|
|
|
+ }
|
|
|
|
|
|
- if (handler == null)
|
|
|
|
- throw new ArgumentNullException(nameof(handler));
|
|
|
|
|
|
+ public async Task<IDisposable> SubscribeAsync(SubscriptionRequest request, Action<IMessage> handler)
|
|
|
|
+ {
|
|
|
|
+ if (request == null)
|
|
|
|
+ throw new ArgumentNullException(nameof(request));
|
|
|
|
|
|
- request.MarkAsSubmitted(_subscriptionsVersion, IsRunning);
|
|
|
|
|
|
+ if (handler == null)
|
|
|
|
+ throw new ArgumentNullException(nameof(handler));
|
|
|
|
|
|
- var eventHandlerInvokers = request.Subscriptions
|
|
|
|
- .GroupBy(x => x.MessageTypeId)
|
|
|
|
- .Select(x => new DynamicMessageHandlerInvoker(
|
|
|
|
- handler,
|
|
|
|
- x.Key.GetMessageType() ?? throw new InvalidOperationException($"Could not resolve type {x.Key.FullName}"),
|
|
|
|
- x.Select(s => s.BindingKey).ToList()
|
|
|
|
- ))
|
|
|
|
- .ToList();
|
|
|
|
|
|
+ request.MarkAsSubmitted(_subscriptionsVersion, IsRunning);
|
|
|
|
|
|
- if (request.IsStartupRequest)
|
|
|
|
|
|
+ var eventHandlerInvokers = request.Subscriptions
|
|
|
|
+ .GroupBy(x => x.MessageTypeId)
|
|
|
|
+ .Select(x => new DynamicMessageHandlerInvoker(
|
|
|
|
+ handler,
|
|
|
|
+ x.Key.GetMessageType() ?? throw new InvalidOperationException($"Could not resolve type {x.Key.FullName}"),
|
|
|
|
+ x.Select(s => s.BindingKey).ToList()
|
|
|
|
+ ))
|
|
|
|
+ .ToList();
|
|
|
|
+
|
|
|
|
+ if (request.IsStartupRequest)
|
|
|
|
+ {
|
|
|
|
+ lock (_subscriptions)
|
|
{
|
|
{
|
|
- lock (_subscriptions)
|
|
|
|
- {
|
|
|
|
- _startupInvokers.AddRange(eventHandlerInvokers);
|
|
|
|
- }
|
|
|
|
|
|
+ _startupInvokers.AddRange(eventHandlerInvokers);
|
|
}
|
|
}
|
|
- else
|
|
|
|
- {
|
|
|
|
- if (request.Batch != null)
|
|
|
|
- await request.Batch.WhenSubmittedAsync().ConfigureAwait(false);
|
|
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ if (request.Batch != null)
|
|
|
|
+ await request.Batch.WhenSubmittedAsync().ConfigureAwait(false);
|
|
|
|
|
|
- foreach (var eventHandlerInvoker in eventHandlerInvokers)
|
|
|
|
- _messageDispatcher.AddInvoker(eventHandlerInvoker);
|
|
|
|
- }
|
|
|
|
|
|
+ foreach (var eventHandlerInvoker in eventHandlerInvokers)
|
|
|
|
+ _messageDispatcher.AddInvoker(eventHandlerInvoker);
|
|
|
|
+ }
|
|
|
|
|
|
- await AddSubscriptionsAsync(request).ConfigureAwait(false);
|
|
|
|
|
|
+ await AddSubscriptionsAsync(request).ConfigureAwait(false);
|
|
|
|
|
|
- return new DisposableAction(() =>
|
|
|
|
|
|
+ return new DisposableAction(() =>
|
|
|
|
+ {
|
|
|
|
+ if (request.IsStartupRequest)
|
|
{
|
|
{
|
|
- if (request.IsStartupRequest)
|
|
|
|
|
|
+ lock (_subscriptions)
|
|
{
|
|
{
|
|
- lock (_subscriptions)
|
|
|
|
- {
|
|
|
|
- _startupInvokers.RemoveRange(eventHandlerInvokers);
|
|
|
|
- }
|
|
|
|
|
|
+ _startupInvokers.RemoveRange(eventHandlerInvokers);
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- foreach (var eventHandlerInvoker in eventHandlerInvokers)
|
|
|
|
- _messageDispatcher.RemoveInvoker(eventHandlerInvoker);
|
|
|
|
|
|
+ foreach (var eventHandlerInvoker in eventHandlerInvokers)
|
|
|
|
+ _messageDispatcher.RemoveInvoker(eventHandlerInvoker);
|
|
|
|
|
|
- RemoveSubscriptions(request);
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
|
|
+ RemoveSubscriptions(request);
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
|
|
- private void EnsureMessageHandlerInvokerExists(IEnumerable<Subscription> subscriptions)
|
|
|
|
|
|
+ private void EnsureMessageHandlerInvokerExists(IEnumerable<Subscription> subscriptions)
|
|
|
|
+ {
|
|
|
|
+ foreach (var subscription in subscriptions)
|
|
{
|
|
{
|
|
- foreach (var subscription in subscriptions)
|
|
|
|
- {
|
|
|
|
- if (_messageDispatcher.GetMessageHandlerInvokers().All(x => x.MessageTypeId != subscription.MessageTypeId))
|
|
|
|
- throw new ArgumentException($"No handler available for message type Id: {subscription.MessageTypeId}");
|
|
|
|
- }
|
|
|
|
|
|
+ if (_messageDispatcher.GetMessageHandlerInvokers().All(x => x.MessageTypeId != subscription.MessageTypeId))
|
|
|
|
+ throw new ArgumentException($"No handler available for message type Id: {subscription.MessageTypeId}");
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- private async Task AddSubscriptionsAsync(SubscriptionRequest request)
|
|
|
|
|
|
+ private async Task AddSubscriptionsAsync(SubscriptionRequest request)
|
|
|
|
+ {
|
|
|
|
+ if (request.IsStartupRequest)
|
|
{
|
|
{
|
|
- if (request.IsStartupRequest)
|
|
|
|
|
|
+ lock (_subscriptions)
|
|
{
|
|
{
|
|
- lock (_subscriptions)
|
|
|
|
|
|
+ foreach (var subscription in request.Subscriptions)
|
|
{
|
|
{
|
|
- foreach (var subscription in request.Subscriptions)
|
|
|
|
- {
|
|
|
|
- var status = GetOrAddSubscriptionStatus(subscription);
|
|
|
|
- status.StartupSubscriptionCount++;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (!IsRunning)
|
|
|
|
- return;
|
|
|
|
|
|
+ var status = GetOrAddSubscriptionStatus(subscription);
|
|
|
|
+ status.StartupSubscriptionCount++;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ if (!IsRunning)
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- if (request.Batch != null)
|
|
|
|
|
|
+ if (request.Batch != null)
|
|
|
|
+ {
|
|
|
|
+ var batchSubscriptions = request.Batch.TryConsumeBatchSubscriptions();
|
|
|
|
+ if (batchSubscriptions != null)
|
|
{
|
|
{
|
|
- var batchSubscriptions = request.Batch.TryConsumeBatchSubscriptions();
|
|
|
|
- if (batchSubscriptions != null)
|
|
|
|
|
|
+ try
|
|
{
|
|
{
|
|
- try
|
|
|
|
- {
|
|
|
|
- await SendSubscriptionsAsync(batchSubscriptions).ConfigureAwait(false);
|
|
|
|
- request.Batch.NotifyRegistrationCompleted(null);
|
|
|
|
- }
|
|
|
|
- catch (Exception ex)
|
|
|
|
- {
|
|
|
|
- request.Batch.NotifyRegistrationCompleted(ex);
|
|
|
|
- throw;
|
|
|
|
- }
|
|
|
|
|
|
+ await SendSubscriptionsAsync(batchSubscriptions).ConfigureAwait(false);
|
|
|
|
+ request.Batch.NotifyRegistrationCompleted(null);
|
|
}
|
|
}
|
|
- else
|
|
|
|
|
|
+ catch (Exception ex)
|
|
{
|
|
{
|
|
- await request.Batch.WhenRegistrationCompletedAsync().ConfigureAwait(false);
|
|
|
|
|
|
+ request.Batch.NotifyRegistrationCompleted(ex);
|
|
|
|
+ throw;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
- await SendSubscriptionsAsync(request.Subscriptions).ConfigureAwait(false);
|
|
|
|
|
|
+ await request.Batch.WhenRegistrationCompletedAsync().ConfigureAwait(false);
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ await SendSubscriptionsAsync(request.Subscriptions).ConfigureAwait(false);
|
|
|
|
+ }
|
|
|
|
|
|
- async Task SendSubscriptionsAsync(IEnumerable<Subscription> subscriptions)
|
|
|
|
- {
|
|
|
|
- if (request.SubmissionSubscriptionsVersion != _subscriptionsVersion)
|
|
|
|
- throw new InvalidOperationException("The bus has been stopped before the subscriptions have been sent");
|
|
|
|
|
|
+ async Task SendSubscriptionsAsync(IEnumerable<Subscription> subscriptions)
|
|
|
|
+ {
|
|
|
|
+ if (request.SubmissionSubscriptionsVersion != _subscriptionsVersion)
|
|
|
|
+ throw new InvalidOperationException("The bus has been stopped before the subscriptions have been sent");
|
|
|
|
|
|
- var updatedTypes = new HashSet<MessageTypeId>();
|
|
|
|
|
|
+ var updatedTypes = new HashSet<MessageTypeId>();
|
|
|
|
|
|
- lock (_subscriptions)
|
|
|
|
|
|
+ lock (_subscriptions)
|
|
|
|
+ {
|
|
|
|
+ foreach (var subscription in subscriptions)
|
|
{
|
|
{
|
|
- foreach (var subscription in subscriptions)
|
|
|
|
- {
|
|
|
|
- var status = GetOrAddSubscriptionStatus(subscription);
|
|
|
|
- status.CurrentSubscriptionCount++;
|
|
|
|
|
|
+ var status = GetOrAddSubscriptionStatus(subscription);
|
|
|
|
+ status.CurrentSubscriptionCount++;
|
|
|
|
|
|
- if (status.CurrentSubscriptionCount <= 1)
|
|
|
|
- updatedTypes.Add(subscription.MessageTypeId);
|
|
|
|
- }
|
|
|
|
|
|
+ if (status.CurrentSubscriptionCount <= 1)
|
|
|
|
+ updatedTypes.Add(subscription.MessageTypeId);
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- if (updatedTypes.Count != 0)
|
|
|
|
- {
|
|
|
|
- // Wait until all unsubscriptions are completed to prevent race conditions
|
|
|
|
- await WhenUnsubscribeCompletedAsync().ConfigureAwait(false);
|
|
|
|
- await UpdateDirectorySubscriptionsAsync(updatedTypes).ConfigureAwait(false);
|
|
|
|
- }
|
|
|
|
|
|
+ if (updatedTypes.Count != 0)
|
|
|
|
+ {
|
|
|
|
+ // Wait until all unsubscriptions are completed to prevent race conditions
|
|
|
|
+ await WhenUnsubscribeCompletedAsync().ConfigureAwait(false);
|
|
|
|
+ await UpdateDirectorySubscriptionsAsync(updatedTypes).ConfigureAwait(false);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- internal async Task WhenUnsubscribeCompletedAsync()
|
|
|
|
- {
|
|
|
|
- Task? task;
|
|
|
|
|
|
+ internal async Task WhenUnsubscribeCompletedAsync()
|
|
|
|
+ {
|
|
|
|
+ Task? task;
|
|
|
|
|
|
- lock (_subscriptions)
|
|
|
|
- {
|
|
|
|
- task = _processPendingUnsubscriptionsTask;
|
|
|
|
- }
|
|
|
|
|
|
+ lock (_subscriptions)
|
|
|
|
+ {
|
|
|
|
+ task = _processPendingUnsubscriptionsTask;
|
|
|
|
+ }
|
|
|
|
|
|
- if (task == null)
|
|
|
|
- return;
|
|
|
|
|
|
+ if (task == null)
|
|
|
|
+ return;
|
|
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- await task.ConfigureAwait(false);
|
|
|
|
- }
|
|
|
|
- catch (Exception ex)
|
|
|
|
- {
|
|
|
|
- _logger.LogError(ex, "Error waiting for pending unsubscription");
|
|
|
|
- }
|
|
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ await task.ConfigureAwait(false);
|
|
}
|
|
}
|
|
|
|
+ catch (Exception ex)
|
|
|
|
+ {
|
|
|
|
+ _logger.LogError(ex, "Error waiting for pending unsubscription");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- private void RemoveSubscriptions(SubscriptionRequest request)
|
|
|
|
|
|
+ private void RemoveSubscriptions(SubscriptionRequest request)
|
|
|
|
+ {
|
|
|
|
+ lock (_subscriptions)
|
|
{
|
|
{
|
|
- lock (_subscriptions)
|
|
|
|
|
|
+ if (request.IsStartupRequest)
|
|
{
|
|
{
|
|
- if (request.IsStartupRequest)
|
|
|
|
- {
|
|
|
|
- foreach (var subscription in request.Subscriptions)
|
|
|
|
- {
|
|
|
|
- if (!_subscriptions.TryGetValue(subscription, out var status))
|
|
|
|
- continue;
|
|
|
|
-
|
|
|
|
- status.StartupSubscriptionCount--;
|
|
|
|
-
|
|
|
|
- if (status.IsEmpty)
|
|
|
|
- _subscriptions.Remove(subscription);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (!IsRunning)
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- if (!IsRunning || request.SubmissionSubscriptionsVersion != _subscriptionsVersion)
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
foreach (var subscription in request.Subscriptions)
|
|
foreach (var subscription in request.Subscriptions)
|
|
{
|
|
{
|
|
if (!_subscriptions.TryGetValue(subscription, out var status))
|
|
if (!_subscriptions.TryGetValue(subscription, out var status))
|
|
continue;
|
|
continue;
|
|
|
|
|
|
- status.CurrentSubscriptionCount--;
|
|
|
|
|
|
+ status.StartupSubscriptionCount--;
|
|
|
|
|
|
- if (status.CurrentSubscriptionCount <= 0)
|
|
|
|
- {
|
|
|
|
- if (status.IsEmpty)
|
|
|
|
- _subscriptions.Remove(subscription);
|
|
|
|
-
|
|
|
|
- _pendingUnsubscriptions.Add(subscription.MessageTypeId);
|
|
|
|
- }
|
|
|
|
|
|
+ if (status.IsEmpty)
|
|
|
|
+ _subscriptions.Remove(subscription);
|
|
}
|
|
}
|
|
|
|
|
|
- if (_pendingUnsubscriptions.Count != 0 && _processPendingUnsubscriptionsTask?.IsCompleted != false)
|
|
|
|
|
|
+ if (!IsRunning)
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ if (!IsRunning || request.SubmissionSubscriptionsVersion != _subscriptionsVersion)
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ foreach (var subscription in request.Subscriptions)
|
|
|
|
+ {
|
|
|
|
+ if (!_subscriptions.TryGetValue(subscription, out var status))
|
|
|
|
+ continue;
|
|
|
|
+
|
|
|
|
+ status.CurrentSubscriptionCount--;
|
|
|
|
+
|
|
|
|
+ if (status.CurrentSubscriptionCount <= 0)
|
|
{
|
|
{
|
|
- var subscriptionsVersion = _subscriptionsVersion;
|
|
|
|
- _processPendingUnsubscriptionsTask = Task.Run(() => ProcessPendingUnsubscriptions(subscriptionsVersion));
|
|
|
|
|
|
+ if (status.IsEmpty)
|
|
|
|
+ _subscriptions.Remove(subscription);
|
|
|
|
+
|
|
|
|
+ _pendingUnsubscriptions.Add(subscription.MessageTypeId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ if (_pendingUnsubscriptions.Count != 0 && _processPendingUnsubscriptionsTask?.IsCompleted != false)
|
|
|
|
+ {
|
|
|
|
+ var subscriptionsVersion = _subscriptionsVersion;
|
|
|
|
+ _processPendingUnsubscriptionsTask = Task.Run(() => ProcessPendingUnsubscriptions(subscriptionsVersion));
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- private async Task ProcessPendingUnsubscriptions(int subscriptionsVersion)
|
|
|
|
|
|
+ private async Task ProcessPendingUnsubscriptions(int subscriptionsVersion)
|
|
|
|
+ {
|
|
|
|
+ try
|
|
{
|
|
{
|
|
- try
|
|
|
|
|
|
+ var updatedTypes = new HashSet<MessageTypeId>();
|
|
|
|
+
|
|
|
|
+ while (true)
|
|
{
|
|
{
|
|
- var updatedTypes = new HashSet<MessageTypeId>();
|
|
|
|
|
|
+ updatedTypes.Clear();
|
|
|
|
|
|
- while (true)
|
|
|
|
|
|
+ lock (_subscriptions)
|
|
{
|
|
{
|
|
- updatedTypes.Clear();
|
|
|
|
|
|
+ updatedTypes.UnionWith(_pendingUnsubscriptions);
|
|
|
|
+ _pendingUnsubscriptions.Clear();
|
|
|
|
|
|
- lock (_subscriptions)
|
|
|
|
|
|
+ if (updatedTypes.Count == 0 || !IsRunning || Status == BusStatus.Stopping || subscriptionsVersion != _subscriptionsVersion)
|
|
{
|
|
{
|
|
- updatedTypes.UnionWith(_pendingUnsubscriptions);
|
|
|
|
- _pendingUnsubscriptions.Clear();
|
|
|
|
-
|
|
|
|
- if (updatedTypes.Count == 0 || !IsRunning || Status == BusStatus.Stopping || subscriptionsVersion != _subscriptionsVersion)
|
|
|
|
- {
|
|
|
|
- _processPendingUnsubscriptionsTask = null;
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
|
|
+ _processPendingUnsubscriptionsTask = null;
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
-
|
|
|
|
- await UpdateDirectorySubscriptionsAsync(updatedTypes).ConfigureAwait(false);
|
|
|
|
}
|
|
}
|
|
- }
|
|
|
|
- catch (Exception ex)
|
|
|
|
- {
|
|
|
|
- _logger.LogError(ex, "Error processing pending unsubscription");
|
|
|
|
|
|
|
|
- lock (_subscriptions)
|
|
|
|
- {
|
|
|
|
- _processPendingUnsubscriptionsTask = null;
|
|
|
|
- }
|
|
|
|
|
|
+ await UpdateDirectorySubscriptionsAsync(updatedTypes).ConfigureAwait(false);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- private async Task UpdateDirectorySubscriptionsAsync(HashSet<MessageTypeId> updatedTypes)
|
|
|
|
|
|
+ catch (Exception ex)
|
|
{
|
|
{
|
|
- var subscriptions = GetSubscriptions().Where(sub => updatedTypes.Contains(sub.MessageTypeId));
|
|
|
|
- var subscriptionsByTypes = SubscriptionsForType.CreateDictionary(subscriptions);
|
|
|
|
|
|
+ _logger.LogError(ex, "Error processing pending unsubscription");
|
|
|
|
|
|
- var subscriptionUpdates = new List<SubscriptionsForType>(updatedTypes.Count);
|
|
|
|
- foreach (var updatedMessageId in updatedTypes)
|
|
|
|
- subscriptionUpdates.Add(subscriptionsByTypes.GetValueOrDefault(updatedMessageId, new SubscriptionsForType(updatedMessageId)));
|
|
|
|
-
|
|
|
|
- await _directory.UpdateSubscriptionsAsync(this, subscriptionUpdates).ConfigureAwait(false);
|
|
|
|
|
|
+ lock (_subscriptions)
|
|
|
|
+ {
|
|
|
|
+ _processPendingUnsubscriptionsTask = null;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- public void Reply(int errorCode)
|
|
|
|
- => Reply(errorCode, null);
|
|
|
|
|
|
+ private async Task UpdateDirectorySubscriptionsAsync(HashSet<MessageTypeId> updatedTypes)
|
|
|
|
+ {
|
|
|
|
+ var subscriptions = GetSubscriptions().Where(sub => updatedTypes.Contains(sub.MessageTypeId));
|
|
|
|
+ var subscriptionsByTypes = SubscriptionsForType.CreateDictionary(subscriptions);
|
|
|
|
|
|
- public void Reply(int errorCode, string? message)
|
|
|
|
- {
|
|
|
|
- var messageContext = MessageContext.Current;
|
|
|
|
- if (messageContext == null)
|
|
|
|
- throw new InvalidOperationException("Reply called without message context");
|
|
|
|
|
|
+ var subscriptionUpdates = new List<SubscriptionsForType>(updatedTypes.Count);
|
|
|
|
+ foreach (var updatedMessageId in updatedTypes)
|
|
|
|
+ subscriptionUpdates.Add(subscriptionsByTypes.GetValueOrDefault(updatedMessageId, new SubscriptionsForType(updatedMessageId)));
|
|
|
|
|
|
- messageContext.ReplyCode = errorCode;
|
|
|
|
- messageContext.ReplyMessage = message;
|
|
|
|
- }
|
|
|
|
|
|
+ await _directory.UpdateSubscriptionsAsync(this, subscriptionUpdates).ConfigureAwait(false);
|
|
|
|
+ }
|
|
|
|
|
|
- public void Reply(IMessage? response)
|
|
|
|
- {
|
|
|
|
- var messageContext = MessageContext.Current;
|
|
|
|
- if (messageContext == null)
|
|
|
|
- throw new InvalidOperationException("Reply called without message context");
|
|
|
|
|
|
+ public void Reply(int errorCode)
|
|
|
|
+ => Reply(errorCode, null);
|
|
|
|
|
|
- messageContext.ReplyResponse = response;
|
|
|
|
- }
|
|
|
|
|
|
+ public void Reply(int errorCode, string? message)
|
|
|
|
+ {
|
|
|
|
+ var messageContext = MessageContext.Current;
|
|
|
|
+ if (messageContext == null)
|
|
|
|
+ throw new InvalidOperationException("Reply called without message context");
|
|
|
|
|
|
- private void OnPeerUpdated(PeerId peerId, PeerUpdateAction peerUpdateAction)
|
|
|
|
- => _transport.OnPeerUpdated(peerId, peerUpdateAction);
|
|
|
|
|
|
+ messageContext.ReplyCode = errorCode;
|
|
|
|
+ messageContext.ReplyMessage = message;
|
|
|
|
+ }
|
|
|
|
|
|
- private void OnTransportMessageReceived(TransportMessage transportMessage)
|
|
|
|
- {
|
|
|
|
- if (transportMessage.MessageTypeId == MessageExecutionCompleted.TypeId)
|
|
|
|
- {
|
|
|
|
- HandleMessageExecutionCompleted(transportMessage);
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- var executeSynchronously = transportMessage.MessageTypeId.IsInfrastructure();
|
|
|
|
- HandleRemoteMessage(transportMessage, executeSynchronously);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ public void Reply(IMessage? response)
|
|
|
|
+ {
|
|
|
|
+ var messageContext = MessageContext.Current;
|
|
|
|
+ if (messageContext == null)
|
|
|
|
+ throw new InvalidOperationException("Reply called without message context");
|
|
|
|
|
|
- public MessageDispatch? CreateMessageDispatch(TransportMessage transportMessage)
|
|
|
|
- => CreateMessageDispatch(transportMessage, synchronousDispatch: false, sendAcknowledgment: false);
|
|
|
|
|
|
+ messageContext.ReplyResponse = response;
|
|
|
|
+ }
|
|
|
|
|
|
- private MessageDispatch? CreateMessageDispatch(TransportMessage transportMessage, bool synchronousDispatch, bool sendAcknowledgment = true)
|
|
|
|
- {
|
|
|
|
- var message = ToMessage(transportMessage);
|
|
|
|
- if (message == null)
|
|
|
|
- return null;
|
|
|
|
|
|
+ private void OnPeerUpdated(PeerId peerId, PeerUpdateAction peerUpdateAction)
|
|
|
|
+ => _transport.OnPeerUpdated(peerId, peerUpdateAction);
|
|
|
|
|
|
- var context = MessageContext.CreateNew(transportMessage);
|
|
|
|
- var continuation = GetOnRemoteMessageDispatchedContinuation(transportMessage, sendAcknowledgment);
|
|
|
|
- return new MessageDispatch(context, message, _serializer, continuation, synchronousDispatch);
|
|
|
|
|
|
+ private void OnTransportMessageReceived(TransportMessage transportMessage)
|
|
|
|
+ {
|
|
|
|
+ if (transportMessage.MessageTypeId == MessageExecutionCompleted.TypeId)
|
|
|
|
+ {
|
|
|
|
+ HandleMessageExecutionCompleted(transportMessage);
|
|
}
|
|
}
|
|
-
|
|
|
|
- protected virtual void HandleRemoteMessage(TransportMessage transportMessage, bool synchronous = false)
|
|
|
|
|
|
+ else
|
|
{
|
|
{
|
|
- var dispatch = CreateMessageDispatch(transportMessage, synchronous);
|
|
|
|
- if (dispatch == null)
|
|
|
|
- {
|
|
|
|
- _logger.LogWarning($"Received a remote message that could not be deserialized: {transportMessage.MessageTypeId.FullName} from {transportMessage.Originator.SenderId}");
|
|
|
|
- _transport.AckMessage(transportMessage);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- _messageLogger.LogReceiveMessageRemote(dispatch.Message, transportMessage);
|
|
|
|
- _messageDispatcher.Dispatch(dispatch);
|
|
|
|
|
|
+ var executeSynchronously = transportMessage.MessageTypeId.IsInfrastructure();
|
|
|
|
+ HandleRemoteMessage(transportMessage, executeSynchronously);
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- private Action<MessageDispatch, DispatchResult> GetOnRemoteMessageDispatchedContinuation(TransportMessage transportMessage, bool sendAcknowledgment)
|
|
|
|
- {
|
|
|
|
- return (dispatch, dispatchResult) =>
|
|
|
|
- {
|
|
|
|
- HandleDispatchErrors(dispatch, dispatchResult, transportMessage);
|
|
|
|
|
|
+ public MessageDispatch? CreateMessageDispatch(TransportMessage transportMessage)
|
|
|
|
+ => CreateMessageDispatch(transportMessage, synchronousDispatch: false, sendAcknowledgment: false);
|
|
|
|
|
|
- if (!sendAcknowledgment)
|
|
|
|
- return;
|
|
|
|
|
|
+ private MessageDispatch? CreateMessageDispatch(TransportMessage transportMessage, bool synchronousDispatch, bool sendAcknowledgment = true)
|
|
|
|
+ {
|
|
|
|
+ var message = ToMessage(transportMessage);
|
|
|
|
+ if (message == null)
|
|
|
|
+ return null;
|
|
|
|
|
|
- if (dispatch.Message is ICommand && !(dispatch.Message is PersistMessageCommand))
|
|
|
|
- {
|
|
|
|
- var messageExecutionCompleted = MessageExecutionCompleted.Create(dispatch.Context, dispatchResult, _serializer);
|
|
|
|
- var shouldLogMessageExecutionCompleted = _messageLogger.IsInfoEnabled(dispatch.Message);
|
|
|
|
- LogAndSendMessage(messageExecutionCompleted, new[] { dispatch.Context.GetSender() }, shouldLogMessageExecutionCompleted);
|
|
|
|
- }
|
|
|
|
|
|
+ var context = MessageContext.CreateNew(transportMessage);
|
|
|
|
+ var continuation = GetOnRemoteMessageDispatchedContinuation(transportMessage, sendAcknowledgment);
|
|
|
|
+ return new MessageDispatch(context, message, _serializer, continuation, synchronousDispatch);
|
|
|
|
+ }
|
|
|
|
|
|
- AckTransportMessage(transportMessage);
|
|
|
|
- };
|
|
|
|
|
|
+ protected virtual void HandleRemoteMessage(TransportMessage transportMessage, bool synchronous = false)
|
|
|
|
+ {
|
|
|
|
+ var dispatch = CreateMessageDispatch(transportMessage, synchronous);
|
|
|
|
+ if (dispatch == null)
|
|
|
|
+ {
|
|
|
|
+ _logger.LogWarning($"Received a remote message that could not be deserialized: {transportMessage.MessageTypeId.FullName} from {transportMessage.Originator.SenderId}");
|
|
|
|
+ _transport.AckMessage(transportMessage);
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
|
|
|
|
- private void HandleDispatchErrors(MessageDispatch dispatch, DispatchResult dispatchResult, TransportMessage? failingTransportMessage = null)
|
|
|
|
- {
|
|
|
|
- if (!_configuration.IsErrorPublicationEnabled || !IsRunning || dispatchResult.Errors.Count == 0 || dispatchResult.Errors.All(error => error is MessageProcessingException { ShouldPublishError: false }))
|
|
|
|
- return;
|
|
|
|
|
|
+ _messageLogger.LogReceiveMessageRemote(dispatch.Message, transportMessage);
|
|
|
|
+ _messageDispatcher.Dispatch(dispatch);
|
|
|
|
+ }
|
|
|
|
|
|
- var errorMessages = dispatchResult.Errors.Select(error => error.ToString());
|
|
|
|
- var errorMessage = string.Join(System.Environment.NewLine + System.Environment.NewLine, errorMessages);
|
|
|
|
|
|
+ private Action<MessageDispatch, DispatchResult> GetOnRemoteMessageDispatchedContinuation(TransportMessage transportMessage, bool sendAcknowledgment)
|
|
|
|
+ {
|
|
|
|
+ return (dispatch, dispatchResult) =>
|
|
|
|
+ {
|
|
|
|
+ HandleDispatchErrors(dispatch, dispatchResult, transportMessage);
|
|
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- failingTransportMessage ??= ToTransportMessage(dispatch.Message);
|
|
|
|
- }
|
|
|
|
- catch (Exception ex)
|
|
|
|
- {
|
|
|
|
- HandleDispatchErrorsForUnserializableMessage(dispatch.Message, ex, errorMessage);
|
|
|
|
|
|
+ if (!sendAcknowledgment)
|
|
return;
|
|
return;
|
|
- }
|
|
|
|
|
|
|
|
- string jsonMessage;
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- jsonMessage = JsonConvert.SerializeObject(dispatch.Message);
|
|
|
|
- }
|
|
|
|
- catch (Exception ex)
|
|
|
|
|
|
+ if (dispatch.Message is ICommand && !(dispatch.Message is PersistMessageCommand))
|
|
{
|
|
{
|
|
- jsonMessage = $"Unable to serialize message :{System.Environment.NewLine}{ex}";
|
|
|
|
|
|
+ var messageExecutionCompleted = MessageExecutionCompleted.Create(dispatch.Context, dispatchResult, _serializer);
|
|
|
|
+ var shouldLogMessageExecutionCompleted = _messageLogger.IsInfoEnabled(dispatch.Message);
|
|
|
|
+ LogAndSendMessage(messageExecutionCompleted, new[] { dispatch.Context.GetSender() }, shouldLogMessageExecutionCompleted);
|
|
}
|
|
}
|
|
|
|
|
|
- var messageProcessingFailed = new MessageProcessingFailed(failingTransportMessage, jsonMessage, errorMessage, SystemDateTime.UtcNow, dispatchResult.ErrorHandlerTypes.Select(x => x.FullName!).ToArray());
|
|
|
|
- Publish(messageProcessingFailed);
|
|
|
|
- }
|
|
|
|
|
|
+ AckTransportMessage(transportMessage);
|
|
|
|
+ };
|
|
|
|
+ }
|
|
|
|
|
|
- private void HandleDispatchErrorsForUnserializableMessage(IMessage message, Exception serializationException, string dispatchErrorMessage)
|
|
|
|
- {
|
|
|
|
- var messageTypeName = message.GetType().FullName;
|
|
|
|
- _logger.LogError(serializationException, $"Unable to serialize message {messageTypeName}");
|
|
|
|
|
|
+ private void HandleDispatchErrors(MessageDispatch dispatch, DispatchResult dispatchResult, TransportMessage? failingTransportMessage = null)
|
|
|
|
+ {
|
|
|
|
+ if (!_configuration.IsErrorPublicationEnabled || !IsRunning || dispatchResult.Errors.Count == 0 || dispatchResult.Errors.All(error => error is MessageProcessingException { ShouldPublishError: false }))
|
|
|
|
+ return;
|
|
|
|
|
|
- if (!_configuration.IsErrorPublicationEnabled || !IsRunning)
|
|
|
|
- return;
|
|
|
|
|
|
+ var errorMessages = dispatchResult.Errors.Select(error => error.ToString());
|
|
|
|
+ var errorMessage = string.Join(System.Environment.NewLine + System.Environment.NewLine, errorMessages);
|
|
|
|
|
|
- var errorMessage = $"Unable to handle local message\r\nMessage is not serializable\r\nMessageType: {messageTypeName}\r\nDispatch error: {dispatchErrorMessage}\r\nSerialization error: {serializationException}";
|
|
|
|
- var processingFailed = new CustomProcessingFailed(GetType().FullName!, errorMessage, SystemDateTime.UtcNow);
|
|
|
|
- Publish(processingFailed);
|
|
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ failingTransportMessage ??= ToTransportMessage(dispatch.Message);
|
|
}
|
|
}
|
|
-
|
|
|
|
- private void HandleMessageExecutionCompleted(TransportMessage transportMessage)
|
|
|
|
|
|
+ catch (Exception ex)
|
|
{
|
|
{
|
|
- var message = (MessageExecutionCompleted?)ToMessage(transportMessage);
|
|
|
|
- if (message == null)
|
|
|
|
- return;
|
|
|
|
-
|
|
|
|
- HandleMessageExecutionCompleted(transportMessage, message);
|
|
|
|
|
|
+ HandleDispatchErrorsForUnserializableMessage(dispatch.Message, ex, errorMessage);
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
|
|
|
|
- protected virtual void HandleMessageExecutionCompleted(TransportMessage transportMessage, MessageExecutionCompleted message)
|
|
|
|
|
|
+ string jsonMessage;
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ jsonMessage = JsonConvert.SerializeObject(dispatch.Message);
|
|
|
|
+ }
|
|
|
|
+ catch (Exception ex)
|
|
{
|
|
{
|
|
- _messageLogger.LogReceiveMessageAck(message);
|
|
|
|
|
|
+ jsonMessage = $"Unable to serialize message :{System.Environment.NewLine}{ex}";
|
|
|
|
+ }
|
|
|
|
|
|
- if (!_messageIdToTaskCompletionSources.TryRemove(message.SourceCommandId, out var taskCompletionSource))
|
|
|
|
- return;
|
|
|
|
|
|
+ var messageProcessingFailed = new MessageProcessingFailed(failingTransportMessage, jsonMessage, errorMessage, SystemDateTime.UtcNow, dispatchResult.ErrorHandlerTypes.Select(x => x.FullName!).ToArray());
|
|
|
|
+ Publish(messageProcessingFailed);
|
|
|
|
+ }
|
|
|
|
|
|
- var response = message.PayloadTypeId != null ? ToMessage(message.PayloadTypeId.Value, message.Payload, transportMessage) : null;
|
|
|
|
- var commandResult = new CommandResult(message.ErrorCode, message.ResponseMessage, response);
|
|
|
|
|
|
+ private void HandleDispatchErrorsForUnserializableMessage(IMessage message, Exception serializationException, string dispatchErrorMessage)
|
|
|
|
+ {
|
|
|
|
+ var messageTypeName = message.GetType().FullName;
|
|
|
|
+ _logger.LogError(serializationException, $"Unable to serialize message {messageTypeName}");
|
|
|
|
|
|
- taskCompletionSource.SetResult(commandResult);
|
|
|
|
- }
|
|
|
|
|
|
+ if (!_configuration.IsErrorPublicationEnabled || !IsRunning)
|
|
|
|
+ return;
|
|
|
|
|
|
- protected virtual void HandleLocalMessage(IMessage message, TaskCompletionSource<CommandResult>? taskCompletionSource)
|
|
|
|
- {
|
|
|
|
- _messageLogger.LogReceiveMessageLocal(message);
|
|
|
|
|
|
+ var errorMessage = $"Unable to handle local message\r\nMessage is not serializable\r\nMessageType: {messageTypeName}\r\nDispatch error: {dispatchErrorMessage}\r\nSerialization error: {serializationException}";
|
|
|
|
+ var processingFailed = new CustomProcessingFailed(GetType().FullName!, errorMessage, SystemDateTime.UtcNow);
|
|
|
|
+ Publish(processingFailed);
|
|
|
|
+ }
|
|
|
|
|
|
- var context = MessageContext.CreateOverride(PeerId, EndPoint);
|
|
|
|
- var dispatch = new MessageDispatch(context, message, _serializer, GetOnLocalMessageDispatchedContinuation(taskCompletionSource))
|
|
|
|
- {
|
|
|
|
- IsLocal = true
|
|
|
|
- };
|
|
|
|
|
|
+ private void HandleMessageExecutionCompleted(TransportMessage transportMessage)
|
|
|
|
+ {
|
|
|
|
+ var message = (MessageExecutionCompleted?)ToMessage(transportMessage);
|
|
|
|
+ if (message == null)
|
|
|
|
+ return;
|
|
|
|
|
|
- _messageDispatcher.Dispatch(dispatch);
|
|
|
|
- }
|
|
|
|
|
|
+ HandleMessageExecutionCompleted(transportMessage, message);
|
|
|
|
+ }
|
|
|
|
|
|
- private Action<MessageDispatch, DispatchResult> GetOnLocalMessageDispatchedContinuation(TaskCompletionSource<CommandResult>? taskCompletionSource)
|
|
|
|
- {
|
|
|
|
- return (dispatch, dispatchResult) =>
|
|
|
|
- {
|
|
|
|
- HandleDispatchErrors(dispatch, dispatchResult);
|
|
|
|
|
|
+ protected virtual void HandleMessageExecutionCompleted(TransportMessage transportMessage, MessageExecutionCompleted message)
|
|
|
|
+ {
|
|
|
|
+ _messageLogger.LogReceiveMessageAck(message);
|
|
|
|
|
|
- if (taskCompletionSource == null)
|
|
|
|
- return;
|
|
|
|
|
|
+ if (!_messageIdToTaskCompletionSources.TryRemove(message.SourceCommandId, out var taskCompletionSource))
|
|
|
|
+ return;
|
|
|
|
|
|
- var errorStatus = dispatchResult.Errors.Any() ? CommandResult.GetErrorStatus(dispatchResult.Errors) : dispatch.Context.GetErrorStatus();
|
|
|
|
- var commandResult = new CommandResult(errorStatus.Code, errorStatus.Message, dispatch.Context.ReplyResponse);
|
|
|
|
- taskCompletionSource.SetResult(commandResult);
|
|
|
|
- };
|
|
|
|
- }
|
|
|
|
|
|
+ var response = message.PayloadTypeId != null ? ToMessage(message.PayloadTypeId.Value, message.Payload, transportMessage) : null;
|
|
|
|
+ var commandResult = new CommandResult(message.ErrorCode, message.ResponseMessage, response);
|
|
|
|
+
|
|
|
|
+ taskCompletionSource.SetResult(commandResult);
|
|
|
|
+ }
|
|
|
|
|
|
- private void LogAndSendMessage(IMessage message, IList<Peer> peers, bool logEnabled, bool locallyHandled = false)
|
|
|
|
|
|
+ protected virtual void HandleLocalMessage(IMessage message, TaskCompletionSource<CommandResult>? taskCompletionSource)
|
|
|
|
+ {
|
|
|
|
+ _messageLogger.LogReceiveMessageLocal(message);
|
|
|
|
+
|
|
|
|
+ var context = MessageContext.CreateOverride(PeerId, EndPoint);
|
|
|
|
+ var dispatch = new MessageDispatch(context, message, _serializer, GetOnLocalMessageDispatchedContinuation(taskCompletionSource))
|
|
{
|
|
{
|
|
- if (peers.Count == 0)
|
|
|
|
- {
|
|
|
|
- if (!locallyHandled && logEnabled)
|
|
|
|
- _messageLogger.LogSendMessage(message, peers);
|
|
|
|
|
|
+ IsLocal = true
|
|
|
|
+ };
|
|
|
|
|
|
|
|
+ _messageDispatcher.Dispatch(dispatch);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private Action<MessageDispatch, DispatchResult> GetOnLocalMessageDispatchedContinuation(TaskCompletionSource<CommandResult>? taskCompletionSource)
|
|
|
|
+ {
|
|
|
|
+ return (dispatch, dispatchResult) =>
|
|
|
|
+ {
|
|
|
|
+ HandleDispatchErrors(dispatch, dispatchResult);
|
|
|
|
+
|
|
|
|
+ if (taskCompletionSource == null)
|
|
return;
|
|
return;
|
|
- }
|
|
|
|
|
|
|
|
- var transportMessage = ToTransportMessage(message);
|
|
|
|
|
|
+ var errorStatus = dispatchResult.Errors.Any() ? CommandResult.GetErrorStatus(dispatchResult.Errors) : dispatch.Context.GetErrorStatus();
|
|
|
|
+ var commandResult = new CommandResult(errorStatus.Code, errorStatus.Message, dispatch.Context.ReplyResponse);
|
|
|
|
+ taskCompletionSource.SetResult(commandResult);
|
|
|
|
+ };
|
|
|
|
+ }
|
|
|
|
|
|
- if (logEnabled)
|
|
|
|
- _messageLogger.LogSendMessage(message, peers, transportMessage);
|
|
|
|
|
|
+ private void LogAndSendMessage(IMessage message, IList<Peer> peers, bool logEnabled, bool locallyHandled = false)
|
|
|
|
+ {
|
|
|
|
+ if (peers.Count == 0)
|
|
|
|
+ {
|
|
|
|
+ if (!locallyHandled && logEnabled)
|
|
|
|
+ _messageLogger.LogSendMessage(message, peers);
|
|
|
|
|
|
- SendTransportMessage(transportMessage, peers);
|
|
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
|
|
|
|
- protected void SendTransportMessage(TransportMessage transportMessage, IList<Peer> peers)
|
|
|
|
- => _transport.Send(transportMessage, peers, new SendContext());
|
|
|
|
|
|
+ var transportMessage = ToTransportMessage(message);
|
|
|
|
|
|
- protected void AckTransportMessage(TransportMessage transportMessage)
|
|
|
|
- => _transport.AckMessage(transportMessage);
|
|
|
|
|
|
+ if (logEnabled)
|
|
|
|
+ _messageLogger.LogSendMessage(message, peers, transportMessage);
|
|
|
|
|
|
- protected TransportMessage ToTransportMessage(IMessage message)
|
|
|
|
- => _serializer.ToTransportMessage(message, PeerId, EndPoint);
|
|
|
|
|
|
+ SendTransportMessage(transportMessage, peers);
|
|
|
|
+ }
|
|
|
|
|
|
- private IMessage? ToMessage(TransportMessage transportMessage)
|
|
|
|
- => ToMessage(transportMessage.MessageTypeId, transportMessage.Content, transportMessage);
|
|
|
|
|
|
+ protected void SendTransportMessage(TransportMessage transportMessage, IList<Peer> peers)
|
|
|
|
+ => _transport.Send(transportMessage, peers, new SendContext());
|
|
|
|
|
|
- private IMessage? ToMessage(MessageTypeId messageTypeId, ReadOnlyMemory<byte> messageContent, TransportMessage transportMessage)
|
|
|
|
- {
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- return _serializer.ToMessage(transportMessage, messageTypeId, messageContent);
|
|
|
|
- }
|
|
|
|
- catch (Exception exception)
|
|
|
|
- {
|
|
|
|
- HandleDeserializationError(messageTypeId, messageContent, transportMessage.Originator, exception, transportMessage);
|
|
|
|
- }
|
|
|
|
|
|
+ protected void AckTransportMessage(TransportMessage transportMessage)
|
|
|
|
+ => _transport.AckMessage(transportMessage);
|
|
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
|
|
+ protected TransportMessage ToTransportMessage(IMessage message)
|
|
|
|
+ => _serializer.ToTransportMessage(message, PeerId, EndPoint);
|
|
|
|
+
|
|
|
|
+ private IMessage? ToMessage(TransportMessage transportMessage)
|
|
|
|
+ => ToMessage(transportMessage.MessageTypeId, transportMessage.Content, transportMessage);
|
|
|
|
|
|
- private void HandleDeserializationError(MessageTypeId messageTypeId, ReadOnlyMemory<byte> messageContent, OriginatorInfo originator, Exception exception, TransportMessage transportMessage)
|
|
|
|
|
|
+ private IMessage? ToMessage(MessageTypeId messageTypeId, ReadOnlyMemory<byte> messageContent, TransportMessage transportMessage)
|
|
|
|
+ {
|
|
|
|
+ try
|
|
{
|
|
{
|
|
- var dumpLocation = DumpMessageOnDisk(messageTypeId, messageContent);
|
|
|
|
- var errorMessage = $"Unable to deserialize message {messageTypeId.FullName}. MessageId: {transportMessage.Id}. Originator: {originator.SenderId}. Message dumped at: {dumpLocation}\r\n{exception}";
|
|
|
|
- _logger.LogError(errorMessage);
|
|
|
|
|
|
+ return _serializer.ToMessage(transportMessage, messageTypeId, messageContent);
|
|
|
|
+ }
|
|
|
|
+ catch (Exception exception)
|
|
|
|
+ {
|
|
|
|
+ HandleDeserializationError(messageTypeId, messageContent, transportMessage.Originator, exception, transportMessage);
|
|
|
|
+ }
|
|
|
|
|
|
- if (!_configuration.IsErrorPublicationEnabled || !IsRunning)
|
|
|
|
- return;
|
|
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
|
|
- var processingFailed = new MessageProcessingFailed(transportMessage, string.Empty, errorMessage, SystemDateTime.UtcNow, null);
|
|
|
|
- Publish(processingFailed);
|
|
|
|
- }
|
|
|
|
|
|
+ private void HandleDeserializationError(MessageTypeId messageTypeId, ReadOnlyMemory<byte> messageContent, OriginatorInfo originator, Exception exception, TransportMessage transportMessage)
|
|
|
|
+ {
|
|
|
|
+ var dumpLocation = DumpMessageOnDisk(messageTypeId, messageContent);
|
|
|
|
+ var errorMessage = $"Unable to deserialize message {messageTypeId.FullName}. MessageId: {transportMessage.Id}. Originator: {originator.SenderId}. Message dumped at: {dumpLocation}\r\n{exception}";
|
|
|
|
+ _logger.LogError(errorMessage);
|
|
|
|
|
|
- private void MessageDispatcherOnMessageHandlerInvokersUpdated()
|
|
|
|
- {
|
|
|
|
- var snapshotGeneratingMessageTypes = _messageDispatcher.GetMessageHandlerInvokers()
|
|
|
|
- .Select(x => x.MessageHandlerType.GetBaseTypes().SingleOrDefault(y => y.IsGenericType && y.GetGenericTypeDefinition() == typeof(SubscriptionHandler<>))?.GenericTypeArguments[0])
|
|
|
|
- .Where(x => x != null);
|
|
|
|
|
|
+ if (!_configuration.IsErrorPublicationEnabled || !IsRunning)
|
|
|
|
+ return;
|
|
|
|
|
|
- _directory.EnableSubscriptionsUpdatedFor(snapshotGeneratingMessageTypes!);
|
|
|
|
- }
|
|
|
|
|
|
+ var processingFailed = new MessageProcessingFailed(transportMessage, string.Empty, errorMessage, SystemDateTime.UtcNow, null);
|
|
|
|
+ Publish(processingFailed);
|
|
|
|
+ }
|
|
|
|
|
|
- private void DispatchSubscriptionsUpdatedMessages(PeerId peerId, IReadOnlyList<Subscription> subscriptions)
|
|
|
|
- {
|
|
|
|
- if (peerId == PeerId)
|
|
|
|
- return;
|
|
|
|
|
|
+ private void MessageDispatcherOnMessageHandlerInvokersUpdated()
|
|
|
|
+ {
|
|
|
|
+ var snapshotGeneratingMessageTypes = _messageDispatcher.GetMessageHandlerInvokers()
|
|
|
|
+ .Select(x => x.MessageHandlerType.GetBaseTypes().SingleOrDefault(y => y.IsGenericType && y.GetGenericTypeDefinition() == typeof(SubscriptionHandler<>))?.GenericTypeArguments[0])
|
|
|
|
+ .Where(x => x != null);
|
|
|
|
|
|
- var messageContext = GetMessageContextForSubscriptionsUpdated();
|
|
|
|
- foreach (var subscription in subscriptions)
|
|
|
|
- {
|
|
|
|
- var subscriptionsUpdated = new SubscriptionsUpdated(subscription, peerId);
|
|
|
|
- var dispatch = new MessageDispatch(messageContext, subscriptionsUpdated, _serializer, GetOnLocalMessageDispatchedContinuation(null));
|
|
|
|
- _messageDispatcher.Dispatch(dispatch);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ _directory.EnableSubscriptionsUpdatedFor(snapshotGeneratingMessageTypes!);
|
|
|
|
+ }
|
|
|
|
|
|
- private MessageContext GetMessageContextForSubscriptionsUpdated()
|
|
|
|
- => MessageContext.Current ?? MessageContext.CreateOverride(PeerId, EndPoint);
|
|
|
|
|
|
+ private void DispatchSubscriptionsUpdatedMessages(PeerId peerId, IReadOnlyList<Subscription> subscriptions)
|
|
|
|
+ {
|
|
|
|
+ if (peerId == PeerId)
|
|
|
|
+ return;
|
|
|
|
|
|
- private string DumpMessageOnDisk(MessageTypeId messageTypeId, ReadOnlyMemory<byte> messageContent)
|
|
|
|
|
|
+ var messageContext = GetMessageContextForSubscriptionsUpdated();
|
|
|
|
+ foreach (var subscription in subscriptions)
|
|
{
|
|
{
|
|
- if (string.IsNullOrEmpty(DeserializationFailureDumpDirectoryPath))
|
|
|
|
- return "Message could not be dumped";
|
|
|
|
|
|
+ var subscriptionsUpdated = new SubscriptionsUpdated(subscription, peerId);
|
|
|
|
+ var dispatch = new MessageDispatch(messageContext, subscriptionsUpdated, _serializer, GetOnLocalMessageDispatchedContinuation(null));
|
|
|
|
+ _messageDispatcher.Dispatch(dispatch);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- if (!System.IO.Directory.Exists(DeserializationFailureDumpDirectoryPath))
|
|
|
|
- System.IO.Directory.CreateDirectory(DeserializationFailureDumpDirectoryPath);
|
|
|
|
|
|
+ private MessageContext GetMessageContextForSubscriptionsUpdated()
|
|
|
|
+ => MessageContext.Current ?? MessageContext.CreateOverride(PeerId, EndPoint);
|
|
|
|
|
|
- var dumpFileName = $"{_deserializationFailureTimestampProvider.NextUtcTimestamp():yyyyMMdd-HH-mm-ss.fffffff}_{messageTypeId.FullName}";
|
|
|
|
- var dumpFilePath = Path.Combine(DeserializationFailureDumpDirectoryPath, dumpFileName);
|
|
|
|
- using (var fileStream = new FileStream(dumpFilePath, FileMode.Create))
|
|
|
|
- {
|
|
|
|
- var messageBytes = messageContent.ToArray();
|
|
|
|
- fileStream.Write(messageBytes, 0, messageBytes.Length);
|
|
|
|
- }
|
|
|
|
|
|
+ private string DumpMessageOnDisk(MessageTypeId messageTypeId, ReadOnlyMemory<byte> messageContent)
|
|
|
|
+ {
|
|
|
|
+ if (string.IsNullOrEmpty(DeserializationFailureDumpDirectoryPath))
|
|
|
|
+ return "Message could not be dumped";
|
|
|
|
|
|
- return dumpFilePath;
|
|
|
|
- }
|
|
|
|
- catch (Exception ex)
|
|
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ if (!System.IO.Directory.Exists(DeserializationFailureDumpDirectoryPath))
|
|
|
|
+ System.IO.Directory.CreateDirectory(DeserializationFailureDumpDirectoryPath);
|
|
|
|
+
|
|
|
|
+ var dumpFileName = $"{_deserializationFailureTimestampProvider.NextUtcTimestamp():yyyyMMdd-HH-mm-ss.fffffff}_{messageTypeId.FullName}";
|
|
|
|
+ var dumpFilePath = Path.Combine(DeserializationFailureDumpDirectoryPath, dumpFileName);
|
|
|
|
+ using (var fileStream = new FileStream(dumpFilePath, FileMode.Create))
|
|
{
|
|
{
|
|
- return "Message could not be dumped: " + ex;
|
|
|
|
|
|
+ var messageBytes = messageContent.ToArray();
|
|
|
|
+ fileStream.Write(messageBytes, 0, messageBytes.Length);
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- public void Dispose()
|
|
|
|
- {
|
|
|
|
- if (Status == BusStatus.Started)
|
|
|
|
- Stop();
|
|
|
|
|
|
+ return dumpFilePath;
|
|
}
|
|
}
|
|
-
|
|
|
|
- private enum BusStatus
|
|
|
|
|
|
+ catch (Exception ex)
|
|
{
|
|
{
|
|
- Stopped,
|
|
|
|
- Stopping,
|
|
|
|
- Starting,
|
|
|
|
- Started
|
|
|
|
|
|
+ return "Message could not be dumped: " + ex;
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- private class SubscriptionStatus
|
|
|
|
- {
|
|
|
|
- public int CurrentSubscriptionCount;
|
|
|
|
- public int StartupSubscriptionCount;
|
|
|
|
|
|
+ public void Dispose()
|
|
|
|
+ {
|
|
|
|
+ if (Status == BusStatus.Started)
|
|
|
|
+ Stop();
|
|
|
|
+ }
|
|
|
|
|
|
- public bool IsEmpty => CurrentSubscriptionCount <= 0 && StartupSubscriptionCount <= 0;
|
|
|
|
- }
|
|
|
|
|
|
+ private enum BusStatus
|
|
|
|
+ {
|
|
|
|
+ Stopped,
|
|
|
|
+ Stopping,
|
|
|
|
+ Starting,
|
|
|
|
+ Started
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private class SubscriptionStatus
|
|
|
|
+ {
|
|
|
|
+ public int CurrentSubscriptionCount;
|
|
|
|
+ public int StartupSubscriptionCount;
|
|
|
|
+
|
|
|
|
+ public bool IsEmpty => CurrentSubscriptionCount <= 0 && StartupSubscriptionCount <= 0;
|
|
}
|
|
}
|
|
}
|
|
}
|