Explorar el Código

Merge pull request #113 from Abc-Arbitrage/improve-storage-report

Improve storage report
Mendel Monteiro-Beckerman hace 3 años
padre
commit
29044ad072

+ 19 - 14
src/Abc.Zebus.Persistence.Cassandra.Tests/CqlStorageTests.cs

@@ -8,11 +8,10 @@ using Abc.Zebus.Persistence.Cassandra.Tests.Cql;
 using Abc.Zebus.Persistence.Matching;
 using Abc.Zebus.Persistence.Messages;
 using Abc.Zebus.Persistence.Reporter;
-using Abc.Zebus.Persistence.Storage;
 using Abc.Zebus.Testing;
+using Abc.Zebus.Testing.Comparison;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Transport;
-using Abc.Zebus.Util;
 using Cassandra.Data.Linq;
 using Moq;
 using NUnit.Framework;
@@ -123,7 +122,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
         {
             var messageBytes = new byte[512];
             new Random().NextBytes(messageBytes);
-            var messageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9706-ae1ea5dcf365"));      // Time component @2016-01-01 00:00:00Z
+            var messageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9706-ae1ea5dcf365")); // Time component @2016-01-01 00:00:00Z
             var otherMessageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9806-f1ef55aac8e9")); // Time component @2016-01-01 00:00:00Z
             var peerId = "Abc.Peer.0";
 
@@ -334,15 +333,15 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
             {
                 var transportMessages = Enumerable.Range(1, 100).Select(CreateTestTransportMessage).ToList();
                 var messages = transportMessages.SelectMany(x =>
-                                                        {
-                                                            var transportMessageBytes = TransportMessage.Serialize(x);
-                                                            return new[]
-                                                            {
-                                                                MatcherEntry.Message(firstPeer, x.Id, x.MessageTypeId, transportMessageBytes),
-                                                                MatcherEntry.Message(secondPeer, x.Id, x.MessageTypeId, transportMessageBytes),
-                                                            };
-                                                        })
-                                                        .ToList();
+                                                {
+                                                    var transportMessageBytes = TransportMessage.Serialize(x);
+                                                    return new[]
+                                                    {
+                                                        MatcherEntry.Message(firstPeer, x.Id, x.MessageTypeId, transportMessageBytes),
+                                                        MatcherEntry.Message(secondPeer, x.Id, x.MessageTypeId, transportMessageBytes),
+                                                    };
+                                                })
+                                                .ToList();
 
                 await _storage.Write(messages);
 
@@ -360,7 +359,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
         }
 
         [Test]
-        public void should_report_storage_informations()
+        public void should_report_storage_information()
         {
             var peer = new PeerId("peer");
 
@@ -370,7 +369,13 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
                 MatcherEntry.Message(peer, MessageId.NextId(), new MessageTypeId("Abc.Message.Fat"), new byte[] { 0x01, 0x02, 0x03, 0x04 }),
             });
 
-            _reporterMock.Verify(r => r.AddStorageReport(2, 7, 4, "Abc.Message.Fat"));
+            var entryTypeStatistics = new Dictionary<string, MessageTypeStorageReport> { ["Abc.Message"] = new(1, 3),  ["Abc.Message.Fat"] = new(1, 4)  };
+            var storageReport = new StorageReport(2, 7, 4, "Abc.Message.Fat", entryTypeStatistics);
+            _reporterMock.Verify(r => r.AddStorageReport(It.Is<StorageReport>(x => x.MessageCount == storageReport.MessageCount
+                                                                              && x.BatchSizeInBytes == storageReport.BatchSizeInBytes
+                                                                              && x.FattestMessageTypeId == storageReport.FattestMessageTypeId
+                                                                              && x.FattestMessageSizeInBytes == storageReport.FattestMessageSizeInBytes
+                                                                              && x.MessageTypeStorageReports.DeepCompare(storageReport.MessageTypeStorageReports))));
         }
 
         [Test]

+ 18 - 11
src/Abc.Zebus.Persistence.Cassandra/Cql/CqlStorage.cs

@@ -10,7 +10,6 @@ using Abc.Zebus.Persistence.Messages;
 using Abc.Zebus.Persistence.Reporter;
 using Abc.Zebus.Persistence.Storage;
 using Abc.Zebus.Persistence.Util;
-using Abc.Zebus.Util;
 using Cassandra;
 using Cassandra.Data.Linq;
 using Microsoft.Extensions.Logging;
@@ -68,8 +67,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
             if (entriesToPersist.Count == 0)
                 return Task.CompletedTask;
 
