1
0
Эх сурвалжийг харах

Persistence: implement storage report in RocksDb storage

Mendel Monteiro-Beckerman 3 жил өмнө
parent
commit
96558e9087

+ 2 - 1
src/Abc.Zebus.Persistence.RocksDb.Tests/PerformanceTests.cs

@@ -6,6 +6,7 @@ using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 using Abc.Zebus.Persistence.Matching;
+using Abc.Zebus.Persistence.Reporter;
 using Abc.Zebus.Persistence.Storage;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Transport;
@@ -23,7 +24,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
         [SetUp]
         public void SetUp()
         {
-            _storage = new RocksDbStorage(Guid.NewGuid().ToString());
+            _storage = new RocksDbStorage(Guid.NewGuid().ToString(), new NoopReporter());
             _storage.Start();
         }
 

+ 11 - 4
src/Abc.Zebus.Persistence.RocksDb.Tests/RocksDbStorageTests.cs

@@ -6,6 +6,7 @@ using System.Threading.Tasks;
 using Abc.Zebus.Persistence.Matching;
 using Abc.Zebus.Persistence.Reporter;
 using Abc.Zebus.Testing;
+using Abc.Zebus.Testing.Comparison;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Transport;
 using Moq;
@@ -27,7 +28,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             _databaseDirectoryPath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
 
             _reporterMock = new Mock<IReporter>();
-            _storage = new RocksDbStorage(_databaseDirectoryPath);
+            _storage = new RocksDbStorage(_databaseDirectoryPath,  _reporterMock.Object);
             _storage.Start();
         }
 
@@ -210,7 +211,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             await _storage.Write(new[] { MatcherEntry.Ack(peer, messageId) });
             _storage.Stop();
 
-            _storage = new RocksDbStorage(_databaseDirectoryPath);
+            _storage = new RocksDbStorage(_databaseDirectoryPath, _reporterMock.Object);
             _storage.Start();
 
             var message = MatcherEntry.Message(peer, messageId, MessageUtil.TypeId<Message1>(), Array.Empty<byte>());
@@ -224,7 +225,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             }
         }
 
-        [Test, Explicit]
+        [Test]
         public void should_report_storage_information()
         {
             var peer = new PeerId("peer");
@@ -235,7 +236,13 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
                 MatcherEntry.Message(peer, MessageId.NextId(), new MessageTypeId("Abc.Message.Fat"), new byte[] { 0x01, 0x02, 0x03, 0x04 }),
             });
 
-            _reporterMock.Verify(r => r.AddStorageReport(new StorageReport(2, 7, 4, "Abc.Message.Fat", new Dictionary<string, MessageTypeStatistics>())));
+            var entryTypeStatistics = new Dictionary<string, MessageTypeStatistics> { ["Abc.Message"] = new(1, 3),  ["Abc.Message.Fat"] = new(1, 4)  };
+            var storageReport = new StorageReport(2, 7, 4, "Abc.Message.Fat", entryTypeStatistics);
+            _reporterMock.Verify(r => r.AddStorageReport(It.Is<StorageReport>(x => x.MessageCount == storageReport.MessageCount
+                                                                              && x.BatchSizeInBytes == storageReport.BatchSizeInBytes
+                                                                              && x.FattestMessageTypeId == storageReport.FattestMessageTypeId
+                                                                              && x.FattestMessageSizeInBytes == storageReport.FattestMessageSizeInBytes
+                                                                              && x.MessageTypeStatistics.DeepCompare(storageReport.MessageTypeStatistics))));
         }
 
         private TransportMessage CreateTestTransportMessage(int i)

+ 16 - 3
src/Abc.Zebus.Persistence.RocksDb/RocksDbStorage.cs

@@ -6,6 +6,7 @@ using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
 using Abc.Zebus.Persistence.Matching;
+using Abc.Zebus.Persistence.Reporter;
 using Abc.Zebus.Persistence.Storage;
 using RocksDbSharp;
 using StructureMap;
@@ -20,6 +21,7 @@ namespace Abc.Zebus.Persistence.RocksDb
     /// </summary>
     public class RocksDbStorage : IStorage, IDisposable
     {
+        private readonly IReporter _reporter;
         private static readonly int _guidLength = Guid.Empty.ToByteArray().Length;
 
         private readonly ConcurrentDictionary<MessageId, bool> _outOfOrderAcks = new ConcurrentDictionary<MessageId, bool>();
@@ -31,14 +33,15 @@ namespace Abc.Zebus.Persistence.RocksDb
         private ColumnFamilyHandle _acksColumnFamily = default!;
 
         [DefaultConstructor]
-        public RocksDbStorage()
-            : this(Path.Combine(AppDomain.CurrentDomain.BaseDirectory!, "database"))
+        public RocksDbStorage(IReporter reporter)
+            : this(Path.Combine(AppDomain.CurrentDomain.BaseDirectory!, "database"), reporter)
         {
         }
 
-        public RocksDbStorage(string databaseDirectoryPath)
+        public RocksDbStorage(string databaseDirectoryPath, IReporter reporter)
         {
             _databaseDirectoryPath = databaseDirectoryPath;
+            _reporter = reporter;
         }
 
         public void Start()
@@ -75,6 +78,8 @@ namespace Abc.Zebus.Persistence.RocksDb
 
         public Task Write(IList<MatcherEntry> entriesToPersist)
         {
+            _reporter.AddStorageReport(ToStorageReport(entriesToPersist));
+
             foreach (var entry in entriesToPersist)
             {
                 var key = CreateKeyBuffer(entry.PeerId);
@@ -115,6 +120,14 @@ namespace Abc.Zebus.Persistence.RocksDb
             return Task.CompletedTask;
         }
 
+        private static StorageReport ToStorageReport(IList<MatcherEntry> entriesToPersist)
+        {
+            var fattestMessage = entriesToPersist.OrderByDescending(msg => msg.MessageBytes?.Length ?? 0).First();
+            var entriesByTypeName = entriesToPersist.ToLookup(x => x.MessageTypeName)
+                                                    .ToDictionary(xs => xs.Key, xs => new MessageTypeStatistics(xs.Count(), xs.Sum(x => x.MessageBytes?.Length) ?? 0));
+            return new StorageReport(entriesToPersist.Count, entriesToPersist.Sum(msg => msg.MessageBytes?.Length ?? 0), fattestMessage.MessageBytes?.Length ?? 0, fattestMessage.MessageTypeName, entriesByTypeName);
+        }
+
         private void UpdateNonAckedCounts(IGrouping<PeerId, MatcherEntry> entry)
         {
             var nonAcked = entry.Aggregate(0, (s, e) => s + (e.IsAck ? -1 : 1));