|
@@ -153,35 +153,33 @@ namespace Abc.Zebus.Persistence
|
|
|
|
|
|
private int ReplayUnackedMessages(CancellationToken cancellationToken)
|
|
|
{
|
|
|
- using (var reader = _storage.CreateMessageReader(_peer.Id))
|
|
|
- {
|
|
|
- if (reader == null)
|
|
|
- return 0;
|
|
|
- var totalMessageCount = 0;
|
|
|
+ using var reader = _storage.CreateMessageReader(_peer.Id);
|
|
|
+ if (reader == null)
|
|
|
+ return 0;
|
|
|
+ var totalMessageCount = 0;
|
|
|
|
|
|
- foreach (var partition in reader.GetUnackedMessages().TakeWhile(m => !cancellationToken.IsCancellationRequested).Partition(_replayBatchSize, true))
|
|
|
+ foreach (var partition in reader.GetUnackedMessages().TakeWhile(m => !cancellationToken.IsCancellationRequested).Partition(_replayBatchSize, true))
|
|
|
+ {
|
|
|
+ var messageSentCount = 0;
|
|
|
+ var batchDuration = MeasureDuration();
|
|
|
+ var readAndSendDuration = MeasureDuration();
|
|
|
+ foreach (var message in partition.Select(DeserializeTransportMessage))
|
|
|
{
|
|
|
- var messageSentCount = 0;
|
|
|
- var batchDuration = MeasureDuration();
|
|
|
- var readAndSendDuration = MeasureDuration();
|
|
|
- foreach (var message in partition.Select(DeserializeTransportMessage))
|
|
|
- {
|
|
|
- _unackedIds.Add(message.Id);
|
|
|
- ReplayMessage(message);
|
|
|
- messageSentCount++;
|
|
|
- }
|
|
|
-
|
|
|
- totalMessageCount += messageSentCount;
|
|
|
-
|
|
|
- _logger.LogInformation($"Read and send for last batch of {messageSentCount} msgs for {_peer.Id} took {readAndSendDuration.Value}. ({messageSentCount / readAndSendDuration.Value.TotalSeconds} msg/s)");
|
|
|
- WaitForAcks(cancellationToken);
|
|
|
- _logger.LogInformation($"Last batch for {_peer.Id} took {batchDuration.Value} to be totally replayed ({messageSentCount / batchDuration.Value.TotalSeconds} msg/s)");
|
|
|
- _reporter.AddReplaySpeedReport(new ReplaySpeedReport(messageSentCount, readAndSendDuration.Value.TotalSeconds, batchDuration.Value.TotalSeconds));
|
|
|
+ _unackedIds.Add(message.Id);
|
|
|
+ ReplayMessage(message);
|
|
|
+ messageSentCount++;
|
|
|
}
|
|
|
|
|
|
- _logger.LogInformation($"Replay finished for peer {_peer.Id}. Disposing the reader");
|
|
|
- return totalMessageCount;
|
|
|
+ totalMessageCount += messageSentCount;
|
|
|
+
|
|
|
+ _logger.LogInformation($"Read and send for last batch of {messageSentCount} msgs for {_peer.Id} took {readAndSendDuration.Value}. ({messageSentCount / readAndSendDuration.Value.TotalSeconds} msg/s)");
|
|
|
+ WaitForAcks(cancellationToken);
|
|
|
+ _logger.LogInformation($"Last batch for {_peer.Id} took {batchDuration.Value} to be totally replayed ({messageSentCount / batchDuration.Value.TotalSeconds} msg/s)");
|
|
|
+ _reporter.AddReplaySpeedReport(new ReplaySpeedReport(messageSentCount, readAndSendDuration.Value, batchDuration.Value));
|
|
|
}
|
|
|
+
|
|
|
+ _logger.LogInformation($"Replay finished for peer {_peer.Id}. Disposing the reader");
|
|
|
+ return totalMessageCount;
|
|
|
}
|
|
|
|
|
|
private static TransportMessage DeserializeTransportMessage(byte[] row) => TransportMessage.Deserialize(row);
|