OldestNonAckedMessageUpdaterPeriodicAction.cs 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading.Tasks;
  5. using Abc.Zebus.Hosting;
  6. using Abc.Zebus.Persistence.CQL.Storage;
  7. using Abc.Zebus.Persistence.Storage;
  8. using Abc.Zebus.Util;
  9. namespace Abc.Zebus.Persistence.CQL.PeriodicAction
  10. {
  11. public class OldestNonAckedMessageUpdaterPeriodicAction : PeriodicActionHostInitializer
  12. {
  13. private readonly ICqlPersistenceConfiguration _configuration;
  14. private readonly ICqlStorage _cqlStorage;
  15. private DateTime _lastGlobalCheck;
  16. private readonly NonAckedCountCache _nonAckedCountCache = new NonAckedCountCache();
  17. public OldestNonAckedMessageUpdaterPeriodicAction(IBus bus, ICqlPersistenceConfiguration configuration, ICqlStorage cqlStorage)
  18. : base(bus, configuration.OldestMessagePerPeerCheckPeriod)
  19. {
  20. _configuration = configuration;
  21. _cqlStorage = cqlStorage;
  22. }
  23. public override void DoPeriodicAction()
  24. {
  25. var isGlobalCheck = SystemDateTime.UtcNow >= _lastGlobalCheck.Add(_configuration.OldestMessagePerPeerGlobalCheckPeriod);
  26. var allPeersDictionary = _cqlStorage.GetAllKnownPeers().ToDictionary(state => state.PeerId);
  27. IEnumerable<PeerState> peersToCheck = allPeersDictionary.Values;
  28. var updatedPeers = _nonAckedCountCache.GetForUpdatedPeers(peersToCheck.Select(x => (x.PeerId, x.NonAckedMessageCount)).ToList());
  29. if (isGlobalCheck)
  30. {
  31. _lastGlobalCheck = SystemDateTime.UtcNow;
  32. }
  33. else
  34. {
  35. peersToCheck = updatedPeers.Select(x => allPeersDictionary[x.PeerId]);
  36. }
  37. Parallel.ForEach(peersToCheck, new ParallelOptions { MaxDegreeOfParallelism = 10 }, UpdateOldestNonAckedMessage);
  38. }
  39. private void UpdateOldestNonAckedMessage(PeerState peer)
  40. {
  41. if (peer.Removed)
  42. return;
  43. _cqlStorage.CleanBuckets(peer)
  44. .Wait(_configuration.OldestMessagePerPeerCheckPeriod);
  45. }
  46. }
  47. }