Jelajahi Sumber

Persistence: Stop using Zebus internals in Persistence.Cassandra and Persistence.RocksDb

Olivier Coanet 3 tahun lalu
induk
melakukan
a39618c006

+ 3 - 2
src/Abc.Zebus.Persistence.Cassandra.Tests/CqlMessageReaderTests.cs

@@ -11,6 +11,7 @@ using Abc.Zebus.Testing.Comparison;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Transport;
 using NUnit.Framework;
+#pragma warning disable CS0618
 
 namespace Abc.Zebus.Persistence.Cassandra.Tests
 {
@@ -60,7 +61,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
             nonAckedMessages.Count.ShouldEqual(3);
             for (var i = 0; i < nonAckedMessages.Count; i++)
             {
-                var transportMessage = TransportMessageDeserializer.Deserialize(nonAckedMessages[i]);
+                var transportMessage = TransportMessageConvert.Deserialize(nonAckedMessages[i]);
                 transportMessage.DeepCompare(transportMessages[i]).ShouldBeTrue();
             }
         }
@@ -70,7 +71,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
             return x =>
             {
                 x.IsAcked = false;
-                x.TransportMessage = Serializer.Serialize(transportMessage).ToArray();
+                x.TransportMessage = TransportMessageConvert.Serialize(transportMessage);
             };
         }
 

+ 74 - 70
src/Abc.Zebus.Persistence.Cassandra.Tests/CqlStorageTests.cs

@@ -8,6 +8,7 @@ using Abc.Zebus.Persistence.Cassandra.Tests.Cql;
 using Abc.Zebus.Persistence.Matching;
 using Abc.Zebus.Persistence.Messages;
 using Abc.Zebus.Persistence.Reporter;
+using Abc.Zebus.Persistence.Storage;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Transport;
@@ -16,6 +17,7 @@ using Cassandra.Data.Linq;
 using Moq;
 using NUnit.Framework;
 using ProtoBuf;
