| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading.Tasks;
- using Abc.Zebus.Hosting;
- using Abc.Zebus.Persistence.CQL.Storage;
- using Abc.Zebus.Persistence.Storage;
- using Abc.Zebus.Util;
namespace Abc.Zebus.Persistence.CQL.PeriodicAction
{
    /// <summary>
    /// Periodic action that asks the CQL storage to clean the buckets of known peers.
    /// Every run refreshes the non-acked count cache; only peers reported as updated by
    /// that cache are processed, except when a global check is due, in which case every
    /// known peer is processed.
    /// </summary>
    public class OldestNonAckedMessageUpdaterPeriodicAction : PeriodicActionHostInitializer
    {
        private readonly NonAckedCountCache _nonAckedCountCache = new NonAckedCountCache();
        private readonly ICqlPersistenceConfiguration _configuration;
        private readonly ICqlStorage _cqlStorage;

        // Time of the last run that processed every known peer. Starts at its default
        // value, so the first run will normally be a global check.
        private DateTime _lastGlobalCheck;

        /// <summary>
        /// Creates the action; it is scheduled using
        /// <c>configuration.OldestMessagePerPeerCheckPeriod</c> as the period passed to the base class.
        /// </summary>
        /// <param name="bus">Bus forwarded to the base periodic action.</param>
        /// <param name="configuration">Source of the check periods used by this action.</param>
        /// <param name="cqlStorage">Storage queried for known peers and asked to clean buckets.</param>
        public OldestNonAckedMessageUpdaterPeriodicAction(IBus bus, ICqlPersistenceConfiguration configuration, ICqlStorage cqlStorage)
            : base(bus, configuration.OldestMessagePerPeerCheckPeriod)
        {
            _cqlStorage = cqlStorage;
            _configuration = configuration;
        }

        /// <summary>
        /// Selects the peers to process for this run and processes them in parallel.
        /// </summary>
        public override void DoPeriodicAction()
        {
            const int maxParallelPeerUpdates = 10;

            var globalCheckIsDue = SystemDateTime.UtcNow >= _lastGlobalCheck.Add(_configuration.OldestMessagePerPeerGlobalCheckPeriod);

            var peersById = _cqlStorage.GetAllKnownPeers().ToDictionary(peer => peer.PeerId);

            // The cache is fed on every run, including global ones, so that it always
            // receives the latest counts.
            var nonAckedCounts = peersById.Values
                                          .Select(peer => (peer.PeerId, peer.NonAckedMessageCount))
                                          .ToList();
            var updatedPeers = _nonAckedCountCache.GetForUpdatedPeers(nonAckedCounts);

            IEnumerable<PeerState> peersToCheck;
            if (globalCheckIsDue)
            {
                _lastGlobalCheck = SystemDateTime.UtcNow;
                peersToCheck = peersById.Values;
            }
            else
            {
                // Map the cache's entries back to the peer states loaded above.
                peersToCheck = updatedPeers.Select(updated => peersById[updated.PeerId]);
            }

            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxParallelPeerUpdates };
            Parallel.ForEach(peersToCheck, parallelOptions, UpdateOldestNonAckedMessage);
        }

        /// <summary>
        /// Asks the storage to clean the buckets of <paramref name="peer"/>, unless the peer
        /// is flagged as removed, and waits at most
        /// <c>OldestMessagePerPeerCheckPeriod</c> for that operation.
        /// </summary>
        /// <param name="peer">Peer whose buckets should be cleaned.</param>
        private void UpdateOldestNonAckedMessage(PeerState peer)
        {
            if (!peer.Removed)
            {
                var cleanup = _cqlStorage.CleanBuckets(peer);

                // The outcome of the bounded wait is not inspected.
                cleanup.Wait(_configuration.OldestMessagePerPeerCheckPeriod);
            }
        }
    }
}
|