-            var fattestMessage = entriesToPersist.OrderByDescending(msg => msg.MessageBytes?.Length ?? 0).First();
-            _reporter.AddStorageReport(entriesToPersist.Count, entriesToPersist.Sum(msg => msg.MessageBytes?.Length ?? 0), fattestMessage.MessageBytes?.Length ?? 0, fattestMessage.MessageTypeName);
+            _reporter.AddStorageReport(ToStorageReport(entriesToPersist));
 
             var countByPeer = new Dictionary<PeerId, int>();
             foreach (var matcherEntry in entriesToPersist)
@@ -114,16 +112,17 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
                 var insertTask = _dataContext.Session.ExecuteAsync(boundStatement);
                 insertTasks.Add(insertTask);
                 insertTask.ContinueWith(t =>
-                {
-                    var shouldInvestigatePeer = _configuration.PeerIdsToInvestigate != null && _configuration.PeerIdsToInvestigate.Contains(matcherEntry.PeerId.ToString());
-                    if (shouldInvestigatePeer)
-                        _log.LogInformation($"Storage done for peer {matcherEntry.PeerId}, Type: {matcherEntry.Type}, Message Id: {matcherEntry.MessageId}, TaskResult: {t.Status}");
+                                        {
+                                            var shouldInvestigatePeer = _configuration.PeerIdsToInvestigate != null && _configuration.PeerIdsToInvestigate.Contains(matcherEntry.PeerId.ToString());
+                                            if (shouldInvestigatePeer)
+                                                _log.LogInformation($"Storage done for peer {matcherEntry.PeerId}, Type: {matcherEntry.Type}, Message Id: {matcherEntry.MessageId}, TaskResult: {t.Status}");
 
-                    if (t.IsFaulted)
-                        _log.LogError(t.Exception, "Error while inserting to Cassandra");
+                                            if (t.IsFaulted)
+                                                _log.LogError(t.Exception, "Error while inserting to Cassandra");
 
-                    remaining.Release();
-                }, TaskContinuationOptions.ExecuteSynchronously);
+                                            remaining.Release();
+                                        },
+                                        TaskContinuationOptions.ExecuteSynchronously);
             }
 
             var updateNonAckedCountTasks = new List<Task>();
@@ -135,6 +134,14 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
             return Task.WhenAll(insertTasks.Concat(updateNonAckedCountTasks));
         }
 
+        private static StorageReport ToStorageReport(IList<MatcherEntry> entriesToPersist)
+        {
+            var fattestMessage = entriesToPersist.OrderByDescending(msg => msg.MessageLength).First();
+            var entriesByTypeName = entriesToPersist.ToLookup(x => x.MessageTypeName)
+                                                    .ToDictionary(xs => xs.Key, xs => new MessageTypeStorageReport(xs.Count(), xs.Sum(x => x.MessageLength)));
+            return new StorageReport(entriesToPersist.Count, entriesToPersist.Sum(msg => msg.MessageLength), fattestMessage.MessageLength, fattestMessage.MessageTypeName, entriesByTypeName);
+        }
+
         public Task RemovePeer(PeerId peerId)
         {
             return _peerStateRepository.RemovePeer(peerId);

+ 2 - 1
src/Abc.Zebus.Persistence.RocksDb.Tests/PerformanceTests.cs

@@ -6,6 +6,7 @@ using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 using Abc.Zebus.Persistence.Matching;
+using Abc.Zebus.Persistence.Reporter;
 using Abc.Zebus.Persistence.Storage;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Transport;
@@ -23,7 +24,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
         [SetUp]
         public void SetUp()
         {
-            _storage = new RocksDbStorage(Guid.NewGuid().ToString());
+            _storage = new RocksDbStorage(Guid.NewGuid().ToString(), new NoopReporter());
             _storage.Start();
         }
 

+ 12 - 5
src/Abc.Zebus.Persistence.RocksDb.Tests/RocksDbStorageTests.cs

@@ -6,6 +6,7 @@ using System.Threading.Tasks;
 using Abc.Zebus.Persistence.Matching;
 using Abc.Zebus.Persistence.Reporter;
 using Abc.Zebus.Testing;
+using Abc.Zebus.Testing.Comparison;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Transport;
 using Moq;
@@ -27,7 +28,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             _databaseDirectoryPath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
 
             _reporterMock = new Mock<IReporter>();
-            _storage = new RocksDbStorage(_databaseDirectoryPath);
+            _storage = new RocksDbStorage(_databaseDirectoryPath,  _reporterMock.Object);
             _storage.Start();
         }
 
@@ -210,7 +211,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             await _storage.Write(new[] { MatcherEntry.Ack(peer, messageId) });
             _storage.Stop();
 