+#pragma warning disable CS0618
 
 namespace Abc.Zebus.Persistence.Cassandra.Tests
 {
@@ -69,29 +71,31 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
         [Test]
         public async Task should_write_message_entry_fields_to_cassandra()
         {
-            using (SystemDateTime.PauseTime())
-            {
-                var messageBytes = new byte[512];
-                new Random().NextBytes(messageBytes);
-                var messageId = MessageId.NextId();
-                var peerId = "Abc.Peer.0";
-
-                await _storage.Write(new List<MatcherEntry> { MatcherEntry.Message(new PeerId(peerId), messageId, MessageTypeId.PersistenceStopping, messageBytes) });
-
-                var retrievedMessage = DataContext.PersistentMessages.Execute().ExpectedSingle();
-                retrievedMessage.TransportMessage.ShouldBeEquivalentTo(messageBytes, true);
-                retrievedMessage.BucketId.ShouldEqual(GetBucketIdFromMessageId(messageId));
-                retrievedMessage.IsAcked.ShouldBeFalse();
-                retrievedMessage.PeerId.ShouldEqual(peerId);
-                retrievedMessage.UniqueTimestampInTicks.ShouldEqual(messageId.GetDateTime().Ticks);
-                var writeTimeRow = DataContext.Session.Execute("SELECT WRITETIME(\"IsAcked\") FROM \"PersistentMessage\";").ExpectedSingle();
-                writeTimeRow.GetValue<long>(0).ShouldEqual(ToUnixMicroSeconds(messageId.GetDateTime()));
-
-                var peerState = DataContext.PeerStates.Execute().ExpectedSingle();
-                peerState.NonAckedMessageCount.ShouldEqual(1);
-                peerState.PeerId.ShouldEqual(peerId);
-                peerState.OldestNonAckedMessageTimestamp.ShouldEqual(messageId.GetDateTime().Ticks - PeerState.MessagesTimeToLive.Ticks);
-            }
+            var now = DateTime.UtcNow;
+            _storage.DateTimeSource = () => now;
+
+            using var _ = MessageId.PauseIdGenerationAtDate(now);
+
+            var messageBytes = new byte[512];
+            new Random().NextBytes(messageBytes);
+            var messageId = MessageId.NextId();
+            var peerId = "Abc.Peer.0";
+
+            await _storage.Write(new List<MatcherEntry> { MatcherEntry.Message(new PeerId(peerId), messageId, MessageTypeId.PersistenceStopping, messageBytes) });
+
+            var retrievedMessage = DataContext.PersistentMessages.Execute().ExpectedSingle();
+            retrievedMessage.TransportMessage.ShouldBeEquivalentTo(messageBytes, true);
+            retrievedMessage.BucketId.ShouldEqual(GetBucketIdFromMessageId(messageId));
+            retrievedMessage.IsAcked.ShouldBeFalse();
+            retrievedMessage.PeerId.ShouldEqual(peerId);
+            retrievedMessage.UniqueTimestampInTicks.ShouldEqual(messageId.GetDateTime().Ticks);
+            var writeTimeRow = DataContext.Session.Execute("SELECT WRITETIME(\"IsAcked\") FROM \"PersistentMessage\";").ExpectedSingle();
+            writeTimeRow.GetValue<long>(0).ShouldEqual(ToUnixMicroSeconds(messageId.GetDateTime()));
+
+            var peerState = DataContext.PeerStates.Execute().ExpectedSingle();
+            peerState.NonAckedMessageCount.ShouldEqual(1);
+            peerState.PeerId.ShouldEqual(peerId);
+            peerState.OldestNonAckedMessageTimestamp.ShouldEqual(messageId.GetDateTime().Ticks - PeerState.MessagesTimeToLive.Ticks);
         }
 
         [Test]
@@ -257,7 +261,8 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
             MessageId.ResetLastTimestamp();
 
             var firstTime = DateTime.Now;
-            using (SystemDateTime.Set(firstTime))
+            _storage.DateTimeSource = () => firstTime;
+
             using (MessageId.PauseIdGenerationAtDate(firstTime))
             {
                 var peerId = new PeerId("Abc.Testing.Target");
@@ -266,7 +271,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
                 await _storage.Write(new[] { MatcherEntry.Message(peerId, firstMessageId, new MessageTypeId("Abc.Message"), new byte[] { 0x01, 0x02, 0x03 }) });
 
                 var secondTime = firstTime.AddHours(1);
-                SystemDateTime.Set(secondTime);
+                _storage.DateTimeSource = () => secondTime;
                 MessageId.PauseIdGenerationAtDate(secondTime);
 
                 var secondMessageId = MessageId.NextId();
@@ -323,13 +328,15 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
             var firstPeer = new PeerId("Abc.Testing.Target");
             var secondPeer = new PeerId("Abc.Testing.OtherTarget");
 
+            var now = DateTime.UtcNow;
+            _storage.DateTimeSource = () => now;
+
             using (MessageId.PauseIdGeneration())
-            using (SystemDateTime.PauseTime())
             {
                 var transportMessages = Enumerable.Range(1, 100).Select(CreateTestTransportMessage).ToList();
                 var messages = transportMessages.SelectMany(x =>
                                                         {
-                                                            var transportMessageBytes = Serialization.Serializer.Serialize(x).ToArray();
+                                                            var transportMessageBytes = TransportMessageConvert.Serialize(x);
                                                             return new[]
                                                             {
                                                                 MatcherEntry.Message(firstPeer, x.Id, x.MessageTypeId, transportMessageBytes),
@@ -345,7 +352,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
                 nonAckedMessageCountsForUpdatedPeers[secondPeer].ShouldEqual(100);
 
                 var readerForFirstPeer = (CqlMessageReader)_storage.CreateMessageReader(firstPeer);
-                var expectedTransportMessages = transportMessages.Select(Serialization.Serializer.Serialize).Select(x => x.ToArray()).ToList();
+                var expectedTransportMessages = transportMessages.Select(TransportMessageConvert.Serialize).ToList();
                 readerForFirstPeer.GetUnackedMessages().ToList().ShouldEqualDeeply(expectedTransportMessages);
 
                 var readerForSecondPeer = (CqlMessageReader)_storage.CreateMessageReader(secondPeer);
@@ -370,29 +377,28 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
         [Test]
         public async Task should_update_oldest_non_acked_message_timestamp()
         {
-            using (SystemDateTime.PauseTime())
-            {
-                var peerId = new PeerId("PeerId");
-                var now = SystemDateTime.UtcNow;
-                InsertPersistentMessage(peerId, now.AddMilliseconds(1));
-                InsertPersistentMessage(peerId, now.AddMilliseconds(2), AckState.Unacked);
-                InsertPersistentMessage(peerId, now.AddMilliseconds(3), AckState.Unacked);
-                InsertPersistentMessage(peerId, now.AddMilliseconds(4));
-                InsertPersistentMessage(peerId, now.AddMilliseconds(5), AckState.Unacked);
-                var peerState = new PeerState(peerId, 0, now.AddMinutes(-30).Ticks);
-                InsertPeerState(peerState);
-
-                await _storage.UpdateNewOldestMessageTimestamp(peerState);
-
-                GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.AddMilliseconds(2).Ticks - _expectedOldestNonAckedMessageTimestampSafetyOffset);
-            }
+            var now = DateTime.UtcNow;
+            _storage.DateTimeSource = () => now;
+
+            var peerId = new PeerId("PeerId");
+            InsertPersistentMessage(peerId, now.AddMilliseconds(1));
+            InsertPersistentMessage(peerId, now.AddMilliseconds(2), AckState.Unacked);
+            InsertPersistentMessage(peerId, now.AddMilliseconds(3), AckState.Unacked);
+            InsertPersistentMessage(peerId, now.AddMilliseconds(4));
+            InsertPersistentMessage(peerId, now.AddMilliseconds(5), AckState.Unacked);
+            var peerState = new PeerState(peerId, 0, now.AddMinutes(-30).Ticks);
+            InsertPeerState(peerState);
+
+            await _storage.UpdateNewOldestMessageTimestamp(peerState);
+
+            GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.AddMilliseconds(2).Ticks - _expectedOldestNonAckedMessageTimestampSafetyOffset);
         }
 
         [Test]
         public async Task should_not_update_oldest_non_acked_message_timestamp_if_it_did_not_change()
         {
             var peerId = new PeerId("PeerId");
-            var now = SystemDateTime.UtcNow;
+            var now = DateTime.UtcNow;
             InsertPersistentMessage(peerId, now.AddMilliseconds(1));
             InsertPersistentMessage(peerId, now.AddMilliseconds(2), AckState.Unacked);
             InsertPersistentMessage(peerId, now.AddMilliseconds(3), AckState.Unacked);
@@ -409,40 +415,38 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
         [Test]
         public async Task should_take_utc_now_timestamp_minus_safety_offset_as_oldest_non_acked_message_when_all_messages_are_acked()
         {
-            using (SystemDateTime.PauseTime())
-            {
-                var peerId = new PeerId("PeerId");
-                var now = SystemDateTime.UtcNow;
+            var now = DateTime.UtcNow;
+            _storage.DateTimeSource = () => now;
 
-                InsertPersistentMessage(peerId, now.AddMilliseconds(-5));
-                InsertPersistentMessage(peerId, now.AddMilliseconds(-4));
-                InsertPersistentMessage(peerId, now.AddMilliseconds(-3));
-                InsertPersistentMessage(peerId, now.AddMilliseconds(-2));
-                InsertPersistentMessage(peerId, now.AddMilliseconds(-1));
+            var peerId = new PeerId("PeerId");
+
+            InsertPersistentMessage(peerId, now.AddMilliseconds(-5));
+            InsertPersistentMessage(peerId, now.AddMilliseconds(-4));
+            InsertPersistentMessage(peerId, now.AddMilliseconds(-3));
+            InsertPersistentMessage(peerId, now.AddMilliseconds(-2));
+            InsertPersistentMessage(peerId, now.AddMilliseconds(-1));
 
-                var peerState = new PeerState(peerId, 0, now.AddHours(-1).Ticks);
-                InsertPeerState(peerState);
+            var peerState = new PeerState(peerId, 0, now.AddHours(-1).Ticks);
+            InsertPeerState(peerState);
 
-                await _storage.UpdateNewOldestMessageTimestamp(peerState);
+            await _storage.UpdateNewOldestMessageTimestamp(peerState);
 
-                GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.Ticks - _expectedOldestNonAckedMessageTimestampSafetyOffset);
-            }
+            GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.Ticks - _expectedOldestNonAckedMessageTimestampSafetyOffset);
         }
 
         [Test]
         public async Task should_take_utc_now_timestamp_as_oldest_non_acked_message_when_no_messages_are_acked()
         {
-            using (SystemDateTime.PauseTime())
-            {
-                var peerId = new PeerId("PeerId");
-                var now = SystemDateTime.UtcNow;
-                var peerState = new PeerState(peerId, 0, now.AddDays(-2).Ticks);
-                InsertPeerState(peerState);
+            var now = DateTime.UtcNow;
+            _storage.DateTimeSource = () => now;
+
+            var peerId = new PeerId("PeerId");
+            var peerState = new PeerState(peerId, 0, now.AddDays(-2).Ticks);
+            InsertPeerState(peerState);
 
-                await _storage.UpdateNewOldestMessageTimestamp(peerState);
+            await _storage.UpdateNewOldestMessageTimestamp(peerState);
 
-                GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.Ticks - _expectedOldestNonAckedMessageTimestampSafetyOffset);
-            }
+            GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.Ticks - _expectedOldestNonAckedMessageTimestampSafetyOffset);
         }
 
         [Test]
@@ -545,9 +549,9 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
             return new PeerState(new PeerId(state.PeerId), state.NonAckedMessageCount, state.OldestNonAckedMessageTimestamp);
         }
 
