MessageHandledHandler.cs 1.5 KB

1234567891011121314151617181920212223242526272829303132333435
  1. using System.Linq;
  2. using Abc.Zebus.Persistence.Matching;
  3. using log4net;
  4. namespace Abc.Zebus.Persistence.Handlers
  5. {
  6. public class MessageHandledHandler : IMessageHandler<MessageHandled>, IMessageContextAware
  7. {
  8. private static readonly ILog _log = LogManager.GetLogger(typeof(MessageHandledHandler));
  9. private readonly IMessageReplayerRepository _messageReplayerRepository;
  10. private readonly IInMemoryMessageMatcher _inMemoryMessageMatcher;
  11. private readonly IPersistenceConfiguration _configuration;
  12. public MessageHandledHandler(IMessageReplayerRepository messageReplayerRepository, IInMemoryMessageMatcher inMemoryMessageMatcher, IPersistenceConfiguration configuration)
  13. {
  14. _messageReplayerRepository = messageReplayerRepository;
  15. _inMemoryMessageMatcher = inMemoryMessageMatcher;
  16. _configuration = configuration;
  17. }
  18. public MessageContext? Context { get; set; }
  19. public void Handle(MessageHandled message)
  20. {
  21. if (_configuration.PeerIdsToInvestigate != null && _configuration.PeerIdsToInvestigate.Contains(Context!.SenderId.ToString()))
  22. _log.Info($"Ack received from peer {Context.SenderId}. MessageId: {message.MessageId}");
  23. _inMemoryMessageMatcher.EnqueueAck(Context!.SenderId, message.MessageId);
  24. var activeMessageReplayer = _messageReplayerRepository.GetActiveMessageReplayer(Context.SenderId);
  25. activeMessageReplayer?.Handle(message);
  26. }
  27. }
  28. }