-            _storage = new RocksDbStorage(_databaseDirectoryPath);
+            _storage = new RocksDbStorage(_databaseDirectoryPath, _reporterMock.Object);
             _storage.Start();
 
             var message = MatcherEntry.Message(peer, messageId, MessageUtil.TypeId<Message1>(), Array.Empty<byte>());
@@ -224,8 +225,8 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             }
         }
 
-        [Test, Explicit]
-        public void should_report_storage_informations()
+        [Test]
+        public void should_report_storage_information()
         {
             var peer = new PeerId("peer");
 
@@ -235,7 +236,13 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
                 MatcherEntry.Message(peer, MessageId.NextId(), new MessageTypeId("Abc.Message.Fat"), new byte[] { 0x01, 0x02, 0x03, 0x04 }),
             });
 
-            _reporterMock.Verify(r => r.AddStorageReport(2, 7, 4, "Abc.Message.Fat"));
+            var entryTypeStatistics = new Dictionary<string, MessageTypeStorageReport> { ["Abc.Message"] = new(1, 3),  ["Abc.Message.Fat"] = new(1, 4)  };
+            var storageReport = new StorageReport(2, 7, 4, "Abc.Message.Fat", entryTypeStatistics);
+            _reporterMock.Verify(r => r.AddStorageReport(It.Is<StorageReport>(x => x.MessageCount == storageReport.MessageCount
+                                                                              && x.BatchSizeInBytes == storageReport.BatchSizeInBytes
+                                                                              && x.FattestMessageTypeId == storageReport.FattestMessageTypeId
+                                                                              && x.FattestMessageSizeInBytes == storageReport.FattestMessageSizeInBytes
+                                                                              && x.MessageTypeStorageReports.DeepCompare(storageReport.MessageTypeStorageReports))));
         }
 
         private TransportMessage CreateTestTransportMessage(int i)

+ 16 - 3
src/Abc.Zebus.Persistence.RocksDb/RocksDbStorage.cs

@@ -6,6 +6,7 @@ using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
 using Abc.Zebus.Persistence.Matching;
+using Abc.Zebus.Persistence.Reporter;
 using Abc.Zebus.Persistence.Storage;
 using RocksDbSharp;
 using StructureMap;
