// CqlMessageReader.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using Abc.Zebus.Persistence.Cassandra.Data;
  5. using Abc.Zebus.Persistence.Messages;
  6. using Abc.Zebus.Persistence.Storage;
  7. using Cassandra;
  8. using Cassandra.Data.Linq;
  9. using log4net;
  10. namespace Abc.Zebus.Persistence.Cassandra.Cql
  11. {
  12. public class CqlMessageReader : IMessageReader
  13. {
  14. private static readonly ILog _log = LogManager.GetLogger(typeof(CqlMessageReader));
  15. private readonly PersistenceCqlDataContext _dataContext;
  16. private readonly PeerState _peerState;
  17. private readonly PreparedStatement _preparedStatement;
  18. public CqlMessageReader(PersistenceCqlDataContext dataContext, PeerState peerState)
  19. {
  20. _dataContext = dataContext;
  21. _peerState = peerState;
  22. _preparedStatement = _dataContext.Session.Prepare(_dataContext.PersistentMessages
  23. .Where(x => x.PeerId == _peerState.PeerId.ToString()
  24. && x.BucketId == 0
  25. && x.UniqueTimestampInTicks >= 0)
  26. .Select(x => new { x.IsAcked, x.TransportMessage })
  27. .ToString());
  28. }
  29. public IEnumerable<byte[]> GetUnackedMessages()
  30. {
  31. var oldestNonAckedMessageTimestampInTicks = _peerState.OldestNonAckedMessageTimestampInTicks;
  32. _log.Info($"Reading messages for peer {_peerState.PeerId} from {oldestNonAckedMessageTimestampInTicks} ({new DateTime(oldestNonAckedMessageTimestampInTicks).ToLongTimeString()})");
  33. var nonAckedMessagesInBuckets = BucketIdHelper.GetBucketsCollection(oldestNonAckedMessageTimestampInTicks)
  34. .Select(b => GetNonAckedMessagesInBucket(oldestNonAckedMessageTimestampInTicks, b));
  35. var nonAckedMessageRead = 0;
  36. foreach (var nonAckedMessagesInBucket in nonAckedMessagesInBuckets)
  37. {
  38. foreach (var nonAckedMessage in nonAckedMessagesInBucket)
  39. {
  40. nonAckedMessageRead++;
  41. yield return nonAckedMessage;
  42. }
  43. }
  44. _log.Info($"{nonAckedMessageRead} non acked messages replayed for peer {_peerState.PeerId}");
  45. }
  46. private IEnumerable<byte[]> GetNonAckedMessagesInBucket(long oldestNonAckedMessageTimestampInTicks, long bucketId)
  47. {
  48. return _dataContext.Session
  49. .Execute(_preparedStatement.Bind(_peerState.PeerId.ToString(), bucketId, oldestNonAckedMessageTimestampInTicks).SetPageSize(10 * 1000))
  50. .Where(x => !x.GetValue<bool>("IsAcked"))
  51. .Select(row => row.GetValue<byte[]>("TransportMessage"));
  52. }
  53. public void Dispose()
  54. {
  55. _log.Info($"Reader for peer {_peerState.PeerId} disposed");
  56. }
  57. }
  58. }