Pārlūkot izejas kodu

Persistence: improve storage report by adding statistics per message type

Mendel Monteiro-Beckerman 3 gadi atpakaļ
vecāks
revīzija
49a43cc835

+ 19 - 14
src/Abc.Zebus.Persistence.Cassandra.Tests/CqlStorageTests.cs

@@ -8,11 +8,10 @@ using Abc.Zebus.Persistence.Cassandra.Tests.Cql;
 using Abc.Zebus.Persistence.Matching;
 using Abc.Zebus.Persistence.Messages;
 using Abc.Zebus.Persistence.Reporter;
-using Abc.Zebus.Persistence.Storage;
 using Abc.Zebus.Testing;
+using Abc.Zebus.Testing.Comparison;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Transport;
-using Abc.Zebus.Util;
 using Cassandra.Data.Linq;
 using Moq;
 using NUnit.Framework;
@@ -123,7 +122,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
         {
             var messageBytes = new byte[512];
             new Random().NextBytes(messageBytes);
-            var messageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9706-ae1ea5dcf365"));      // Time component @2016-01-01 00:00:00Z
+            var messageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9706-ae1ea5dcf365")); // Time component @2016-01-01 00:00:00Z
             var otherMessageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9806-f1ef55aac8e9")); // Time component @2016-01-01 00:00:00Z
             var peerId = "Abc.Peer.0";
 
@@ -334,15 +333,15 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
             {
                 var transportMessages = Enumerable.Range(1, 100).Select(CreateTestTransportMessage).ToList();
                 var messages = transportMessages.SelectMany(x =>
-                                                        {
-                                                            var transportMessageBytes = TransportMessage.Serialize(x);
-                                                            return new[]
-                                                            {
-                                                                MatcherEntry.Message(firstPeer, x.Id, x.MessageTypeId, transportMessageBytes),
-                                                                MatcherEntry.Message(secondPeer, x.Id, x.MessageTypeId, transportMessageBytes),
-                                                            };
-                                                        })
-                                                        .ToList();
+                                                {
+                                                    var transportMessageBytes = TransportMessage.Serialize(x);
+                                                    return new[]
+                                                    {
+                                                        MatcherEntry.Message(firstPeer, x.Id, x.MessageTypeId, transportMessageBytes),
+                                                        MatcherEntry.Message(secondPeer, x.Id, x.MessageTypeId, transportMessageBytes),
+                                                    };
+                                                })
+                                                .ToList();
 
                 await _storage.Write(messages);
 
@@ -360,7 +359,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
         }
 
         [Test]
-        public void should_report_storage_informations()
+        public void should_report_storage_information()
         {
             var peer = new PeerId("peer");
 
@@ -370,7 +369,13 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
                 MatcherEntry.Message(peer, MessageId.NextId(), new MessageTypeId("Abc.Message.Fat"), new byte[] { 0x01, 0x02, 0x03, 0x04 }),
             });
 
-            _reporterMock.Verify(r => r.AddStorageReport(2, 7, 4, "Abc.Message.Fat"));
+            var entryTypeStatistics = new Dictionary<string, MessageTypeStatistics> { ["Abc.Message"] = new(1, 3),  ["Abc.Message.Fat"] = new(1, 4)  };
+            var storageReport = new StorageReport(2, 7, 4, "Abc.Message.Fat", entryTypeStatistics);
+            _reporterMock.Verify(r => r.AddStorageReport(It.Is<StorageReport>(x => x.MessageCount == storageReport.MessageCount
+                                                                              && x.BatchSizeInBytes == storageReport.BatchSizeInBytes
+                                                                              && x.FattestMessageTypeId == storageReport.FattestMessageTypeId
+                                                                              && x.FattestMessageSizeInBytes == storageReport.FattestMessageSizeInBytes
+                                                                              && x.MessageTypeStatistics.DeepCompare(storageReport.MessageTypeStatistics))));
         }
 
         [Test]

+ 18 - 11
src/Abc.Zebus.Persistence.Cassandra/Cql/CqlStorage.cs

@@ -10,7 +10,6 @@ using Abc.Zebus.Persistence.Messages;
 using Abc.Zebus.Persistence.Reporter;
 using Abc.Zebus.Persistence.Storage;
 using Abc.Zebus.Persistence.Util;
-using Abc.Zebus.Util;
 using Cassandra;
 using Cassandra.Data.Linq;
 using Microsoft.Extensions.Logging;