@@ -20,6 +21,7 @@ namespace Abc.Zebus.Persistence.RocksDb
     /// </summary>
     public class RocksDbStorage : IStorage, IDisposable
     {
+        private readonly IReporter _reporter;
         private static readonly int _guidLength = Guid.Empty.ToByteArray().Length;
 
         private readonly ConcurrentDictionary<MessageId, bool> _outOfOrderAcks = new ConcurrentDictionary<MessageId, bool>();
@@ -31,14 +33,15 @@ namespace Abc.Zebus.Persistence.RocksDb
         private ColumnFamilyHandle _acksColumnFamily = default!;
 
         [DefaultConstructor]
-        public RocksDbStorage()
-            : this(Path.Combine(AppDomain.CurrentDomain.BaseDirectory!, "database"))
+        public RocksDbStorage(IReporter reporter)
+            : this(Path.Combine(AppDomain.CurrentDomain.BaseDirectory!, "database"), reporter)
         {
         }
 
-        public RocksDbStorage(string databaseDirectoryPath)
+        public RocksDbStorage(string databaseDirectoryPath, IReporter reporter)
         {
             _databaseDirectoryPath = databaseDirectoryPath;
+            _reporter = reporter;
         }
 
         public void Start()
@@ -75,6 +78,8 @@ namespace Abc.Zebus.Persistence.RocksDb
 
         public Task Write(IList<MatcherEntry> entriesToPersist)
         {
+            _reporter.AddStorageReport(ToStorageReport(entriesToPersist));
+
             foreach (var entry in entriesToPersist)
             {
                 var key = CreateKeyBuffer(entry.PeerId);
@@ -115,6 +120,14 @@ namespace Abc.Zebus.Persistence.RocksDb
             return Task.CompletedTask;
         }
 
+        private static StorageReport ToStorageReport(IList<MatcherEntry> entriesToPersist)
+        {
+            var fattestMessage = entriesToPersist.OrderByDescending(msg => msg.MessageLength).First();
+            var entriesByTypeName = entriesToPersist.ToLookup(x => x.MessageTypeName)
+                                                    .ToDictionary(xs => xs.Key, xs => new MessageTypeStorageReport(xs.Count(), xs.Sum(x => x.MessageLength)));
+            return new StorageReport(entriesToPersist.Count, entriesToPersist.Sum(msg => msg.MessageLength), fattestMessage.MessageLength, fattestMessage.MessageTypeName, entriesByTypeName);
+        }
+
         private void UpdateNonAckedCounts(IGrouping<PeerId, MatcherEntry> entry)
         {
             var nonAcked = entry.Aggregate(0, (s, e) => s + (e.IsAck ? -1 : 1));

+ 2 - 0
src/Abc.Zebus.Persistence/Matching/MatcherEntry.cs

@@ -47,5 +47,7 @@ namespace Abc.Zebus.Persistence.Matching
         {
             return IsEventWaitHandle || SystemDateTime.UtcNow - TimestampUtc >= delay;
         }
+
+        public int MessageLength => MessageBytes?.Length ?? 0;
     }
 }

+ 22 - 24
src/Abc.Zebus.Persistence/MessageReplayer.cs

@@ -153,35 +153,33 @@ namespace Abc.Zebus.Persistence
 
         private int ReplayUnackedMessages(CancellationToken cancellationToken)
         {
-            using (var reader = _storage.CreateMessageReader(_peer.Id))
-            {
-                if (reader == null)
-                    return 0;
-                var totalMessageCount = 0;
+            using var reader = _storage.CreateMessageReader(_peer.Id);
+            if (reader == null)
+                return 0;
+            var totalMessageCount = 0;
 
-                foreach (var partition in reader.GetUnackedMessages().TakeWhile(m => !cancellationToken.IsCancellationRequested).Partition(_replayBatchSize, true))
+            foreach (var partition in reader.GetUnackedMessages().TakeWhile(m => !cancellationToken.IsCancellationRequested).Partition(_replayBatchSize, true))
+            {
+                var messageSentCount = 0;
+                var batchDuration = MeasureDuration();
+                var readAndSendDuration = MeasureDuration();
+                foreach (var message in partition.Select(DeserializeTransportMessage))
                 {
-                    var messageSentCount = 0;
-                    var batchDuration = MeasureDuration();
-                    var readAndSendDuration = MeasureDuration();
-                    foreach (var message in partition.Select(DeserializeTransportMessage))
-                    {
-                        _unackedIds.Add(message.Id);
-                        ReplayMessage(message);
-                        messageSentCount++;
-                    }
-
-                    totalMessageCount += messageSentCount;
-
-                    _logger.LogInformation($"Read and send for last batch of {messageSentCount} msgs for {_peer.Id} took {readAndSendDuration.Value}. ({messageSentCount / readAndSendDuration.Value.TotalSeconds} msg/s)");
-                    WaitForAcks(cancellationToken);
-                    _logger.LogInformation($"Last batch for {_peer.Id} took {batchDuration.Value} to be totally replayed ({messageSentCount / batchDuration.Value.TotalSeconds} msg/s)");
-                    _reporter.AddReplaySpeedReport(messageSentCount, readAndSendDuration.Value.TotalSeconds, batchDuration.Value.TotalSeconds);
+                    _unackedIds.Add(message.Id);
+                    ReplayMessage(message);
+                    messageSentCount++;
                 }
 
-                _logger.LogInformation($"Replay finished for peer {_peer.Id}. Disposing the reader");
-                return totalMessageCount;
+                totalMessageCount += messageSentCount;
+
+                _logger.LogInformation($"Read and send for last batch of {messageSentCount} msgs for {_peer.Id} took {readAndSendDuration.Value}. ({messageSentCount / readAndSendDuration.Value.TotalSeconds} msg/s)");
+                WaitForAcks(cancellationToken);
+                _logger.LogInformation($"Last batch for {_peer.Id} took {batchDuration.Value} to be totally replayed ({messageSentCount / batchDuration.Value.TotalSeconds} msg/s)");
+                _reporter.AddReplaySpeedReport(new ReplaySpeedReport(messageSentCount, readAndSendDuration.Value, batchDuration.Value));
             }
+
+            _logger.LogInformation($"Replay finished for peer {_peer.Id}. Disposing the reader");
+            return totalMessageCount;
         }
 
         private static TransportMessage DeserializeTransportMessage(byte[] row) => TransportMessage.Deserialize(row);

+ 3 - 9
src/Abc.Zebus.Persistence/Reporter/IReporter.cs

@@ -1,14 +1,8 @@
-using System.Collections.Generic;
-
 namespace Abc.Zebus.Persistence.Reporter
 {
     public interface IReporter
     {
-        void AddReplaySpeedReport(int messagesReplayedCount, double sendDurationInSeconds, double ackDurationInSeconds);
-        IList<ReplaySpeedReport> TakeAndResetReplaySpeedReports();
-
-        void AddStorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId);
-        IList<StorageReport> TakeAndResetStorageReports();
-
+        void AddReplaySpeedReport(ReplaySpeedReport replaySpeedReport);
+        void AddStorageReport(StorageReport storageReport);
     }
-}
+}

