| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- using System;
- using System.IO;
- using System.Linq;
- using Abc.Zebus.Persistence.CQL.Data;
- using Abc.Zebus.Persistence.CQL.Storage;
- using Abc.Zebus.Persistence.CQL.Tests.Cql;
- using Abc.Zebus.Persistence.Messages;
- using Abc.Zebus.Persistence.Storage;
- using Abc.Zebus.Serialization;
- using Abc.Zebus.Testing.Comparison;
- using Abc.Zebus.Testing.Extensions;
- using Abc.Zebus.Transport;
- using NUnit.Framework;
namespace Abc.Zebus.Persistence.CQL.Tests
{
    /// <summary>
    /// Integration tests for <see cref="CqlMessageReader"/>: persistent messages are inserted
    /// through the fixture's <c>DataContext</c> and read back with
    /// <c>GetUnackedMessages</c>.
    /// </summary>
    /// <remarks>
    /// The fixture is ignored when the <c>TF_BUILD</c> or <c>GITHUB_ACTIONS</c> environment
    /// variable is set to a value that parses as <c>true</c>, because the tests need a
    /// Cassandra node.
    /// </remarks>
    public class CqlMessageReaderTests : CqlTestFixture<PersistenceCqlDataContext, ICqlPersistenceConfiguration>
    {
        // A single generator is shared by all payloads: creating a new Random per call in quick
        // succession can yield identically seeded generators (clock-based default seed on
        // .NET Framework), which would make every payload identical and weaken the
        // order-sensitive comparison in the test below.
        private readonly Random _random = new Random();

        /// <summary>
        /// Ignores the fixture on the known CI environments, then delegates to the base
        /// schema creation.
        /// </summary>
        public override void CreateSchema()
        {
            IgnoreWhenSet("TF_BUILD");
            IgnoreWhenSet("GITHUB_ACTIONS");

            base.CreateSchema();
        }

        /// <summary>
        /// Calls <c>Assert.Ignore</c> when the given environment variable holds a value
        /// that parses as the boolean <c>true</c>.
        /// </summary>
        /// <param name="environmentVariable">Name of the environment variable to inspect.</param>
        private static void IgnoreWhenSet(string environmentVariable)
        {
            var value = Environment.GetEnvironmentVariable(environmentVariable);

            // bool.TryParse returns false for null or empty input, so no separate check is needed.
            if (bool.TryParse(value, out var isSet) && isSet)
            {
                Assert.Ignore("We need a cassandra node for this");
            }
        }

        [Test]
        public void should_read_non_acked_messages_since_oldest()
        {
            var peerId = new PeerId("PeerId");
            var now = DateTime.UtcNow;
            var oldestNonAckedMessageTimestamp = now.AddHours(-2).AddMilliseconds(1);
            var transportMessages = new[]
            {
                CreateTransportMessage(peerId),
                CreateTransportMessage(peerId),
                CreateTransportMessage(peerId),
            };
            var reader = CreateReader(peerId, oldestNonAckedMessageTimestamp);

            // NOTE(review): the "bucket" grouping below assumes BucketIdHelper maps timestamps
            // one hour apart to different buckets - confirm against BucketIdHelper.

            // first bucket - contains the oldest non acked message, surrounded by acked ones
            InsertPersistentMessage(peerId, now.AddHours(-2), x => x.IsAcked = true);
            InsertPersistentMessage(peerId, oldestNonAckedMessageTimestamp, UpdatePersistentMessageWithNonAckedTransportMessage(transportMessages[0]));
            InsertPersistentMessage(peerId, now.AddHours(-2).AddMilliseconds(2), x => x.IsAcked = true);

            // second bucket - with non acked
            InsertPersistentMessage(peerId, now.AddHours(-1), x => x.IsAcked = true);
            InsertPersistentMessage(peerId, now.AddHours(-1).AddMilliseconds(1), UpdatePersistentMessageWithNonAckedTransportMessage(transportMessages[1]));
            InsertPersistentMessage(peerId, now.AddHours(-1).AddMilliseconds(2), x => x.IsAcked = true);

            // third bucket - with non acked
            InsertPersistentMessage(peerId, now.AddMilliseconds(-3), x => x.IsAcked = true);
            InsertPersistentMessage(peerId, now.AddMilliseconds(-2), UpdatePersistentMessageWithNonAckedTransportMessage(transportMessages[2]));
            InsertPersistentMessage(peerId, now.AddMilliseconds(-1), x => x.IsAcked = true);

            var nonAckedMessages = reader.GetUnackedMessages().ToList();

            // Exactly the non acked messages must come back, in insertion (timestamp) order.
            nonAckedMessages.Count.ShouldEqual(transportMessages.Length);
            for (var i = 0; i < nonAckedMessages.Count; i++)
            {
                var transportMessage = TransportMessageDeserializer.Deserialize(nonAckedMessages[i]);
                transportMessage.DeepCompare(transportMessages[i]).ShouldBeTrue();
            }
        }