@@ -68,8 +67,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
             if (entriesToPersist.Count == 0)
                 return Task.CompletedTask;
 
-            var fattestMessage = entriesToPersist.OrderByDescending(msg => msg.MessageBytes?.Length ?? 0).First();
-            _reporter.AddStorageReport(entriesToPersist.Count, entriesToPersist.Sum(msg => msg.MessageBytes?.Length ?? 0), fattestMessage.MessageBytes?.Length ?? 0, fattestMessage.MessageTypeName);
+            _reporter.AddStorageReport(ToStorageReport(entriesToPersist));
 
             var countByPeer = new Dictionary<PeerId, int>();
             foreach (var matcherEntry in entriesToPersist)
@@ -109,16 +107,17 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
                 var insertTask = _dataContext.Session.ExecuteAsync(boundStatement);
                 insertTasks.Add(insertTask);
                 insertTask.ContinueWith(t =>
-                {
-                    var shouldInvestigatePeer = _configuration.PeerIdsToInvestigate != null && _configuration.PeerIdsToInvestigate.Contains(matcherEntry.PeerId.ToString());
-                    if (shouldInvestigatePeer)
-                        _log.LogInformation($"Storage done for peer {matcherEntry.PeerId}, Type: {matcherEntry.Type}, Message Id: {matcherEntry.MessageId}, TaskResult: {t.Status}");
+                                        {
+                                            var shouldInvestigatePeer = _configuration.PeerIdsToInvestigate != null && _configuration.PeerIdsToInvestigate.Contains(matcherEntry.PeerId.ToString());
+                                            if (shouldInvestigatePeer)
+                                                _log.LogInformation($"Storage done for peer {matcherEntry.PeerId}, Type: {matcherEntry.Type}, Message Id: {matcherEntry.MessageId}, TaskResult: {t.Status}");
 
-                    if (t.IsFaulted)
-                        _log.LogError(t.Exception, "Error while inserting to Cassandra");
+                                            if (t.IsFaulted)
+                                                _log.LogError(t.Exception, "Error while inserting to Cassandra");
 
-                    remaining.Release();
-                }, TaskContinuationOptions.ExecuteSynchronously);
+                                            remaining.Release();
+                                        },
+                                        TaskContinuationOptions.ExecuteSynchronously);
             }
 
             var updateNonAckedCountTasks = new List<Task>();
@@ -130,6 +129,14 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
             return Task.WhenAll(insertTasks.Concat(updateNonAckedCountTasks));
         }
 
+        private static StorageReport ToStorageReport(IList<MatcherEntry> entriesToPersist)
+        {
+            var fattestMessage = entriesToPersist.OrderByDescending(msg => msg.MessageBytes?.Length ?? 0).First();
+            var entriesByTypeName = entriesToPersist.ToLookup(x => x.MessageTypeName)
+                                                    .ToDictionary(xs => xs.Key, xs => new MessageTypeStatistics(xs.Count(), xs.Sum(x => x.MessageBytes?.Length) ?? 0));
+            return new StorageReport(entriesToPersist.Count, entriesToPersist.Sum(msg => msg.MessageBytes?.Length ?? 0), fattestMessage.MessageBytes?.Length ?? 0, fattestMessage.MessageTypeName, entriesByTypeName);
+        }
+
         public Task RemovePeer(PeerId peerId)
         {
             return _peerStateRepository.RemovePeer(peerId);

+ 2 - 2
src/Abc.Zebus.Persistence.RocksDb.Tests/RocksDbStorageTests.cs

@@ -225,7 +225,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
         }
 
         [Test, Explicit]
-        public void should_report_storage_informations()
+        public void should_report_storage_information()
         {
             var peer = new PeerId("peer");
 
@@ -235,7 +235,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
                 MatcherEntry.Message(peer, MessageId.NextId(), new MessageTypeId("Abc.Message.Fat"), new byte[] { 0x01, 0x02, 0x03, 0x04 }),
             });
 
-            _reporterMock.Verify(r => r.AddStorageReport(2, 7, 4, "Abc.Message.Fat"));
+            _reporterMock.Verify(r => r.AddStorageReport(new StorageReport(2, 7, 4, "Abc.Message.Fat", new Dictionary<string, MessageTypeStatistics>())));
         }
 
         private TransportMessage CreateTestTransportMessage(int i)

+ 1 - 1
src/Abc.Zebus.Persistence/MessageReplayer.cs