+ 5 - 5
src/Abc.Zebus.Persistence/Reporter/NoopReporter.cs

@@ -4,10 +4,10 @@ namespace Abc.Zebus.Persistence.Reporter
 {
     public class NoopReporter : IReporter
     {
-        private static readonly List<ReplaySpeedReport> _emptyReplayReports = new List<ReplaySpeedReport>(0);
-        private static readonly List<StorageReport> _emptyStorageReports = new List<StorageReport>(0);
+        private static readonly List<ReplaySpeedReport> _emptyReplayReports = new(0);
+        private static readonly List<StorageReport> _emptyStorageReports = new(0);
 
-        public void AddReplaySpeedReport(int messagesReplayedCount, double sendDurationInSeconds, double ackDurationInSeconds)
+        public void AddReplaySpeedReport(ReplaySpeedReport replaySpeedReport)
         {
         }
 
@@ -16,7 +16,7 @@ namespace Abc.Zebus.Persistence.Reporter
             return _emptyReplayReports;
         }
 
-        public void AddStorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId)
+        public void AddStorageReport(StorageReport storageReport)
         {
         }
 
@@ -25,4 +25,4 @@ namespace Abc.Zebus.Persistence.Reporter
             return _emptyStorageReports;
         }
     }
-}
+}

+ 9 - 7
src/Abc.Zebus.Persistence/Reporter/ReplaySpeedReport.cs

@@ -1,16 +1,18 @@
-namespace Abc.Zebus.Persistence.Reporter
+using System;
+
+namespace Abc.Zebus.Persistence.Reporter
 {
     public class ReplaySpeedReport
     {
         public int MessageCount { get; }
-        public double SendDurationInSeconds { get; }
-        public double AckDurationInSeconds { get; }
+        public TimeSpan SendDuration { get; }
+        public TimeSpan AckDuration { get; }
 
-        public ReplaySpeedReport(int messageCount, double sendDurationInSeconds, double ackDurationInSeconds)
+        public ReplaySpeedReport(int messageCount, TimeSpan sendDuration, TimeSpan ackDuration)
         {
             MessageCount = messageCount;
-            SendDurationInSeconds = sendDurationInSeconds;
-            AckDurationInSeconds = ackDurationInSeconds;
+            SendDuration = sendDuration;
+            AckDuration = ackDuration;
         }
     }
-}
+}

+ 12 - 3
src/Abc.Zebus.Persistence/Reporter/StorageReport.cs

@@ -1,4 +1,6 @@
-namespace Abc.Zebus.Persistence.Reporter
+using System.Collections.Generic;
+
+namespace Abc.Zebus.Persistence.Reporter
 {
     public class StorageReport
     {
@@ -6,13 +8,20 @@
         public int BatchSizeInBytes { get; }
         public int FattestMessageSizeInBytes { get; }
         public string FattestMessageTypeId { get; }
+        public Dictionary<string, MessageTypeStorageReport> MessageTypeStorageReports { get; }
 
-        public StorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId)
+        public StorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId, Dictionary<string, MessageTypeStorageReport> messageTypeStorageReports)
         {
             MessageCount = messageCount;
             BatchSizeInBytes = batchSizeInBytes;
             FattestMessageSizeInBytes = fattestMessageSizeInBytes;
             FattestMessageTypeId = fattestMessageTypeId;
+            MessageTypeStorageReports = messageTypeStorageReports;
         }
+
+        public override string ToString()
+            => $"{nameof(MessageCount)}: {MessageCount}, {nameof(BatchSizeInBytes)}: {BatchSizeInBytes}, {nameof(FattestMessageSizeInBytes)}: {FattestMessageSizeInBytes}, {nameof(FattestMessageTypeId)}: {FattestMessageTypeId}, {nameof(MessageTypeStorageReports)}: {MessageTypeStorageReports}";
     }
-}
+
+    public record MessageTypeStorageReport(int Count, int TotalBytes);
+}

+ 1 - 1
src/Directory.Build.props

@@ -16,7 +16,7 @@
   </PropertyGroup>
 
   <PropertyGroup>
-    <ZebusVersion>3.13.0</ZebusVersion>
+    <ZebusVersion>3.13.1</ZebusVersion>
     <ZebusContractsVersion>3.0.0</ZebusContractsVersion>
     <ZebusDirectoryVersion>$(ZebusVersion)</ZebusDirectoryVersion>
     <ZebusPersistenceVersion>$(ZebusVersion)</ZebusPersistenceVersion>