CqlMessageReaderTests.cs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. using System;
  2. using System.IO;
  3. using System.Linq;
  4. using Abc.Zebus.Persistence.CQL.Data;
  5. using Abc.Zebus.Persistence.CQL.Storage;
  6. using Abc.Zebus.Persistence.CQL.Tests.Cql;
  7. using Abc.Zebus.Persistence.Messages;
  8. using Abc.Zebus.Persistence.Storage;
  9. using Abc.Zebus.Serialization;
  10. using Abc.Zebus.Testing.Comparison;
  11. using Abc.Zebus.Testing.Extensions;
  12. using Abc.Zebus.Transport;
  13. using NUnit.Framework;
  14. namespace Abc.Zebus.Persistence.CQL.Tests
  15. {
  /// <summary>
  /// Tests for <see cref="CqlMessageReader"/>, executed against the data context
  /// exposed by the <c>CqlTestFixture</c> base class.
  /// </summary>
  public class CqlMessageReaderTests : CqlTestFixture<PersistenceCqlDataContext, ICqlPersistenceConfiguration>
  {
      // A single generator shared by every CreateTransportMessage call. Creating a
      // new Random per call can yield identical byte sequences on runtimes where the
      // default seed is derived from the system clock (.NET Framework), which would
      // make the generated payloads indistinguishable from each other.
      private static readonly Random _random = new Random();

      /// <summary>
      /// Skips the fixture when the <c>TF_BUILD</c> or <c>GITHUB_ACTIONS</c> environment
      /// variable is set to true, otherwise delegates to the base schema creation.
      /// </summary>
      public override void CreateSchema()
      {
          // The ignore checks must run before the base call so that no schema
          // creation is attempted when the tests are going to be skipped.
          IgnoreWhenSet("TF_BUILD");
          IgnoreWhenSet("GITHUB_ACTIONS");

          base.CreateSchema();
      }

      /// <summary>
      /// Calls <c>Assert.Ignore</c> when the given environment variable holds a value
      /// that parses as boolean <c>true</c>.
      /// </summary>
      /// <param name="environmentVariable">Name of the environment variable to inspect.</param>
      private void IgnoreWhenSet(string environmentVariable)
      {
          var value = Environment.GetEnvironmentVariable(environmentVariable);

          // bool.TryParse returns false for null or empty input, so an unset
          // variable never triggers the ignore.
          if (bool.TryParse(value, out var isSet) && isSet)
          {
              Assert.Ignore("We need a cassandra node for this");
          }
      }

      /// <summary>
      /// Inserts acked and non acked messages at three points in time (two hours ago,
      /// one hour ago and a few milliseconds ago) and checks that the reader returns
      /// only the non acked ones, in insertion-timestamp order.
      /// </summary>
      [Test]
      public void should_read_non_acked_messages_since_oldest()
      {
          var peerId = new PeerId("PeerId");
          var now = DateTime.UtcNow;
          var oldestNonAckedMessageTimestamp = now.AddHours(-2).AddMilliseconds(1);
          var transportMessages = new[]
          {
              CreateTransportMessage(peerId),
              CreateTransportMessage(peerId),
              CreateTransportMessage(peerId),
          };
          var reader = CreateReader(peerId, oldestNonAckedMessageTimestamp);

          // first bucket - contains the oldest non acked message, surrounded by acked ones
          InsertPersistentMessage(peerId, now.AddHours(-2), x => x.IsAcked = true);
          InsertPersistentMessage(peerId, oldestNonAckedMessageTimestamp, UpdatePersistentMessageWithNonAckedTransportMessage(transportMessages[0]));
          InsertPersistentMessage(peerId, now.AddHours(-2).AddMilliseconds(2), x => x.IsAcked = true);

          // second bucket - with non acked
          InsertPersistentMessage(peerId, now.AddHours(-1), x => x.IsAcked = true);
          InsertPersistentMessage(peerId, now.AddHours(-1).AddMilliseconds(1), UpdatePersistentMessageWithNonAckedTransportMessage(transportMessages[1]));
          InsertPersistentMessage(peerId, now.AddHours(-1).AddMilliseconds(2), x => x.IsAcked = true);

          // third bucket - with non acked
          InsertPersistentMessage(peerId, now.AddMilliseconds(-3), x => x.IsAcked = true);
          InsertPersistentMessage(peerId, now.AddMilliseconds(-2), UpdatePersistentMessageWithNonAckedTransportMessage(transportMessages[2]));
          InsertPersistentMessage(peerId, now.AddMilliseconds(-1), x => x.IsAcked = true);

          var nonAckedMessages = reader.GetUnackedMessages().ToList();

          // Exactly one non acked message was inserted per expected transport message.
          nonAckedMessages.Count.ShouldEqual(transportMessages.Length);
          for (var i = 0; i < nonAckedMessages.Count; i++)
          {
              // Index-wise comparison: the reader output must follow timestamp order.
              var transportMessage = TransportMessageDeserializer.Deserialize(nonAckedMessages[i]);
              transportMessage.DeepCompare(transportMessages[i]).ShouldBeTrue();
          }
      }

      /// <summary>
      /// Builds a mutation that flags a <see cref="PersistentMessage"/> as non acked and
      /// stores the serialized form of <paramref name="transportMessage"/> in it.
      /// </summary>
      /// <param name="transportMessage">Transport message to serialize into the persistent message.</param>
      /// <returns>The action to pass to <see cref="InsertPersistentMessage"/>.</returns>
      private Action<PersistentMessage> UpdatePersistentMessageWithNonAckedTransportMessage(TransportMessage transportMessage)
      {
          return x =>
          {
              x.IsAcked = false;
              x.TransportMessage = Serializer.Serialize(transportMessage).ToArray();
          };
      }

      /// <summary>
      /// Creates a transport message of type "Fake" carrying 128 random bytes, sent by
      /// a peer with the given id and an empty endpoint.
      /// </summary>
      /// <param name="peerId">Id of the sending peer.</param>
      /// <returns>The new transport message.</returns>
      private TransportMessage CreateTransportMessage(PeerId peerId)
      {
          var bytes = new byte[128];

          // NOTE(review): Random is not thread-safe; this assumes the tests of this
          // fixture are not run in parallel - confirm if parallel execution is enabled.
          _random.NextBytes(bytes);

          return new TransportMessage(new MessageTypeId("Fake"), new MemoryStream(bytes), new Peer(peerId, string.Empty));
      }

      /// <summary>
      /// Creates the reader under test for the given peer.
      /// </summary>
      /// <param name="peerId">Peer whose messages are read.</param>
      /// <param name="oldestNonAckedMessage">Timestamp whose ticks are handed to the peer state.</param>
      /// <returns>A reader bound to the fixture's data context.</returns>
      private CqlMessageReader CreateReader(PeerId peerId, DateTime oldestNonAckedMessage)
      {
          // NOTE(review): the second PeerState argument (0) is presumably the non acked
          // message count - confirm against the PeerState constructor.
          return new CqlMessageReader(DataContext, new PeerState(peerId, 0, oldestNonAckedMessage.Ticks));
      }

      /// <summary>
      /// Inserts a persistent message for the given peer and timestamp. The message is
      /// acked with an empty payload unless <paramref name="updateMessage"/> changes it.
      /// </summary>
      /// <param name="peerId">Peer owning the message.</param>
      /// <param name="timestamp">Timestamp used for both the bucket id and the unique timestamp.</param>
      /// <param name="updateMessage">Optional mutation applied to the message before it is inserted.</param>
      private void InsertPersistentMessage(PeerId peerId, DateTime timestamp, Action<PersistentMessage> updateMessage = null)
      {
          var message = new PersistentMessage
          {
              PeerId = peerId.ToString(),
              BucketId = BucketIdHelper.GetBucketId(timestamp),
              IsAcked = true,
              UniqueTimestampInTicks = timestamp.Ticks,
              TransportMessage = new byte[0]
          };

          updateMessage?.Invoke(message);

          DataContext.PersistentMessages.Insert(message).Execute();
      }
  }
  95. }