|
@@ -13,14 +13,14 @@ namespace Abc.Zebus.Dispatch
|
|
{
|
|
{
|
|
public class MessageDispatcher : IMessageDispatcher, IProvideQueueLength
|
|
public class MessageDispatcher : IMessageDispatcher, IProvideQueueLength
|
|
{
|
|
{
|
|
- private static readonly List<IMessageHandlerInvoker> _emptyInvokers = new List<IMessageHandlerInvoker>();
|
|
|
|
|
|
+ private static readonly List<IMessageHandlerInvoker> _emptyInvokers = new();
|
|
private static readonly ILogger _logger = ZebusLogManager.GetLogger(typeof(MessageDispatcher));
|
|
private static readonly ILogger _logger = ZebusLogManager.GetLogger(typeof(MessageDispatcher));
|
|
|
|
|
|
- private readonly ConcurrentDictionary<string, DispatchQueue> _dispatchQueues = new ConcurrentDictionary<string, DispatchQueue>(StringComparer.OrdinalIgnoreCase);
|
|
|
|
- private readonly object _lock = new object();
|
|
|
|
|
|
+ private readonly ConcurrentDictionary<string, DispatchQueue> _dispatchQueues = new(StringComparer.OrdinalIgnoreCase);
|
|
|
|
+ private readonly object _lock = new();
|
|
private readonly IMessageHandlerInvokerLoader[] _invokerLoaders;
|
|
private readonly IMessageHandlerInvokerLoader[] _invokerLoaders;
|
|
private readonly IDispatchQueueFactory _dispatchQueueFactory;
|
|
private readonly IDispatchQueueFactory _dispatchQueueFactory;
|
|
- private ConcurrentDictionary<MessageTypeId, List<IMessageHandlerInvoker>> _invokers = new ConcurrentDictionary<MessageTypeId, List<IMessageHandlerInvoker>>();
|
|
|
|
|
|
+ private ConcurrentDictionary<MessageTypeId, List<IMessageHandlerInvoker>> _invokers = new();
|
|
private Func<Assembly, bool>? _assemblyFilter;
|
|
private Func<Assembly, bool>? _assemblyFilter;
|
|
private Func<Type, bool>? _handlerFilter;
|
|
private Func<Type, bool>? _handlerFilter;
|
|
private Func<Type, bool>? _messageFilter;
|
|
private Func<Type, bool>? _messageFilter;
|
|
@@ -35,20 +35,11 @@ namespace Abc.Zebus.Dispatch
|
|
public event Action? Starting;
|
|
public event Action? Starting;
|
|
public event Action? Stopping;
|
|
public event Action? Stopping;
|
|
|
|
|
|
- public void ConfigureAssemblyFilter(Func<Assembly, bool> assemblyFilter)
|
|
|
|
- {
|
|
|
|
- _assemblyFilter = assemblyFilter;
|
|
|
|
- }
|
|
|
|
|
|
+ public void ConfigureAssemblyFilter(Func<Assembly, bool> assemblyFilter) => _assemblyFilter = assemblyFilter;
|
|
|
|
|
|
- public void ConfigureHandlerFilter(Func<Type, bool> handlerFilter)
|
|
|
|
- {
|
|
|
|
- _handlerFilter = handlerFilter;
|
|
|
|
- }
|
|
|
|
|
|
+ public void ConfigureHandlerFilter(Func<Type, bool> handlerFilter) => _handlerFilter = handlerFilter;
|
|
|
|
|
|
- public void ConfigureMessageFilter(Func<Type, bool> messageFilter)
|
|
|
|
- {
|
|
|
|
- _messageFilter = messageFilter;
|
|
|
|
- }
|
|
|
|
|
|
+ public void ConfigureMessageFilter(Func<Type, bool> messageFilter) => _messageFilter = messageFilter;
|
|
|
|
|
|
public event Action? MessageHandlerInvokersUpdated;
|
|
public event Action? MessageHandlerInvokersUpdated;
|
|
|
|
|
|
@@ -96,15 +87,9 @@ namespace Abc.Zebus.Dispatch
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public IEnumerable<MessageTypeId> GetHandledMessageTypes()
|
|
|
|
- {
|
|
|
|
- return _invokers.Keys;
|
|
|
|
- }
|
|
|
|
|
|
+ public IEnumerable<MessageTypeId> GetHandledMessageTypes() => _invokers.Keys;
|
|
|
|
|
|
- public IEnumerable<IMessageHandlerInvoker> GetMessageHandlerInvokers()
|
|
|
|
- {
|
|
|
|
- return _invokers.SelectMany(x => x.Value);
|
|
|
|
- }
|
|
|
|
|
|
+ public IEnumerable<IMessageHandlerInvoker> GetMessageHandlerInvokers() => _invokers.SelectMany(x => x.Value);
|
|
|
|
|
|
public void Dispatch(MessageDispatch dispatch)
|
|
public void Dispatch(MessageDispatch dispatch)
|
|
{
|
|
{
|
|
@@ -261,15 +246,9 @@ namespace Abc.Zebus.Dispatch
|
|
MessageHandlerInvokersUpdated?.Invoke();
|
|
MessageHandlerInvokersUpdated?.Invoke();
|
|
}
|
|
}
|
|
|
|
|
|
- public int Purge()
|
|
|
|
- {
|
|
|
|
- return _dispatchQueues.Values.Sum(x => x.Purge());
|
|
|
|
- }
|
|
|
|
|
|
+ public int Purge() => _dispatchQueues.Values.Sum(x => x.Purge());
|
|
|
|
|
|
- public int GetReceiveQueueLength()
|
|
|
|
- {
|
|
|
|
- return _dispatchQueues.Values.Sum(x => x.QueueLength);
|
|
|
|
- }
|
|
|
|
|
|
+ public int GetReceiveQueueLength() => _dispatchQueues.Values.Sum(x => x.QueueLength);
|
|
|
|
|
|
private TypeSource CreateTypeSource()
|
|
private TypeSource CreateTypeSource()
|
|
{
|
|
{
|