-        private static TransportMessage CreateTestTransportMessage(int i)
+        private TransportMessage CreateTestTransportMessage(int i)
         {
-            MessageId.PauseIdGenerationAtDate(SystemDateTime.UtcNow.Date.AddSeconds(i * 10));
+            MessageId.PauseIdGenerationAtDate(_storage.DateTimeSource.Invoke().Date.AddSeconds(i * 10));
             return new Message1(i).ToTransportMessage();
         }
 

+ 2 - 3
src/Abc.Zebus.Persistence.Cassandra.Tests/OldestNonAckedMessageUpdaterPeriodicActionTests.cs

@@ -8,7 +8,6 @@ using Abc.Zebus.Persistence.Cassandra.PeriodicAction;
 using Abc.Zebus.Persistence.Cassandra.Tests.Cql;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Testing.Extensions;
-using Abc.Zebus.Util;
 using Moq;
 using NUnit.Framework;
 
@@ -38,8 +37,8 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
         {
             _testBus = new TestBus();
             var configurationMock = new Mock<ICqlPersistenceConfiguration>();
-            configurationMock.SetupGet(x => x.OldestMessagePerPeerCheckPeriod).Returns(30.Seconds());
-            configurationMock.SetupGet(x => x.OldestMessagePerPeerGlobalCheckPeriod).Returns(30.Seconds());
+            configurationMock.SetupGet(x => x.OldestMessagePerPeerCheckPeriod).Returns(TimeSpan.FromSeconds(30));
+            configurationMock.SetupGet(x => x.OldestMessagePerPeerGlobalCheckPeriod).Returns(TimeSpan.FromSeconds(30));
             _cqlStorage = new Mock<ICqlStorage>();
             _oldestMessageUpdater = new OldestNonAckedMessageUpdaterPeriodicAction(_testBus, configurationMock.Object, _cqlStorage.Object);
         }

