MessageReplayer.cs 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Diagnostics;
  4. using System.Linq;
  5. using System.Threading;
  6. using Abc.Zebus.Persistence.Matching;
  7. using Abc.Zebus.Persistence.Messages;
  8. using Abc.Zebus.Persistence.Reporter;
  9. using Abc.Zebus.Persistence.Storage;
  10. using Abc.Zebus.Persistence.Util;
  11. using Abc.Zebus.Serialization;
  12. using Abc.Zebus.Transport;
  13. using Abc.Zebus.Util;
  14. using Microsoft.Extensions.Logging;
  namespace Abc.Zebus.Persistence
  {
      /// <summary>
      /// Replays the unacknowledged persisted messages of a single peer to that peer, then forwards
      /// live messages to it for the configured safety-phase duration.
      /// The work is performed on a dedicated background thread started by <see cref="Start"/>.
      /// </summary>
      public class MessageReplayer : IMessageReplayer
      {
          private static readonly ILogger _logger = ZebusLogManager.GetLogger(typeof(MessageReplayer));

          // Messages handed over through AddLiveMessage; drained by ForwardLiveMessages during the safety phase.
          private readonly BlockingCollection<TransportMessage> _liveMessages = new BlockingCollection<TransportMessage>();

          // Ids of replayed messages that were sent and not yet removed through OnMessageAcked.
          private readonly ConcurrentSet<MessageId> _unackedIds = new ConcurrentSet<MessageId>();

          private readonly IPersistenceConfiguration _persistenceConfiguration;
          private readonly IStorage _storage;
          private readonly IBus _bus;
          private readonly ITransport _transport;
          private readonly IInMemoryMessageMatcher _inMemoryMessageMatcher;

          // Originator set on every transport message built by ToTransportMessage (the local transport's peer).
          private readonly Peer _self;

          // Target peer of the replay.
          private readonly Peer _peer;

          private readonly Guid _replayId;
          private readonly IReporter _reporter;
          private readonly IMessageSerializer _messageSerializer;
          private CancellationTokenSource? _cancellationTokenSource;
          private Thread? _runThread;

          // Started in RunProc only; every duration returned by MeasureDuration is read from it.
          private readonly Stopwatch _stopwatch = new Stopwatch();

          private readonly int _replayBatchSize;
          private readonly SendContext _emptySendContext = new SendContext();

          /// <summary>
          /// Creates a replayer targeting <paramref name="peer"/> for the replay session identified by
          /// <paramref name="replayId"/>.
          /// </summary>
          /// <param name="persistenceConfiguration">Source of the replay batch size, the unacked-message threshold and the safety phase duration.</param>
          /// <param name="storage">Storage from which the peer's unacked messages are read.</param>
          /// <param name="bus">Bus used to publish <c>ReplaySessionStarted</c> and <c>ReplaySessionEnded</c>.</param>
          /// <param name="transport">Transport used to send messages to the peer; its peer id and inbound endpoint identify the sender.</param>
          /// <param name="inMemoryMessageMatcher">Matcher on which a wait handle is enqueued by <see cref="Start"/>; the replay does not begin until that handle is signalled.</param>
          /// <param name="peer">Peer that receives the replayed messages.</param>
          /// <param name="replayId">Identifier attached to every message of this replay session.</param>
          /// <param name="reporter">Receives one replay speed report per replayed batch.</param>
          /// <param name="messageSerializer">Serializer used to build the transport messages sent to the peer.</param>
          public MessageReplayer(IPersistenceConfiguration persistenceConfiguration,
                                 IStorage storage,
                                 IBus bus,
                                 ITransport transport,
                                 IInMemoryMessageMatcher inMemoryMessageMatcher,
                                 Peer peer,
                                 Guid replayId,
                                 IReporter reporter,
                                 IMessageSerializer messageSerializer)
          {
              _persistenceConfiguration = persistenceConfiguration;
              _storage = storage;
              _bus = bus;
              _transport = transport;
              _inMemoryMessageMatcher = inMemoryMessageMatcher;
              _self = new Peer(transport.PeerId, transport.InboundEndPoint);
              _peer = peer;
              _replayId = replayId;
              _reporter = reporter;
              _messageSerializer = messageSerializer;
              _replayBatchSize = _persistenceConfiguration.ReplayBatchSize;

              UnackedMessageCountThatReleasesNextBatch = _persistenceConfiguration.ReplayUnackedMessageCountThatReleasesNextBatch;
          }

          /// <summary>
          /// Threshold used between batches: the next batch is released once the number of unacked
          /// replayed messages is less than or equal to this value. Initialized from the configuration.
          /// </summary>
          public int UnackedMessageCountThatReleasesNextBatch { get; set; }

          /// <summary>
          /// Raised on the replay thread when it finishes, whether the replay completed, was cancelled or failed.
          /// </summary>
          public event Action? Stopped;

          /// <summary>
          /// Queues a live message so that it can be forwarded to the peer during the safety phase.
          /// </summary>
          /// <param name="message">The live transport message to forward.</param>
          public void AddLiveMessage(TransportMessage message)
          {
              _liveMessages.Add(message);
          }

          /// <summary>
          /// Enqueues a wait handle on the in-memory message matcher, creates the cancellation source
          /// and starts the background thread, which blocks on that handle before replaying.
          /// </summary>
          public void Start()
          {
              var waitHandle = new ManualResetEvent(false);
              _inMemoryMessageMatcher.EnqueueWaitHandle(waitHandle);

              _cancellationTokenSource = new CancellationTokenSource();
              _runThread = BackgroundThread.Start(RunProc, waitHandle);
          }

          /// <summary>
          /// Requests cancellation and waits up to five seconds for the replay thread to end.
          /// </summary>
          /// <returns><c>true</c> if the thread ended (or was never started) within the timeout; otherwise <c>false</c>.</returns>
          public bool Cancel()
          {
              _cancellationTokenSource?.Cancel();

              if (WaitForCompletion(5.Seconds()))
                  return true;

              _logger.LogWarning($"Unable to cancel replayer, PeerId: {_peer.Id}");
              return false;
          }

          /// <summary>
          /// Removes <paramref name="messageId"/> from the set of replayed messages awaiting an ack.
          /// </summary>
          /// <param name="messageId">Id of the acked message.</param>
          public void OnMessageAcked(MessageId messageId)
          {
              _unackedIds.Remove(messageId);
          }

          /// <summary>
          /// Waits for the replay thread to end.
          /// </summary>
          /// <param name="timeout">Maximum time to wait.</param>
          /// <returns><c>true</c> if the thread ended within <paramref name="timeout"/> or if no thread was started; otherwise <c>false</c>.</returns>
          public bool WaitForCompletion(TimeSpan timeout)
          {
              return _runThread?.Join(timeout) ?? true;
          }

          /// <summary>
          /// Entry point of the replay thread: waits for <paramref name="signal"/>, runs the replay,
          /// logs the outcome and raises <see cref="Stopped"/>.
          /// </summary>
          /// <param name="signal">Handle enqueued on the in-memory message matcher by <see cref="Start"/>; it is disposed here once signalled.</param>
          private void RunProc(ManualResetEvent signal)
          {
              _logger.LogInformation($"Replay started, PeerId: {_peer.Id}");

              signal.WaitOne();
              signal.Dispose();

              _logger.LogInformation($"BatchPersister flushed, PeerId: {_peer.Id}");

              _stopwatch.Start();
              try
              {
                  Run(_cancellationTokenSource!.Token);

                  // Interpolated like every other log call of this class; without the '$' prefix the
                  // "{_peer.Id}" text was logged literally and the peer was never identified.
                  if (_cancellationTokenSource!.IsCancellationRequested)
                      _logger.LogWarning($"Replay cancelled, PeerId: {_peer.Id}");
              }
              catch (Exception ex)
              {
                  // Failures are logged, not rethrown, so that Stopped is still raised below.
                  _logger.LogError(ex, $"Replay failed, PeerId: {_peer.Id}");
              }

              _stopwatch.Stop();
              _logger.LogInformation($"Replay stopped, PeerId: {_peer.Id}. It ran for {_stopwatch.Elapsed}");

              Stopped?.Invoke();
          }

          /// <summary>
          /// Runs the replay session: publishes <c>ReplaySessionStarted</c>, replays the unacked messages,
          /// sends <c>ReplayPhaseEnded</c>, forwards live messages for the safety phase, then sends
          /// <c>SafetyPhaseEnded</c> and publishes <c>ReplaySessionEnded</c>.
          /// When cancellation is observed after a phase, the method returns without sending that phase's
          /// end message or any later one.
          /// </summary>
          /// <param name="cancellationToken">Token checked during and after each phase.</param>
          public void Run(CancellationToken cancellationToken)
          {
              _bus.Publish(new ReplaySessionStarted(_peer.Id, _replayId));

              var replayDuration = MeasureDuration();
              var totalReplayedCount = ReplayUnackedMessages(cancellationToken);
              _logger.LogInformation($"Replay phase ended for {_peer.Id}. {totalReplayedCount} messages replayed in {replayDuration.Value} ({totalReplayedCount / replayDuration.Value.TotalSeconds} msg/s)");

              if (cancellationToken.IsCancellationRequested)
                  return;

              _transport.Send(ToTransportMessage(new ReplayPhaseEnded(_replayId)), new[] { _peer }, _emptySendContext);

              var safetyDuration = MeasureDuration();
              ForwardLiveMessages(cancellationToken);
              _logger.LogInformation($"Safety phase ended for {_peer.Id} ({safetyDuration.Value})");

              if (cancellationToken.IsCancellationRequested)
                  return;

              _transport.Send(ToTransportMessage(new SafetyPhaseEnded(_replayId)), new[] { _peer }, _emptySendContext);
              _bus.Publish(new ReplaySessionEnded(_peer.Id, _replayId));
          }

          /// <summary>
          /// Reads the peer's unacked messages from storage and sends them in batches of the configured
          /// size, waiting between batches until enough of them have been acked.
          /// </summary>
          /// <param name="cancellationToken">Token that stops the enumeration of stored messages and the wait for acks.</param>
          /// <returns>The number of messages sent; 0 when the storage returns no reader for the peer.</returns>
          private int ReplayUnackedMessages(CancellationToken cancellationToken)
          {
              using var reader = _storage.CreateMessageReader(_peer.Id);
              if (reader == null)
                  return 0;

              var totalMessageCount = 0;

              foreach (var partition in reader.GetUnackedMessages().TakeWhile(m => !cancellationToken.IsCancellationRequested).Partition(_replayBatchSize, true))
              {
                  var messageSentCount = 0;

                  // Both measurements start here: the first is read right after sending,
                  // the second only once the batch has been acked.
                  var batchDuration = MeasureDuration();
                  var readAndSendDuration = MeasureDuration();

                  foreach (var message in partition.Select(DeserializeTransportMessage))
                  {
                      // Registered before sending so that the id is already tracked when its ack is processed.
                      _unackedIds.Add(message.Id);
                      ReplayMessage(message);
                      messageSentCount++;
                  }

                  totalMessageCount += messageSentCount;

                  _logger.LogInformation($"Read and send for last batch of {messageSentCount} msgs for {_peer.Id} took {readAndSendDuration.Value}. ({messageSentCount / readAndSendDuration.Value.TotalSeconds} msg/s)");
                  WaitForAcks(cancellationToken);
                  _logger.LogInformation($"Last batch for {_peer.Id} took {batchDuration.Value} to be totally replayed ({messageSentCount / batchDuration.Value.TotalSeconds} msg/s)");
                  _reporter.AddReplaySpeedReport(new ReplaySpeedReport(messageSentCount, readAndSendDuration.Value, batchDuration.Value));
              }

              _logger.LogInformation($"Replay finished for peer {_peer.Id}. Disposing the reader");
              return totalMessageCount;
          }

          /// <summary>Deserializes a stored row into a <see cref="TransportMessage"/>.</summary>
          private static TransportMessage DeserializeTransportMessage(byte[] row) => TransportMessage.Deserialize(row);

          /// <summary>
          /// Blocks, polling every 100 ms, until the number of unacked replayed messages is less than or
          /// equal to <see cref="UnackedMessageCountThatReleasesNextBatch"/>, or until cancellation is requested.
          /// </summary>
          /// <param name="cancellationToken">Token checked on every polling iteration.</param>
          private void WaitForAcks(CancellationToken cancellationToken)
          {
              if (_unackedIds.Count <= UnackedMessageCountThatReleasesNextBatch)
                  return;

              // The count is read a second time and may have changed in between, hence the clamp to 0.
              var expectedAckCount = Math.Max(0, _unackedIds.Count - UnackedMessageCountThatReleasesNextBatch);
              _logger.LogInformation($"Waiting for {expectedAckCount} ack(s) before proceeding to next batch for {_peer.Id}");

              var waitDuration = MeasureDuration();
              while (_unackedIds.Count > UnackedMessageCountThatReleasesNextBatch)
              {
                  if (cancellationToken.IsCancellationRequested)
                      return;

                  Thread.Sleep(100);
              }

              _logger.LogInformation($"Batch acked in {waitDuration.Value} for peer {_peer.Id} ({expectedAckCount / waitDuration.Value.TotalSeconds} msg/s)");
              _logger.LogInformation($"Proceeding with next batch for {_peer.Id}");
          }

          /// <summary>
          /// Wraps <paramref name="unackedMessage"/> in a <c>MessageReplayed</c> message and sends it to the peer.
          /// </summary>
          /// <param name="unackedMessage">The stored message to replay.</param>
          private void ReplayMessage(TransportMessage unackedMessage)
          {
              var messageReplayed = new MessageReplayed(_replayId, unackedMessage);
              var transportMessage = ToTransportMessage(messageReplayed);

              _transport.Send(transportMessage, new[] { _peer }, _emptySendContext);
          }

          /// <summary>
          /// Forwards queued live messages to the peer, each wrapped in a <c>MessageReplayed</c> message,
          /// until the configured safety phase duration has elapsed or cancellation is requested.
          /// </summary>
          /// <param name="cancellationToken">Token checked before each attempt to take a live message.</param>
          private void ForwardLiveMessages(CancellationToken cancellationToken)
          {
              var phaseEnd = DateTime.UtcNow + _persistenceConfiguration.SafetyPhaseDuration;

              while (DateTime.UtcNow < phaseEnd && !cancellationToken.IsCancellationRequested)
              {
                  // Bounded wait (200 ms) so that the loop conditions are re-evaluated while the queue is empty.
                  if (!_liveMessages.TryTake(out var liveMessage, 200))
                      continue;

                  var messageReplayed = new MessageReplayed(_replayId, liveMessage);
                  _transport.Send(ToTransportMessage(messageReplayed), new[] { _peer }, new SendContext());
              }
          }

          /// <summary>
          /// Serializes <paramref name="message"/> into a <see cref="TransportMessage"/> originating from the local peer.
          /// </summary>
          /// <param name="message">The message to serialize.</param>
          /// <param name="wasPersisted">Value assigned to the transport message's <c>WasPersisted</c> property.</param>
          /// <returns>The transport message to send.</returns>
          private TransportMessage ToTransportMessage(IMessage message, bool wasPersisted = false)
          {
              return new TransportMessage(message.TypeId(), _messageSerializer.Serialize(message), _self) { WasPersisted = wasPersisted };
          }

          /// <summary>
          /// Starts a measurement on the shared stopwatch.
          /// </summary>
          /// <returns>
          /// A lazy value that, on first access, yields the stopwatch time elapsed since this call;
          /// that first result is cached, so later reads return the same duration.
          /// </returns>
          private Lazy<TimeSpan> MeasureDuration()
          {
              var beginning = _stopwatch.Elapsed;
              return new Lazy<TimeSpan>(() => _stopwatch.Elapsed - beginning);
          }
      }
  }