@@ -176,7 +176,7 @@ namespace Abc.Zebus.Persistence
                     _logger.LogInformation($"Read and send for last batch of {messageSentCount} msgs for {_peer.Id} took {readAndSendDuration.Value}. ({messageSentCount / readAndSendDuration.Value.TotalSeconds} msg/s)");
                     WaitForAcks(cancellationToken);
                     _logger.LogInformation($"Last batch for {_peer.Id} took {batchDuration.Value} to be totally replayed ({messageSentCount / batchDuration.Value.TotalSeconds} msg/s)");
-                    _reporter.AddReplaySpeedReport(messageSentCount, readAndSendDuration.Value.TotalSeconds, batchDuration.Value.TotalSeconds);
+                    _reporter.AddReplaySpeedReport(new ReplaySpeedReport(messageSentCount, readAndSendDuration.Value.TotalSeconds, batchDuration.Value.TotalSeconds));
                 }
 
                 _logger.LogInformation($"Replay finished for peer {_peer.Id}. Disposing the reader");

+ 3 - 4
src/Abc.Zebus.Persistence/Reporter/IReporter.cs

@@ -4,11 +4,10 @@ namespace Abc.Zebus.Persistence.Reporter
 {
     public interface IReporter
     {
-        void AddReplaySpeedReport(int messagesReplayedCount, double sendDurationInSeconds, double ackDurationInSeconds);
+        void AddReplaySpeedReport(ReplaySpeedReport replaySpeedReport);
         IList<ReplaySpeedReport> TakeAndResetReplaySpeedReports();
 
-        void AddStorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId);
+        void AddStorageReport(StorageReport storageReport);
         IList<StorageReport> TakeAndResetStorageReports();
-
     }
-}
+}

+ 5 - 5
src/Abc.Zebus.Persistence/Reporter/NoopReporter.cs

@@ -4,10 +4,10 @@ namespace Abc.Zebus.Persistence.Reporter
 {
     public class NoopReporter : IReporter
     {
-        private static readonly List<ReplaySpeedReport> _emptyReplayReports = new List<ReplaySpeedReport>(0);
-        private static readonly List<StorageReport> _emptyStorageReports = new List<StorageReport>(0);
+        private static readonly List<ReplaySpeedReport> _emptyReplayReports = new(0);
+        private static readonly List<StorageReport> _emptyStorageReports = new(0);
 
-        public void AddReplaySpeedReport(int messagesReplayedCount, double sendDurationInSeconds, double ackDurationInSeconds)
+        public void AddReplaySpeedReport(ReplaySpeedReport replaySpeedReport)
         {
         }
 
@@ -16,7 +16,7 @@ namespace Abc.Zebus.Persistence.Reporter
             return _emptyReplayReports;
         }
 
-        public void AddStorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId)
+        public void AddStorageReport(StorageReport storageReport)
         {
         }
 
@@ -25,4 +25,4 @@ namespace Abc.Zebus.Persistence.Reporter
             return _emptyStorageReports;
         }
     }
-}
+}

+ 12 - 3
src/Abc.Zebus.Persistence/Reporter/StorageReport.cs

@@ -1,4 +1,6 @@
-namespace Abc.Zebus.Persistence.Reporter
+using System.Collections.Generic;
+
+namespace Abc.Zebus.Persistence.Reporter
 {
     public class StorageReport
     {
@@ -6,13 +8,20 @@
         public int BatchSizeInBytes { get; }
         public int FattestMessageSizeInBytes { get; }
         public string FattestMessageTypeId { get; }
+        public Dictionary<string, MessageTypeStatistics> MessageTypeStatistics { get; }
 
-        public StorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId)
+        public StorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId, Dictionary<string, MessageTypeStatistics> messageTypeStatistics)
         {
             MessageCount = messageCount;
             BatchSizeInBytes = batchSizeInBytes;
             FattestMessageSizeInBytes = fattestMessageSizeInBytes;
             FattestMessageTypeId = fattestMessageTypeId;
+            MessageTypeStatistics = messageTypeStatistics;
         }
+
+        public override string ToString()
+            => $"{nameof(MessageCount)}: {MessageCount}, {nameof(BatchSizeInBytes)}: {BatchSizeInBytes}, {nameof(FattestMessageSizeInBytes)}: {FattestMessageSizeInBytes}, {nameof(FattestMessageTypeId)}: {FattestMessageTypeId}, {nameof(MessageTypeStatistics)}: {MessageTypeStatistics}";
     }
-}
+
+    public record MessageTypeStatistics(int Count, int TotalBytes);
+}