MessageDispatch.cs 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using Abc.Zebus.Serialization;
  5. namespace Abc.Zebus.Dispatch;
  6. public class MessageDispatch
  7. {
  8. private static readonly object _exceptionsLock = new();
  9. private readonly IMessageSerializer _messageSerializer;
  10. private readonly Action<MessageDispatch, DispatchResult> _continuation;
  11. private Dictionary<Type, Exception>? _exceptions;
  12. private int _remainingHandlerCount;
  13. private bool _isCloned;
  14. public MessageDispatch(MessageContext context, IMessage message, IMessageSerializer messageSerializer, Action<MessageDispatch, DispatchResult> continuation, bool shouldRunSynchronously = false)
  15. {
  16. _messageSerializer = messageSerializer;
  17. _continuation = continuation;
  18. ShouldRunSynchronously = shouldRunSynchronously;
  19. Context = context;
  20. Message = message;
  21. }
  22. public bool IsLocal { get; set; }
  23. public bool ShouldRunSynchronously { get; }
  24. public MessageContext Context { get; }
  25. public IMessage Message { get; private set; }
  26. public void SetIgnored()
  27. {
  28. _continuation(this, new DispatchResult(null));
  29. }
  30. public void SetHandled(IMessageHandlerInvoker invoker, Exception? error)
  31. {
  32. if (error != null)
  33. AddException(invoker.MessageHandlerType, error);
  34. if (Interlocked.Decrement(ref _remainingHandlerCount) == 0)
  35. _continuation(this, new DispatchResult(_exceptions));
  36. }
  37. private void AddException(Type messageHandlerType, Exception error)
  38. {
  39. lock (_exceptionsLock)
  40. {
  41. if (_exceptions == null)
  42. _exceptions = new Dictionary<Type, Exception>();
  43. _exceptions[messageHandlerType] = error;
  44. }
  45. }
  46. public void SetHandlerCount(int handlerCount)
  47. {
  48. _remainingHandlerCount = handlerCount;
  49. }
  50. internal void BeforeEnqueue()
  51. {
  52. if (!IsLocal || _isCloned)
  53. return;
  54. if (_messageSerializer.TryClone(Message, out var clone))
  55. Message = clone;
  56. _isCloned = true;
  57. }
  58. }