        /// <summary>
        /// Builds an updater that marks a persistent message as non acked and stores the
        /// serialized form of <paramref name="transportMessage"/> in it.
        /// </summary>
        /// <param name="transportMessage">Transport message to serialize into the persistent message.</param>
        /// <returns>An action to pass to <see cref="InsertPersistentMessage"/>.</returns>
        private Action<PersistentMessage> UpdatePersistentMessageWithNonAckedTransportMessage(TransportMessage transportMessage)
        {
            return x =>
            {
                x.IsAcked = false;
                x.TransportMessage = Serializer.Serialize(transportMessage).ToArray();
            };
        }

        /// <summary>
        /// Creates a transport message of type "Fake" with a 128-byte random payload,
        /// originating from a peer with the given id and an empty endpoint.
        /// </summary>
        /// <param name="peerId">Id of the originating peer.</param>
        private TransportMessage CreateTransportMessage(PeerId peerId)
        {
            var bytes = new byte[128];
            _random.NextBytes(bytes);

            return new TransportMessage(new MessageTypeId("Fake"), new MemoryStream(bytes), new Peer(peerId, string.Empty));
        }

        /// <summary>
        /// Creates the reader under test for <paramref name="peerId"/>, with a peer state whose
        /// oldest non acked message timestamp is <paramref name="oldestNonAckedMessage"/> (in ticks).
        /// </summary>
        /// <param name="peerId">Peer whose messages are read.</param>
        /// <param name="oldestNonAckedMessage">Timestamp handed to the peer state, converted to ticks.</param>
        private CqlMessageReader CreateReader(PeerId peerId, DateTime oldestNonAckedMessage)
        {
            // NOTE(review): the 0 is presumably the non acked message count - confirm against PeerState.
            return new CqlMessageReader(DataContext, new PeerState(peerId, 0, oldestNonAckedMessage.Ticks));
        }

        /// <summary>
        /// Inserts a persistent message for <paramref name="peerId"/> at <paramref name="timestamp"/>.
        /// The message is acked with an empty payload unless <paramref name="updateMessage"/> changes it.
        /// </summary>
        /// <param name="peerId">Peer the message belongs to.</param>
        /// <param name="timestamp">Used for both the bucket id and the unique timestamp (ticks).</param>
        /// <param name="updateMessage">Optional customization applied just before the insert.</param>
        private void InsertPersistentMessage(PeerId peerId, DateTime timestamp, Action<PersistentMessage> updateMessage = null)
        {
            var message = new PersistentMessage
            {
                PeerId = peerId.ToString(),
                BucketId = BucketIdHelper.GetBucketId(timestamp),
                IsAcked = true,
                UniqueTimestampInTicks = timestamp.Ticks,
                TransportMessage = Array.Empty<byte>()
            };

            updateMessage?.Invoke(message);

            DataContext.PersistentMessages.Insert(message).Execute();
        }
    }
}
|