+ 7 - 1
src/Abc.Zebus.Persistence.Cassandra/Cql/CqlStorage.cs

@@ -41,6 +41,12 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
 
         public Task CleanBucketTask { get; private set; } = Task.CompletedTask;
 
+        public Func<DateTime> DateTimeSource
+        {
+            get => _peerStateRepository.DateTimeSource;
+            set => _peerStateRepository.DateTimeSource = value;
+        }
+
         public Dictionary<PeerId, int> GetNonAckedMessageCounts()
         {
             return _peerStateRepository.GetAllKnownPeers()
@@ -196,7 +202,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
                                                         .Select(x => (long?)x.uniqueTimestampInTicks)
                                                         .FirstOrDefault();
 
-            return firstUnackedMessageTimestamp ?? SystemDateTime.UtcNow.Ticks;
+            return firstUnackedMessageTimestamp ?? DateTimeSource.Invoke().Ticks;
 
             IEnumerable<(bool isAcked, long uniqueTimestampInTicks)> ReadMessages(long bucketId)
             {

+ 3 - 4
src/Abc.Zebus.Persistence.Cassandra/Cql/PeerState.cs

@@ -1,5 +1,4 @@
 using System;
-using Abc.Zebus.Util;
 
 namespace Abc.Zebus.Persistence.Cassandra.Cql
 {
@@ -13,15 +12,15 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
         /// This delay prevents <see cref="OldestNonAckedMessageTimestampInTicks"/> from being moved too aggressively.
         /// Its value is set to the maximum estimated clock-drift / network delay of the system.
         /// </summary>
-        public static readonly TimeSpan OldestNonAckedMessageTimestampSafetyOffset = 20.Minutes();
+        public static readonly TimeSpan OldestNonAckedMessageTimestampSafetyOffset = TimeSpan.FromMinutes(20);
 
-        public static readonly TimeSpan MessagesTimeToLive = 30.Days();
+        public static readonly TimeSpan MessagesTimeToLive = TimeSpan.FromDays(30);
 
         public PeerState(PeerId peerId, int nonAckMessageCount = 0, long oldestNonAckedMessageTimestamp = 0, bool removed = false)
         {
             PeerId = peerId;
             NonAckedMessageCount = nonAckMessageCount;
-            OldestNonAckedMessageTimestampInTicks = oldestNonAckedMessageTimestamp > 0 ? oldestNonAckedMessageTimestamp : SystemDateTime.UtcNow.Ticks - MessagesTimeToLive.Ticks;
+            OldestNonAckedMessageTimestampInTicks = oldestNonAckedMessageTimestamp;
             Removed = removed;
         }
 

+ 8 - 4
src/Abc.Zebus.Persistence.Cassandra/Cql/PeerStateRepository.cs

@@ -1,11 +1,11 @@
-using System.Collections.Concurrent;
+using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading.Tasks;
 using Abc.Zebus.Persistence.Cassandra.Data;
 using Abc.Zebus.Persistence.Messages;
 using Abc.Zebus.Persistence.Util;
-using Abc.Zebus.Util;
 using Cassandra;
 using Cassandra.Data.Linq;
 using log4net;
@@ -24,6 +24,8 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
             _dataContext = dataContext;
         }
 
+        public Func<DateTime> DateTimeSource { get; set; } = () => DateTime.UtcNow;
+
         public void Initialize()
         {
             _log.Info("Initializing PeerStateRepository");
@@ -48,7 +50,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
                                                           p =>
                                                           {
                                                               _log.Info($"Created new state for peer {p}");
-                                                              return new PeerState(p, delta);
+                                                              return new PeerState(p, delta, MinimumOldestNonAckedMessageTimestamp);
                                                           },
                                                           (id, state) => state.WithNonAckedMessageCountDelta(delta));
 
@@ -94,7 +96,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
 
         private Task RemovePersistentMessages(PeerId peerId)
         {
-            var allPossibleBuckets = BucketIdHelper.GetBucketsCollection(SystemDateTime.UtcNow.Ticks - PeerState.MessagesTimeToLive.Ticks).ToArray();
+            var allPossibleBuckets = BucketIdHelper.GetBucketsCollection(MinimumOldestNonAckedMessageTimestamp, DateTimeSource.Invoke()).ToArray();
 
             return _dataContext.PersistentMessages
                                .Where(x => x.PeerId == peerId.ToString() && allPossibleBuckets.Contains(x.BucketId))
@@ -106,5 +108,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Cql
         {
             return _dataContext.PeerStates.Insert(new CassandraPeerState(p)).ExecuteAsync();
         }
+
+        private long MinimumOldestNonAckedMessageTimestamp => DateTimeSource.Invoke().Ticks - PeerState.MessagesTimeToLive.Ticks;
     }
 }

+ 5 - 6
src/Abc.Zebus.Persistence.Cassandra/PeriodicAction/OldestNonAckedMessageUpdaterPeriodicAction.cs

@@ -1,11 +1,10 @@
 using System;
+using System.Collections.Generic;
 using System.Linq;
 using System.Threading.Tasks;
 using Abc.Zebus.Hosting;
 using Abc.Zebus.Persistence.Cassandra.Cql;
 using Abc.Zebus.Persistence.Storage;
-using Abc.Zebus.Util;
-using Abc.Zebus.Util.Extensions;
 
 namespace Abc.Zebus.Persistence.Cassandra.PeriodicAction
 {
@@ -26,20 +25,20 @@ namespace Abc.Zebus.Persistence.Cassandra.PeriodicAction
         public override void DoPeriodicAction()
         {
             var isGlobalCheck = ShouldPerformGlobalCheck();
-            var peers = _cqlStorage.GetAllKnownPeers().AsList();
+            var peers = _cqlStorage.GetAllKnownPeers().ToList();
             var updatedNonAckedCounts = _nonAckedCountCache.Update(peers.Select(x => new NonAckedCount(x.PeerId, x.NonAckedMessageCount)));
-            var updatedPeerIds = updatedNonAckedCounts.Select(x => x.PeerId).ToHashSet();
+            var updatedPeerIds = new HashSet<PeerId>(updatedNonAckedCounts.Select(x => x.PeerId));
             var peersToCheck = isGlobalCheck ? peers : peers.Where(x => updatedPeerIds.Contains(x.PeerId));
 
             if (isGlobalCheck)
-                _lastGlobalCheck = SystemDateTime.UtcNow;
+                _lastGlobalCheck = DateTime.UtcNow;
 
             Parallel.ForEach(peersToCheck, new ParallelOptions { MaxDegreeOfParallelism = 10 }, UpdateOldestNonAckedMessage);
         }
 
         private bool ShouldPerformGlobalCheck()
         {
-            return SystemDateTime.UtcNow >= _lastGlobalCheck.Add(_configuration.OldestMessagePerPeerGlobalCheckPeriod);
+            return DateTime.UtcNow >= _lastGlobalCheck.Add(_configuration.OldestMessagePerPeerGlobalCheckPeriod);
         }
 
         private void UpdateOldestNonAckedMessage(PeerState peer)

+ 11 - 8
src/Abc.Zebus.Persistence.RocksDb.Tests/PerformanceTests.cs

@@ -1,15 +1,18 @@
 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.IO;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 using Abc.Zebus.Persistence.Matching;
+using Abc.Zebus.Persistence.Storage;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Transport;
 using Abc.Zebus.Util;
 using NUnit.Framework;
 using ProtoBuf;
+#pragma warning disable CS0618
 
 namespace Abc.Zebus.Persistence.RocksDb.Tests
 {
@@ -37,7 +40,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             var messageBytes = MessageBytes();
 
             var startTime = DateTime.UtcNow;
-            var testDuration = 30.Seconds();
+            var testDuration = TimeSpan.FromSeconds(30);
             // Thread 1 - write messages
             var writeTask = Task.Run(async () =>
             {
@@ -86,16 +89,16 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             var peerId = new PeerId("Peer");
             for (int i = 0; i < 10_000; i++)
             {
-                MessageId.PauseIdGenerationAtDate(SystemDateTime.UtcNow.Date.AddSeconds(i * 10));
+                MessageId.PauseIdGenerationAtDate(DateTime.UtcNow.Date.AddSeconds(i * 10));
                 entriesToPersist.Add(MatcherEntry.Message(peerId, MessageId.NextId(), new MessageTypeId("SomeEvent"), messageBytes));
             }
 
             _storage.Write(entriesToPersist);
 
-            // Read all unacked messages 
+            // Read all unacked messages
             var messageReader = _storage.CreateMessageReader(peerId);
             var startTime = DateTime.UtcNow;
-            var testDuration = 30.Seconds();
+            var testDuration = TimeSpan.FromSeconds(30);
             var count = 0;
             while (DateTime.UtcNow - startTime < testDuration)
             {
@@ -117,7 +120,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             var entriesToPersist = new List<MatcherEntry>();
             for (int i = 0; i < count; i++)
             {
-                MessageId.PauseIdGenerationAtDate(SystemDateTime.UtcNow.Date.AddSeconds(i * 10));
+                MessageId.PauseIdGenerationAtDate(DateTime.UtcNow.Date.AddSeconds(i * 10));
                 entriesToPersist.Add(MatcherEntry.Message(new PeerId("Peer" + (i + offset)), MessageId.NextId(), new MessageTypeId("SomeEvent"), messageBytes));
             }
             return entriesToPersist;
@@ -126,13 +129,13 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
         private static byte[] MessageBytes()
         {
             var message = CreateTestTransportMessage(1);
-            var messageBytes = Serialization.Serializer.Serialize(message).ToArray();
-            return messageBytes;
+
+            return TransportMessageConvert.Serialize(message);
         }
 
         private static TransportMessage CreateTestTransportMessage(int i)
         {
-            MessageId.PauseIdGenerationAtDate(SystemDateTime.UtcNow.Date.AddSeconds(i * 10));
+            MessageId.PauseIdGenerationAtDate(DateTime.UtcNow.Date.AddSeconds(i * 10));
             return new Message1(i).ToTransportMessage();
         }
 

+ 13 - 24
src/Abc.Zebus.Persistence.RocksDb.Tests/RocksDbStorageTests.cs

@@ -5,14 +5,14 @@ using System.Linq;
 using System.Threading.Tasks;
 using Abc.Zebus.Persistence.Matching;
 using Abc.Zebus.Persistence.Reporter;
-using Abc.Zebus.Serialization.Protobuf;
+using Abc.Zebus.Persistence.Storage;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Transport;
-using Abc.Zebus.Util;
 using Moq;
 using NUnit.Framework;
 using ProtoBuf;
+#pragma warning disable CS0618
 
 namespace Abc.Zebus.Persistence.RocksDb.Tests
 {
@@ -44,7 +44,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
         public async Task should_write_message_entry_fields_to_cassandra()
         {
             var inputMessage = CreateTestTransportMessage(1);
-            var messageBytes = Serialization.Serializer.Serialize(inputMessage).ToArray();
+            var messageBytes = TransportMessageConvert.Serialize(inputMessage);
             var messageId = MessageId.NextId();
 
             var peerId = new PeerId("Abc.Peer.0");
@@ -58,7 +58,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
         [Test]
         public async Task should_not_overwrite_messages_with_same_time_component_and_different_message_id()
         {
-            var messageBytes = Serialization.Serializer.Serialize(CreateTestTransportMessage(1)).ToArray();
+            var messageBytes = TransportMessageConvert.Serialize(CreateTestTransportMessage(1));
             var messageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9706-ae1ea5dcf365"));      // Time component @2016-01-01 00:00:00Z
             var otherMessageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9806-f1ef55aac8e9")); // Time component @2016-01-01 00:00:00Z
 
@@ -77,7 +77,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
         public async Task should_support_out_of_order_acks_and_messages()
         {
             var inputMessage = CreateTestTransportMessage(1);
-            var messageBytes = Serialization.Serializer.Serialize(inputMessage).ToArray();
+            var messageBytes = TransportMessageConvert.Serialize(inputMessage);
             var messageId = MessageId.NextId();
 
             var peerId = new PeerId("Abc.Peer.0");
@@ -101,7 +101,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
         public async Task should_remove_peer()
         {
             var inputMessage = CreateTestTransportMessage(1);
-            var messageBytes = Serialization.Serializer.Serialize(inputMessage).ToArray();
+            var messageBytes = TransportMessageConvert.Serialize(inputMessage);
             var messageId = MessageId.NextId();
 
             var peerId = new PeerId("Abc.Peer.0");
@@ -141,12 +141,11 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             var secondPeer = new PeerId("Abc.Testing.OtherTarget");
 
             using (MessageId.PauseIdGeneration())
-            using (SystemDateTime.PauseTime())
             {
                 var inputMessages = Enumerable.Range(1, 100).Select(CreateTestTransportMessage).ToList();
                 var messages = inputMessages.SelectMany(x =>
                                                         {
-                                                            var transportMessageBytes = Serialization.Serializer.Serialize(x).ToArray();
+                                                            var transportMessageBytes = TransportMessageConvert.Serialize(x);
                                                             return new[]
                                                             {
                                                                 MatcherEntry.Message(firstPeer, x.Id, x.MessageTypeId, transportMessageBytes),
@@ -157,7 +156,7 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
 
                 await _storage.Write(messages);
 
-                var expectedTransportMessages = inputMessages.Select(Serialization.Serializer.Serialize).Select(x => x.ToArray()).ToList();
+                var expectedTransportMessages = inputMessages.Select(TransportMessageConvert.Serialize).ToList();
                 using (var readerForFirstPeer = _storage.CreateMessageReader(firstPeer))
                 {
                     var transportMessages = readerForFirstPeer.GetUnackedMessages().ToList();
@@ -189,17 +188,17 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             using (var reader = _storage.CreateMessageReader(peer))
             {
                 reader.GetUnackedMessages()
-                      .Select(TransportMessageDeserializer.Deserialize)
+                      .Select(TransportMessageConvert.Deserialize)
                       .Select(x => x.Id)
                       .ToList()
                       .ShouldBeEquivalentTo(message1.MessageId);
             }
         }
 
-        private static MatcherEntry GetMatcherEntryWithValidTransportMessage(PeerId peer, int i)
+        private MatcherEntry GetMatcherEntryWithValidTransportMessage(PeerId peer, int i)
         {
             var inputMessage = CreateTestTransportMessage(i);
-            var messageBytes = Serialization.Serializer.Serialize(inputMessage).ToArray();
+            var messageBytes = TransportMessageConvert.Serialize(inputMessage);
             var message1 = MatcherEntry.Message(peer, inputMessage.Id, MessageUtil.TypeId<Message1>(), messageBytes);
             return message1;
         }
@@ -241,9 +240,9 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
             _reporterMock.Verify(r => r.AddStorageReport(2, 7, 4, "Abc.Message.Fat"));
         }
 
-        private static TransportMessage CreateTestTransportMessage(int i)
+        private TransportMessage CreateTestTransportMessage(int i)
         {
-            MessageId.PauseIdGenerationAtDate(SystemDateTime.UtcNow.Date.AddSeconds(i * 10));
+            MessageId.PauseIdGenerationAtDate(DateTime.UtcNow.Date.AddSeconds(i * 10));
             return new Message1(i).ToTransportMessage();
         }
 
@@ -258,15 +257,5 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
                 Id = id;
             }
         }
-
-        private static class TransportMessageDeserializer
-        {
-            public static TransportMessage Deserialize(byte[] bytes)
-            {
-                var bufferReader = new ProtoBufferReader(bytes, bytes.Length);
-                var readTransportMessage = bufferReader.ReadTransportMessage();
-                return readTransportMessage;
-            }
-        }
     }
 }

+ 3 - 1
src/Abc.Zebus.Persistence/MessageReplayer.cs

@@ -179,7 +179,9 @@ namespace Abc.Zebus.Persistence
             }
         }
 
-        private static TransportMessage DeserializeTransportMessage(byte[] row) => TransportMessageDeserializer.Deserialize(row);
+#pragma warning disable CS0618
+        private static TransportMessage DeserializeTransportMessage(byte[] row) => TransportMessageConvert.Deserialize(row);
+#pragma warning restore CS0618
 
         private void WaitForAcks(CancellationToken cancellationToken)
         {

+ 24 - 0
src/Abc.Zebus.Persistence/Storage/TransportMessageConvert.cs

@@ -0,0 +1,24 @@
+using System;
+using Abc.Zebus.Serialization.Protobuf;
+using Abc.Zebus.Transport;
+
+namespace Abc.Zebus.Persistence.Storage
+{
+    [Obsolete("Use TransportMessage instead")]
+    public static class TransportMessageConvert
+    {
+        public static byte[] Serialize(TransportMessage transportMessage)
+        {
+            var writer = new ProtoBufferWriter();
+            writer.WriteTransportMessage(transportMessage);
+
+            return writer.Buffer.AsSpan(0, writer.Position).ToArray();
+        }
+
+        public static TransportMessage Deserialize(byte[] bytes)
+        {
+            var reader = new ProtoBufferReader(bytes, bytes.Length);
+            return reader.ReadTransportMessage();
+        }
+    }
+}

+ 0 - 14
src/Abc.Zebus.Persistence/Storage/TransportMessageDeserializer.cs

@@ -1,14 +0,0 @@
-using Abc.Zebus.Serialization.Protobuf;
-using Abc.Zebus.Transport;
-
-namespace Abc.Zebus.Persistence.Storage
-{
-    internal static class TransportMessageDeserializer
-    {
-        public static TransportMessage Deserialize(byte[] bytes)
-        {
-            var bufferReader = new ProtoBufferReader(bytes, bytes.Length);
-            return bufferReader.ReadTransportMessage();
-        }
-    }
-}

+ 4 - 3
src/Abc.Zebus.Persistence/Storage/TransportMessageSerializer.cs

@@ -7,7 +7,7 @@ namespace Abc.Zebus.Persistence.Storage
     /// <summary>
     /// Stateful, non thread-safe serializer.
     /// </summary>
-    internal class TransportMessageSerializer
+    public class TransportMessageSerializer
     {
         private readonly int _maximumCapacity;
         private ProtoBufferWriter _bufferWriter;
@@ -23,8 +23,7 @@ namespace Abc.Zebus.Persistence.Storage
             _bufferWriter.Reset();
             _bufferWriter.WriteTransportMessage(transportMessage);
 
-            var bytes = new byte[_bufferWriter.Position];
-            Buffer.BlockCopy(_bufferWriter.Buffer, 0, bytes, 0, _bufferWriter.Position);
+            var bytes = _bufferWriter.Buffer.AsSpan(0, _bufferWriter.Position).ToArray();
 
             // prevent service from leaking after fat transport message serializations
             if (_bufferWriter.Position > _maximumCapacity)
@@ -32,5 +31,7 @@ namespace Abc.Zebus.Persistence.Storage
 
             return bytes;
         }
+
+
     }
 }