|
@@ -4,7 +4,6 @@ using System.Linq;
|
|
|
using System.Threading.Tasks;
|
|
|
using Abc.Zebus.Persistence.CQL.Data;
|
|
|
using Abc.Zebus.Persistence.CQL.Storage;
|
|
|
-using Abc.Zebus.Persistence.CQL.Testing;
|
|
|
using Abc.Zebus.Persistence.CQL.Tests.Cql;
|
|
|
using Abc.Zebus.Persistence.Matching;
|
|
|
using Abc.Zebus.Persistence.Messages;
|
|
@@ -13,6 +12,7 @@ using Abc.Zebus.Testing;
|
|
|
using Abc.Zebus.Testing.Extensions;
|
|
|
using Abc.Zebus.Transport;
|
|
|
using Abc.Zebus.Util;
|
|
|
+using Cassandra.Data.Linq;
|
|
|
using Moq;
|
|
|
using NUnit.Framework;
|
|
|
using ProtoBuf;
|
|
@@ -22,7 +22,6 @@ namespace Abc.Zebus.Persistence.CQL.Tests
|
|
|
public class CqlStorageTests : CqlTestFixture<PersistenceCqlDataContext, ICqlPersistenceConfiguration>
|
|
|
{
|
|
|
private CqlStorage _storage;
|
|
|
- private FakePeerStateRepository _peerStateRepository;
|
|
|
private Mock<IPersistenceConfiguration> _configurationMock;
|
|
|
private Mock<IReporter> _reporterMock;
|
|
|
|
|
@@ -45,8 +44,7 @@ namespace Abc.Zebus.Persistence.CQL.Tests
|
|
|
{
|
|
|
_configurationMock = new Mock<IPersistenceConfiguration>();
|
|
|
_reporterMock = new Mock<IReporter>();
|
|
|
- _peerStateRepository = new FakePeerStateRepository();
|
|
|
- _storage = new CqlStorage(DataContext, _peerStateRepository, _configurationMock.Object, _reporterMock.Object);
|
|
|
+ _storage = new CqlStorage(DataContext, _configurationMock.Object, _reporterMock.Object);
|
|
|
_storage.Start();
|
|
|
}
|
|
|
|
|
@@ -57,45 +55,42 @@ namespace Abc.Zebus.Persistence.CQL.Tests
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
|
- public void should_initialize_peer_state_repository_on_start()
|
|
|
+ public void should_initialize_peer_state_on_start()
|
|
|
{
|
|
|
- var peerStateRepository = new FakePeerStateRepository();
|
|
|
- var storage = new CqlStorage(DataContext, peerStateRepository, _configurationMock.Object, _reporterMock.Object);
|
|
|
- storage.Start();
|
|
|
-
|
|
|
- peerStateRepository.IsInitialized.ShouldBeTrue();
|
|
|
- }
|
|
|
+ DataContext.PeerStates.Insert(new CassandraPeerState(new PeerState(new PeerId("New")))).Execute();
|
|
|
|
|
|
- [Test]
|
|
|
- public void should_save_peer_state_repository_on_stop()
|
|
|
- {
|
|
|
- var peerStateRepository = new FakePeerStateRepository();
|
|
|
- var storage = new CqlStorage(DataContext, peerStateRepository, _configurationMock.Object, _reporterMock.Object);
|
|
|
+ var storage = new CqlStorage(DataContext, _configurationMock.Object, _reporterMock.Object);
|
|
|
storage.Start();
|
|
|
- storage.Stop();
|
|
|
|
|
|
- peerStateRepository.IsInitialized.ShouldBeTrue();
|
|
|
- peerStateRepository.HasBeenSaved.ShouldBeTrue();
|
|
|
+ storage.GetAllKnownPeers().Count().ShouldEqual(1);
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
|
public async Task should_write_message_entry_fields_to_cassandra()
|
|
|
{
|
|
|
- var messageBytes = new byte[512];
|
|
|
- new Random().NextBytes(messageBytes);
|
|
|
- var messageId = MessageId.NextId();
|
|
|
- var peerId = "Abc.Peer.0";
|
|
|
-
|
|
|
- await _storage.Write(new List<MatcherEntry> { MatcherEntry.Message(new PeerId(peerId), messageId, MessageTypeId.PersistenceStopping, messageBytes) });
|
|
|
-
|
|
|
- var retrievedMessage = DataContext.PersistentMessages.Execute().ExpectedSingle();
|
|
|
- retrievedMessage.TransportMessage.ShouldBeEquivalentTo(messageBytes, true);
|
|
|
- retrievedMessage.BucketId.ShouldEqual(GetBucketIdFromMessageId(messageId));
|
|
|
- retrievedMessage.IsAcked.ShouldBeFalse();
|
|
|
- retrievedMessage.PeerId.ShouldEqual(peerId);
|
|
|
- retrievedMessage.UniqueTimestampInTicks.ShouldEqual(messageId.GetDateTime().Ticks);
|
|
|
- var writeTimeRow = DataContext.Session.Execute("SELECT WRITETIME(\"IsAcked\") FROM \"PersistentMessage\";").ExpectedSingle();
|
|
|
- writeTimeRow.GetValue<long>(0).ShouldEqual(ToUnixMicroSeconds(messageId.GetDateTime()));
|
|
|
+ using (SystemDateTime.PauseTime())
|
|
|
+ {
|
|
|
+ var messageBytes = new byte[512];
|
|
|
+ new Random().NextBytes(messageBytes);
|
|
|
+ var messageId = MessageId.NextId();
|
|
|
+ var peerId = "Abc.Peer.0";
|
|
|
+
|
|
|
+ await _storage.Write(new List<MatcherEntry> { MatcherEntry.Message(new PeerId(peerId), messageId, MessageTypeId.PersistenceStopping, messageBytes) });
|
|
|
+
|
|
|
+ var retrievedMessage = DataContext.PersistentMessages.Execute().ExpectedSingle();
|
|
|
+ retrievedMessage.TransportMessage.ShouldBeEquivalentTo(messageBytes, true);
|
|
|
+ retrievedMessage.BucketId.ShouldEqual(GetBucketIdFromMessageId(messageId));
|
|
|
+ retrievedMessage.IsAcked.ShouldBeFalse();
|
|
|
+ retrievedMessage.PeerId.ShouldEqual(peerId);
|
|
|
+ retrievedMessage.UniqueTimestampInTicks.ShouldEqual(messageId.GetDateTime().Ticks);
|
|
|
+ var writeTimeRow = DataContext.Session.Execute("SELECT WRITETIME(\"IsAcked\") FROM \"PersistentMessage\";").ExpectedSingle();
|
|
|
+ writeTimeRow.GetValue<long>(0).ShouldEqual(ToUnixMicroSeconds(messageId.GetDateTime()));
|
|
|
+
|
|
|
+ var peerState = DataContext.PeerStates.Execute().ExpectedSingle();
|
|
|
+ peerState.NonAckedMessageCount.ShouldEqual(1);
|
|
|
+ peerState.PeerId.ShouldEqual(peerId);
|
|
|
+ peerState.OldestNonAckedMessageTimestamp.ShouldEqual(messageId.GetDateTime().Ticks - CqlStorage.PersistentMessagesTimeToLive.Ticks);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -138,14 +133,6 @@ namespace Abc.Zebus.Persistence.CQL.Tests
|
|
|
retrievedMessages.Count.ShouldEqual(2);
|
|
|
}
|
|
|
|
|
|
- private static long ToUnixMicroSeconds(DateTime timestamp)
|
|
|
- {
|
|
|
- var origin = new DateTime(1970, 1, 1, 0, 0, 0, 0);
|
|
|
- var diff = timestamp - origin;
|
|
|
- var diffInMicroSeconds = diff.Ticks / 10;
|
|
|
- return diffInMicroSeconds;
|
|
|
- }
|
|
|
-
|
|
|
[Test]
|
|
|
public async Task should_write_ack_entry_fields_to_cassandra()
|
|
|
{
|
|
@@ -225,41 +212,42 @@ namespace Abc.Zebus.Persistence.CQL.Tests
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
|
- public void should_call_peer_state_repository_when_asked_to_remove_peer()
|
|
|
+ public async Task should_remove_from_cassandra_when_asked_to_remove_peer()
|
|
|
{
|
|
|
var peerId = new PeerId("PeerId");
|
|
|
- _peerStateRepository.Add(new PeerState(peerId));
|
|
|
- var peerState =_peerStateRepository[peerId];
|
|
|
+ await _storage.Write(new List<MatcherEntry> { MatcherEntry.Message(peerId, MessageId.NextId(), MessageTypeId.PersistenceStopping, new byte[0]) });
|
|
|
|
|
|
- _storage.RemovePeer(peerId);
|
|
|
-
|
|
|
- peerState.Removed.ShouldBeTrue();
|
|
|
- _peerStateRepository.GetPeerStateFor(peerId).ShouldBeNull();
|
|
|
+ await _storage.RemovePeer(peerId);
|
|
|
+
|
|
|
+ DataContext.PeerStates.Execute().ShouldBeEmpty();
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
|
- public void should_return_cql_message_reader()
|
|
|
+ public async Task should_delete_all_buckets_for_peer_when_removed()
|
|
|
{
|
|
|
var peerId = new PeerId("PeerId");
|
|
|
- _peerStateRepository.Add(new PeerState(peerId));
|
|
|
+ await _storage.Write(new List<MatcherEntry> { MatcherEntry.Message(peerId, MessageId.NextId(), MessageTypeId.PersistenceStopping, new byte[0]) });
|
|
|
|
|
|
- _storage.CreateMessageReader(peerId).ShouldNotBeNull();
|
|
|
+ DataContext.PersistentMessages.Execute().Count().ShouldEqual(1);
|
|
|
+
|
|
|
+ await _storage.RemovePeer(new PeerId("PeerId"));
|
|
|
+
|
|
|
+ DataContext.PersistentMessages.Execute().Any().ShouldBeFalse();
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
|
- public void should_return_null_when_asked_for_a_message_reader_for_an_unknown_peer_id()
|
|
|
+ public async Task should_return_cql_message_reader()
|
|
|
{
|
|
|
- _storage.CreateMessageReader(new PeerId("UnknownPeerId")).ShouldBeNull();
|
|
|
- }
|
|
|
+ var peerId = new PeerId("PeerId");
|
|
|
+ await _storage.Write(new List<MatcherEntry> { MatcherEntry.Message(peerId, MessageId.NextId(), MessageTypeId.PersistenceStopping, new byte[0]) });
|
|
|
|
|
|
- private static long GetBucketIdFromMessageId(MessageId message)
|
|
|
- {
|
|
|
- return GetBucketIdFromDateTime(message.GetDateTime());
|
|
|
+ _storage.CreateMessageReader(peerId).ShouldNotBeNull();
|
|
|
}
|
|
|
|
|
|
- private static long GetBucketIdFromDateTime(DateTime timestamp)
|
|
|
+ [Test]
|
|
|
+ public void should_return_null_when_asked_for_a_message_reader_for_an_unknown_peer_id()
|
|
|
{
|
|
|
- return new DateTime(timestamp.Year, timestamp.Month, timestamp.Day, timestamp.Hour, 0, 0).Ticks;
|
|
|
+ _storage.CreateMessageReader(new PeerId("UnknownPeerId")).ShouldBeNull();
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -306,24 +294,26 @@ namespace Abc.Zebus.Persistence.CQL.Tests
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
[Test]
|
|
|
- public void should_update_non_ack_message_count()
|
|
|
+ public async Task should_update_non_ack_message_count()
|
|
|
{
|
|
|
var firstPeer = new PeerId("Abc.Testing.Target");
|
|
|
var secondPeer = new PeerId("Abc.Testing.OtherTarget");
|
|
|
-
|
|
|
- _storage.Write(new[] { MatcherEntry.Message(firstPeer, MessageId.NextId(), new MessageTypeId("Abc.Message"), new byte[] { 0x01, 0x02, 0x03 }) });
|
|
|
- _storage.Write(new[] { MatcherEntry.Message(secondPeer, MessageId.NextId(), new MessageTypeId("Abc.Message"), new byte[] { 0x04, 0x05, 0x06 }) });
|
|
|
- _storage.Write(new[] { MatcherEntry.Message(firstPeer, MessageId.NextId(), new MessageTypeId("Abc.Message"), new byte[] { 0x07, 0x08, 0x09 }) });
|
|
|
|
|
|
- _peerStateRepository[firstPeer].NonAckedMessageCount.ShouldEqual(2);
|
|
|
- _peerStateRepository[secondPeer].NonAckedMessageCount.ShouldEqual(1);
|
|
|
+ await _storage.Write(new[] { MatcherEntry.Message(firstPeer, MessageId.NextId(), new MessageTypeId("Abc.Message"), new byte[] { 0x01, 0x02, 0x03 }) });
|
|
|
+ await _storage.Write(new[] { MatcherEntry.Message(secondPeer, MessageId.NextId(), new MessageTypeId("Abc.Message"), new byte[] { 0x04, 0x05, 0x06 }) });
|
|
|
+ await _storage.Write(new[] { MatcherEntry.Message(firstPeer, MessageId.NextId(), new MessageTypeId("Abc.Message"), new byte[] { 0x07, 0x08, 0x09 }) });
|
|
|
+
|
|
|
+ var nonAckedMessageCountsForUpdatedPeers = _storage.GetNonAckedMessageCounts();
|
|
|
+ nonAckedMessageCountsForUpdatedPeers[firstPeer].ShouldEqual(2);
|
|
|
+ nonAckedMessageCountsForUpdatedPeers[secondPeer].ShouldEqual(1);
|
|
|
|
|
|
- _storage.Write(new[] { MatcherEntry.Ack(firstPeer, MessageId.NextId()) });
|
|
|
+ await _storage.Write(new[] { MatcherEntry.Ack(firstPeer, MessageId.NextId()) });
|
|
|
|
|
|
- _peerStateRepository[firstPeer].NonAckedMessageCount.ShouldEqual(1);
|
|
|
- _peerStateRepository[secondPeer].NonAckedMessageCount.ShouldEqual(1);
|
|
|
+ nonAckedMessageCountsForUpdatedPeers = _storage.GetNonAckedMessageCounts();
|
|
|
+ nonAckedMessageCountsForUpdatedPeers[firstPeer].ShouldEqual(1);
|
|
|
+ nonAckedMessageCountsForUpdatedPeers[secondPeer].ShouldEqual(1);
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -331,14 +321,12 @@ namespace Abc.Zebus.Persistence.CQL.Tests
|
|
|
{
|
|
|
var firstPeer = new PeerId("Abc.Testing.Target");
|
|
|
var secondPeer = new PeerId("Abc.Testing.OtherTarget");
|
|
|
- _peerStateRepository.Add(new PeerState(firstPeer, 0, SystemDateTime.UtcNow.Date.Ticks));
|
|
|
- _peerStateRepository.Add(new PeerState(secondPeer, 0, SystemDateTime.UtcNow.Date.Ticks));
|
|
|
|
|
|
using (MessageId.PauseIdGeneration())
|
|
|
using (SystemDateTime.PauseTime())
|
|
|
{
|
|
|
- var expectedTransportMessages = Enumerable.Range(1, 100).Select(CreateTestTransportMessage).ToList();
|
|
|
- var messages = expectedTransportMessages.SelectMany(x =>
|
|
|
+ var transportMessages = Enumerable.Range(1, 100).Select(CreateTestTransportMessage).ToList();
|
|
|
+ var messages = transportMessages.SelectMany(x =>
|
|
|
{
|
|
|
var transportMessageBytes = Serialization.Serializer.Serialize(x).ToArray();
|
|
|
return new[]
|
|
@@ -351,10 +339,12 @@ namespace Abc.Zebus.Persistence.CQL.Tests
|
|
|
|
|
|
await _storage.Write(messages);
|
|
|
|
|
|
- _peerStateRepository[firstPeer].NonAckedMessageCount.ShouldEqual(100);
|
|
|
- _peerStateRepository[secondPeer].NonAckedMessageCount.ShouldEqual(100);
|
|
|
+ var nonAckedMessageCountsForUpdatedPeers = _storage.GetNonAckedMessageCounts();
|
|
|
+ nonAckedMessageCountsForUpdatedPeers[firstPeer].ShouldEqual(100);
|
|
|
+ nonAckedMessageCountsForUpdatedPeers[secondPeer].ShouldEqual(100);
|
|
|
|
|
|
var readerForFirstPeer = (CqlMessageReader)_storage.CreateMessageReader(firstPeer);
|
|
|
+ var expectedTransportMessages = transportMessages.Select(Serialization.Serializer.Serialize).Select(x => x.ToArray()).ToList();
|
|
|
readerForFirstPeer.GetUnackedMessages().ToList().ShouldEqualDeeply(expectedTransportMessages);
|
|
|
|
|
|
var readerForSecondPeer = (CqlMessageReader)_storage.CreateMessageReader(secondPeer);
|
|
@@ -376,12 +366,171 @@ namespace Abc.Zebus.Persistence.CQL.Tests
|
|
|
_reporterMock.Verify(r => r.AddStorageReport(2, 7, 4, "Abc.Message.Fat"));
|
|
|
}
|
|
|
|
|
|
+ [Test]
|
|
|
+ public async Task should_update_oldest_non_acked_message_timestamp()
|
|
|
+ {
|
|
|
+ using (SystemDateTime.PauseTime())
|
|
|
+ {
|
|
|
+ var peerId = new PeerId("PeerId");
|
|
|
+ var now = SystemDateTime.UtcNow;
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(1), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(2), x => x.IsAcked = false);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(3), x => x.IsAcked = false);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(4), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(5), x => x.IsAcked = false);
|
|
|
+ var peerState = new PeerState(peerId, 0, now.AddMilliseconds(1).Ticks);
|
|
|
+ InsertPeerState(peerState);
|
|
|
+
|
|
|
+ await _storage.CleanBuckets(peerState);
|
|
|
+
|
|
|
+ GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.AddMilliseconds(2).Ticks);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ [Test]
|
|
|
+ public async Task should_not_update_oldest_non_acked_message_timestamp_if_it_did_not_change()
|
|
|
+ {
|
|
|
+ var peerId = new PeerId("PeerId");
|
|
|
+ var now = SystemDateTime.UtcNow;
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(1), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(2), x => x.IsAcked = false);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(3), x => x.IsAcked = false);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(4), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(5), x => x.IsAcked = false);
|
|
|
+ var peerState = new PeerState(peerId, 0, now.AddMilliseconds(2).Ticks);
|
|
|
+ InsertPeerState(peerState);
|
|
|
+
|
|
|
+ await _storage.CleanBuckets(peerState);
|
|
|
+
|
|
|
+ GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.AddMilliseconds(2).Ticks);
|
|
|
+ }
|
|
|
+
|
|
|
+ [Test]
|
|
|
+ public async Task should_take_last_message_timestamp_plus_one_tick_as_oldest_non_acked_message_when_all_messages_are_acked()
|
|
|
+ {
|
|
|
+ var peerId = new PeerId("PeerId");
|
|
|
+ var now = SystemDateTime.UtcNow;
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(1), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(2), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(3), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(4), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddMilliseconds(5), x => x.IsAcked = true);
|
|
|
+ var peerState = new PeerState(peerId, 0, now.AddMilliseconds(2).Ticks);
|
|
|
+ InsertPeerState(peerState);
|
|
|
+
|
|
|
+ await _storage.CleanBuckets(peerState);
|
|
|
+
|
|
|
+ GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.AddMilliseconds(5).Ticks + 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ [Test]
|
|
|
+ public async Task should_take_utc_now_timestamp_as_oldest_non_acked_message_when_no_messages_are_acked()
|
|
|
+ {
|
|
|
+ using (SystemDateTime.PauseTime())
|
|
|
+ {
|
|
|
+ var peerId = new PeerId("PeerId");
|
|
|
+ var now = SystemDateTime.UtcNow;
|
|
|
+ var peerState = new PeerState(peerId, 0, now.AddMilliseconds(2).Ticks);
|
|
|
+ InsertPeerState(peerState);
|
|
|
+
|
|
|
+ await _storage.CleanBuckets(peerState);
|
|
|
+
|
|
|
+ GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.Ticks);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ [Test]
|
|
|
+ public async Task should_delete_buckets_when_all_messages_are_acked_in_it()
|
|
|
+ {
|
|
|
+ var peerId = new PeerId("PeerId");
|
|
|
+ var now = DateTime.UtcNow.Date;
|
|
|
+ var peerState = new PeerState(peerId, 0, now.AddHours(-5).Ticks);
|
|
|
+ InsertPeerState(peerState);
|
|
|
+
|
|
|
+ // first bucket - 3 hours ago - all acked
|
|
|
+ InsertPersistentMessage(peerId, now.AddHours(-3), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddHours(-3).AddMinutes(1), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddHours(-3).AddMinutes(2), x => x.IsAcked = true);
|
|
|
+ // second bucket - 2 hours ago - all acked
|
|
|
+ InsertPersistentMessage(peerId, now.AddHours(-2), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddHours(-2).AddMinutes(1), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddHours(-2).AddMinutes(2), x => x.IsAcked = true);
|
|
|
+ // third bucket - 1 hours ago - with non acked
|
|
|
+ InsertPersistentMessage(peerId, now.AddHours(-1), x => x.IsAcked = true);
|
|
|
+ InsertPersistentMessage(peerId, now.AddHours(-1).AddMinutes(1), x => x.IsAcked = false); // <--- non acked !
|
|
|
+ InsertPersistentMessage(peerId, now.AddHours(-1).AddMinutes(2), x => x.IsAcked = true);
|
|
|
+ // current bucket - all acked
|
|
|
+ InsertPersistentMessage(peerId, now, x => x.IsAcked = true);
|
|
|
+
|
|
|
+ var messages = DataContext.PersistentMessages.Execute().ToList();
|
|
|
+ messages.Count.ShouldEqual(10);
|
|
|
+
|
|
|
+ await _storage.CleanBuckets(peerState);
|
|
|
+
|
|
|
+ Wait.Until(() => DataContext.PersistentMessages.Execute().Count() == 4, 2.Seconds());
|
|
|
+
|
|
|
+ GetPeerState(peerId).OldestNonAckedMessageTimestampInTicks.ShouldEqual(now.AddHours(-1).AddMinutes(1).Ticks);
|
|
|
+ var persistentMessagesFromDatabase = DataContext.PersistentMessages.Execute().ToList();
|
|
|
+ var storedMessages = persistentMessagesFromDatabase.Select(x => new { x.UniqueTimestampInTicks, x.IsAcked }).ToList();
|
|
|
+ storedMessages.ShouldBeEquivalentTo(new[]
|
|
|
+ {
|
|
|
+ new { UniqueTimestampInTicks = now.AddHours(-1).Ticks, IsAcked = true },
|
|
|
+ new { UniqueTimestampInTicks = now.AddHours(-1).AddMinutes(1).Ticks, IsAcked = false },
|
|
|
+ new { UniqueTimestampInTicks = now.AddHours(-1).AddMinutes(2).Ticks, IsAcked = true },
|
|
|
+ new { UniqueTimestampInTicks = now.Ticks, IsAcked = true },
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private static long GetBucketIdFromMessageId(MessageId message)
|
|
|
+ {
|
|
|
+ return GetBucketIdFromDateTime(message.GetDateTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static long GetBucketIdFromDateTime(DateTime timestamp)
|
|
|
+ {
|
|
|
+ return new DateTime(timestamp.Year, timestamp.Month, timestamp.Day, timestamp.Hour, 0, 0).Ticks;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void InsertPersistentMessage(PeerId peerId, DateTime timestamp, Action<PersistentMessage> updateMessage = null)
|
|
|
+ {
|
|
|
+ var message = new PersistentMessage
|
|
|
+ {
|
|
|
+ PeerId = peerId.ToString(),
|
|
|
+ BucketId = BucketIdHelper.GetBucketId(timestamp),
|
|
|
+ IsAcked = true,
|
|
|
+ UniqueTimestampInTicks = timestamp.Ticks,
|
|
|
+ TransportMessage = new byte[0]
|
|
|
+ };
|
|
|
+ updateMessage?.Invoke(message);
|
|
|
+ DataContext.PersistentMessages.Insert(message).Execute();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void InsertPeerState(PeerState peerState)
|
|
|
+ {
|
|
|
+ DataContext.PeerStates.Insert(new CassandraPeerState(peerState));
|
|
|
+ }
|
|
|
+
|
|
|
+ private PeerState GetPeerState(in PeerId peerId)
|
|
|
+ {
|
|
|
+ var peerString = peerId.ToString();
|
|
|
+ var state = DataContext.PeerStates.Where(x => x.PeerId == peerString).Execute().Single();
|
|
|
+ return new PeerState(new PeerId(state.PeerId), state.NonAckedMessageCount, state.OldestNonAckedMessageTimestamp);
|
|
|
+ }
|
|
|
+
|
|
|
private static TransportMessage CreateTestTransportMessage(int i)
|
|
|
{
|
|
|
MessageId.PauseIdGenerationAtDate(SystemDateTime.UtcNow.Date.AddSeconds(i * 10));
|
|
|
return new Message1(i).ToTransportMessage();
|
|
|
}
|
|
|
|
|
|
+ private static long ToUnixMicroSeconds(DateTime timestamp)
|
|
|
+ {
|
|
|
+ var origin = new DateTime(1970, 1, 1, 0, 0, 0, 0);
|
|
|
+ var diff = timestamp - origin;
|
|
|
+ var diffInMicroSeconds = diff.Ticks / 10;
|
|
|
+ return diffInMicroSeconds;
|
|
|
+ }
|
|
|
+
|
|
|
[ProtoContract]
|
|
|
private class Message1 : IEvent
|
|
|
{
|