LocalScheduler.TimerQueue.cs 23 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.PlatformServices;
  7. using System.Threading;
  8. namespace System.Reactive.Concurrency
  9. {
  10. public partial class LocalScheduler
  11. {
  12. /// <summary>
  13. /// Gate to protect the short term queue and short term work set of a local scheduler. The gate object itself is static and hence shared by all local scheduler instances.
  14. /// </summary>
  15. private static readonly object Gate = new object();
  16. /// <summary>
  17. /// Gate to protect the long term queue and to synchronize scheduling decisions and system clock
  18. /// change management.
  19. /// </summary>
  20. private static readonly object StaticGate = new object();
  21. /// <summary>
  22. /// Long term work queue. Contains work that's due beyond ShortTerm, computed at the
  23. /// time of enqueueing.
  24. /// </summary>
  25. private static readonly PriorityQueue<WorkItem/*!*/> LongTerm = new PriorityQueue<WorkItem/*!*/>();
  26. /// <summary>
  27. /// Disposable resource for the long term timer that will reevaluate and dispatch the
  28. /// first item in the long term queue. A serial disposable is used to make "dispose
  29. /// current and assign new" logic easier. The disposable itself is never disposed.
  30. /// </summary>
  31. private static readonly SerialDisposable NextLongTermTimer = new SerialDisposable();
  32. /// <summary>
  33. /// Item at the head of the long term queue for which the current long term timer is
  34. /// running. Used to detect changes in the queue and decide whether we should replace
  35. /// or can continue using the current timer (because no earlier long term work was
  36. /// added to the queue).
  37. /// </summary>
  38. private static WorkItem _nextLongTermWorkItem;
  39. /// <summary>
  40. /// Short term work queue. Contains work that's due soon, computed at the time of
  41. /// enqueueing or upon reevaluation of the long term queue causing migration of work
  42. /// items. This queue is kept in order to be able to relocate short term items back
  43. /// to the long term queue in case a system clock change occurs.
  44. /// </summary>
  45. private readonly PriorityQueue<WorkItem/*!*/> _shortTerm = new PriorityQueue<WorkItem/*!*/>();
  46. /// <summary>
  47. /// Set of disposable handles to all of the current short term work Schedule calls,
  48. /// allowing those to be cancelled upon a system clock change.
  49. /// </summary>
  50. private readonly HashSet<IDisposable> _shortTermWork = new HashSet<IDisposable>();
  51. /// <summary>
  52. /// Threshold where an item is considered to be short term work or gets moved from
  53. /// long term to short term.
  54. /// </summary>
  55. private static readonly TimeSpan ShortTerm = TimeSpan.FromSeconds(10);
  56. /// <summary>
  57. /// Maximum error ratio for timer drift, expressed as a divisor. We've seen machines with 10s drift on a
  58. /// daily basis, which is on the order of 1E-4, so we allow for extra margin here.
  59. /// This value is used to calculate early arrival for the long term queue timer
  60. /// that will reevaluate work for the short term queue (early = due - due / MaxErrorRatio).
  61. ///
  62. /// Example: -------------------------------...---------------------*-----$
  63. ///                                                                 ^     ^
  64. ///                                                                 |     |
  65. ///                                                               early  due
  66. ///                                                               0.999  1.0
  67. ///
  68. /// We also make the gap between early and due at least LongToShort so we have
  69. /// enough time to transition work to short term and as a courtesy to the
  70. /// destination scheduler to manage its queues etc.
  71. /// </summary>
  72. private const int MaxErrorRatio = 1000;
  73. /// <summary>
  74. /// Minimum threshold for the long term timer to fire before the queue is reevaluated
  75. /// for short term work. This value is chosen to be less than ShortTerm in order to
  76. /// ensure the timer fires and has work to transition to the short term queue.
  77. /// </summary>
  78. private static readonly TimeSpan LongToShort = TimeSpan.FromSeconds(5);
  79. /// <summary>
  80. /// Threshold used to determine when a short term timer has fired too early compared
  81. /// to the absolute due time. This provides a last chance protection against early
  82. /// completion of scheduled work, which can happen in case of time adjustment in the
  83. /// operating system (cf. GetSystemTimeAdjustment).
  84. /// </summary>
  85. private static readonly TimeSpan RetryShort = TimeSpan.FromMilliseconds(50);
  86. /// <summary>
  87. /// Longest interval supported by timers in the BCL (2^32 - 2 milliseconds).
  88. /// </summary>
  89. private static readonly TimeSpan MaxSupportedTimer = TimeSpan.FromMilliseconds((1L << 32) - 2);
  /// <summary>
  /// Initializes a new local scheduler and registers it with the system clock monitor.
  /// </summary>
  [Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline", Justification = "We can't really lift this into a field initializer, and would end up checking for an initialization flag in every static method anyway (which is roughly what the JIT does in a thread-safe manner).")]
  protected LocalScheduler()
  {
      // Registering only makes this instance known to the system clock monitor so it
      // can be told about clock changes. Monitoring itself is not expected to start
      // until SystemClock.AddRef is called (which can throw), as Enqueue does.
      SystemClock.Register(this);
  }
  /// <summary>
  /// Routes work scheduled for an absolute time to either the long term timer queue or
  /// the short term work list, depending on how far away its due time is.
  /// </summary>
  /// <param name="state">State to pass to the action.</param>
  /// <param name="dueTime">Absolute time to run the work on. The timer queue tries to execute the work close to this time, also accounting for system clock changes.</param>
  /// <param name="action">Action to run, potentially recursing into the scheduler.</param>
  /// <returns>Disposable object to prevent the work from running.</returns>
  private IDisposable Enqueue<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
  {
      var remaining = Scheduler.Normalize(dueTime - Now);

      // Work that is already due goes straight to the relative-time Schedule overload
      // with TimeSpan.Zero rather than the "immediate" overload. This keeps the call
      // time-based from the underlying scheduler's point of view (it may queue such
      // work differently) and skips one virtual hop in the default LocalScheduler
      // implementation, where immediate scheduling forwards to this overload anyway.
      if (remaining == TimeSpan.Zero)
      {
          return Schedule(state, TimeSpan.Zero, action);
      }

      // From here on the work sits in one of our queues, so system clock change
      // notifications are required. AddRef is expected to throw (NotSupportedException,
      // per the original note) if they are unavailable. The matching release happens in
      // WorkItem.Invoke, which runs at most once per item; cancelled items are not
      // removed from the queues but still reach Invoke, so the count balances out.
      SystemClock.AddRef();

      var workItem = new WorkItem<TState>(this, state, dueTime, action);

      if (remaining > ShortTerm)
      {
          ScheduleLongTermWork(workItem);
      }
      else
      {
          ScheduleShortTermWork(workItem);
      }

      return workItem;
  }
  /// <summary>
  /// Schedule work that's due in the short term. This leads to relative scheduling calls to the
  /// underlying scheduler for short TimeSpan values. If the system clock changes in the meantime,
  /// the short term work is attempted to be cancelled and reevaluated.
  /// </summary>
  /// <param name="item">Work item to schedule in the short term. The caller is responsible to determine the work is indeed short term.</param>
  private void ScheduleShortTermWork(WorkItem/*!*/ item)
  {
      lock (Gate)
      {
          _shortTerm.Enqueue(item);

          //
          // We don't bother trying to dequeue the item or stop the timer upon cancellation,
          // but always let the timer fire to do the queue maintenance. When the item is
          // cancelled, it won't run (see WorkItem.Invoke). In the event of a system clock
          // change, all outstanding work in _shortTermWork is cancelled and the short
          // term queue is reevaluated, potentially prompting rescheduling of short term
          // work. Work is protected against double execution by WorkItem.Invoke.
          //
          var d = new SingleAssignmentDisposable();
          _shortTermWork.Add(d);

          //
          // We normalize the time delta again (possibly redundant), because we can't assume
          // the underlying scheduler implementation deals with negative values (though it
          // should).
          //
          var dueTime = Scheduler.Normalize(item.DueTime - item.Scheduler.Now);

          //
          // The state tuple carries this scheduler (element "@this") and the handle "d" so
          // the callback needs no closure. The handle identifies this particular timer in
          // ExecuteNextShortTermWorkItem.
          //
          d.Disposable = item.Scheduler.Schedule((@this: this, d), dueTime, (self, tuple) => tuple.@this.ExecuteNextShortTermWorkItem(self, tuple.d));
      }
  }
  /// <summary>
  /// Timer callback that takes the next item off the short term queue and either runs it
  /// or, if it is still too early, schedules it again.
  /// </summary>
  /// <param name="scheduler">Recursive scheduler supplied by the underlying scheduler.</param>
  /// <param name="cancel">Handle identifying the Schedule call this callback belongs to; it must still be present in the short term work set for the callback to take an item.</param>
  /// <returns>Empty disposable. Recursive work cancellation is wired through the original WorkItem.</returns>
  private IDisposable ExecuteNextShortTermWorkItem(IScheduler scheduler, IDisposable cancel)
  {
      WorkItem candidate = null;

      lock (Gate)
      {
          // This callback does not run "its own" item; it runs whatever is at the head
          // of the short term queue. A system clock change clears _shortTermWork and
          // moves the queued items elsewhere, but the timers it cancelled may still
          // fire (cancellation is best-effort and races with the timer). Only a callback
          // whose handle is still registered may take an item; otherwise it would steal
          // work that belongs to a newer timer.
          var stillRegistered = _shortTermWork.Remove(cancel);
          if (stillRegistered && _shortTerm.Count > 0)
          {
              candidate = _shortTerm.Dequeue();
          }
      }

      // Nothing further to return in any case: the caller of Enqueue already holds the
      // WorkItem, whose inner disposable lets cancellation chase the computation.
      if (candidate == null)
      {
          return Disposable.Empty;
      }

      // Last line of defense against running ahead of the due time, e.g. when the
      // operating system is adjusting the clock rate and the clock ticks slower than
      // the timer assumed. If we are RetryShort or more too early, go around again.
      var early = candidate.DueTime - candidate.Scheduler.Now;
      if (early < RetryShort)
      {
          // Run on the recursive scheduler we were handed: we are already on the target
          // scheduler, and a top-level Schedule call could be costly (e.g. a new thread
          // for NewThreadScheduler). If the item was cancelled, Invoke will not call
          // user code because of its IsDisposed check.
          candidate.Invoke(scheduler);
      }
      else
      {
          ScheduleShortTermWork(candidate);
      }

      return Disposable.Empty;
  }
  /// <summary>
  /// Queues work that is due in the long term. The item waits in the long term queue
  /// until the long term timer moves it over to the short term work list.
  /// </summary>
  /// <param name="item">Work item to schedule on the long term. The caller is responsible to determine the work is indeed long term.</param>
  private static void ScheduleLongTermWork(WorkItem/*!*/ item)
  {
      lock (StaticGate)
      {
          LongTerm.Enqueue(item);

          // The new item may have become the head of the queue, in which case the
          // timer has to be re-armed for it. The callee checks whether the head changed.
          UpdateLongTermProcessingTimer();
      }
  }
  /// <summary>
  /// Updates the long term timer which is responsible to transition work from the head of the
  /// long term queue to the short term work list.
  /// </summary>
  /// <remarks>
  /// Must be called while holding <c>StaticGate</c>; this method does not take the lock itself.
  /// </remarks>
  private static void UpdateLongTermProcessingTimer()
  {
      //
      // CALLERS - Ensure this is called under lock (StaticGate)!
      //
      if (LongTerm.Count == 0)
      {
          return;
      }

      //
      // To avoid setting the timer all over again for the first work item if it hasn't changed,
      // we keep track of the next long term work item that will be processed by the timer.
      //
      var next = LongTerm.Peek();
      if (next == _nextLongTermWorkItem)
      {
          return;
      }

      //
      // We need to arrive early in order to accommodate for potential drift. The relative amount
      // of drift correction is kept in MaxErrorRatio. At the very least, we want to be LongToShort
      // early to make the final jump from long term to short term, giving the target scheduler
      // enough time to process the item through its queue. LongToShort is chosen such that the
      // error due to drift is negligible.
      //
      var due = Scheduler.Normalize(next.DueTime - next.Scheduler.Now);
      var remainder = TimeSpan.FromTicks(Math.Max(due.Ticks / MaxErrorRatio, LongToShort.Ticks));

      //
      // The remaining time can have dropped below the early-arrival margin by the time we get
      // here (e.g. the clock moved between enqueueing and this call), which would yield a
      // negative interval. Normalize so the timer fires immediately in that case instead of
      // handing a negative value to the timer implementation.
      //
      var dueEarly = Scheduler.Normalize(due - remainder);

      //
      // Limit the interval to maximum supported by underlying Timer.
      //
      var dueCapped = TimeSpan.FromTicks(Math.Min(dueEarly.Ticks, MaxSupportedTimer.Ticks));

      _nextLongTermWorkItem = next;
      NextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(_ => EvaluateLongTermQueue(), null, dueCapped);
  }
  /// <summary>
  /// Walks the head of the long term queue, handing every item that is now due within
  /// the short term threshold to its scheduler's short term list, then re-arms the long
  /// term timer for whatever item is left at the head.
  /// </summary>
  private static void EvaluateLongTermQueue()
  {
      lock (StaticGate)
      {
          while (LongTerm.Count > 0)
          {
              var head = LongTerm.Peek();
              var remaining = Scheduler.Normalize(head.DueTime - head.Scheduler.Now);

              // Stop at the first item that is still long term.
              if (remaining >= ShortTerm)
              {
                  break;
              }

              var promoted = LongTerm.Dequeue();
              promoted.Scheduler.ScheduleShortTermWork(promoted);
          }

          // Forget which item the timer was armed for, so the update below always
          // creates a fresh timer for the current head (if any).
          _nextLongTermWorkItem = null;
          UpdateLongTermProcessingTimer();
      }
  }
  /// <summary>
  /// Handler run when a system clock change is observed. Cancels outstanding short term
  /// timers, moves all short term work of this scheduler back to the long term queue and
  /// reevaluates that queue against the new clock.
  /// </summary>
  /// <param name="sender">Currently not used.</param>
  /// <param name="args">Currently not used.</param>
  internal void SystemClockChanged(object sender, SystemClockChangedEventArgs args)
  {
      lock (StaticGate)
      {
          lock (Gate)
          {
              // Best-effort cancellation of the short term timers. A timer may still
              // fire after this; ExecuteNextShortTermWorkItem detects that by finding
              // its handle missing from the set (checked under the same gate).
              foreach (var pending in _shortTermWork)
              {
                  pending.Dispose();
              }

              _shortTermWork.Clear();

              // Move every short term item back to the long term queue. The direction
              // of the clock change is unknown, so no special cases are attempted; the
              // short term queue only holds items within the ShortTerm window anyway.
              while (_shortTerm.Count > 0)
              {
                  var displaced = _shortTerm.Dequeue();
                  LongTerm.Enqueue(displaced);
              }

              // Drop the remembered timer target so a new timer is created for the new
              // head of the long term queue, then redistribute the work.
              _nextLongTermWorkItem = null;
              EvaluateLongTermQueue();
          }
      }
  }
  /// <summary>
  /// Unit of work tracked by the absolute time scheduler queues, ordered by due time.
  /// </summary>
  /// <remarks>
  /// Similar to ScheduledItem, but Invoke takes the scheduler to run on so that the work can
  /// be executed on a recursive scheduler (see ExecuteNextShortTermWorkItem).
  /// </remarks>
  private abstract class WorkItem : IComparable<WorkItem>, IDisposable
  {
      /// <summary>Scheduler that owns this work item.</summary>
      public readonly LocalScheduler Scheduler;

      /// <summary>Absolute time at which the work is due.</summary>
      public readonly DateTimeOffset DueTime;

      // Holds the disposable returned by the work once it ran; disposing it before the
      // work runs marks the item as cancelled.
      private readonly SingleAssignmentDisposable _disposable = new SingleAssignmentDisposable();

      // 0 until Invoke has been entered once, 1 afterwards.
      private int _hasRun;

      protected WorkItem(LocalScheduler scheduler, DateTimeOffset dueTime)
      {
          Scheduler = scheduler;
          DueTime = dueTime;
      }

      /// <summary>
      /// Runs the work on the given scheduler unless it was cancelled. Only the first
      /// call has any effect.
      /// </summary>
      /// <param name="scheduler">Scheduler passed on to the work.</param>
      public void Invoke(IScheduler scheduler)
      {
          // Items can be relocated between queues across system clock changes, so guard
          // against running twice. The same guard keeps the system clock ref count
          // balanced: exactly one Release per item.
          var alreadyRan = Interlocked.Exchange(ref _hasRun, 1) != 0;
          if (alreadyRan)
          {
              return;
          }

          try
          {
              if (!_disposable.IsDisposed)
              {
                  _disposable.Disposable = InvokeCore(scheduler);
              }
          }
          finally
          {
              // Released whether the work ran, was cancelled, or threw.
              SystemClock.Release();
          }
      }

      /// <summary>Executes the actual work on the given scheduler.</summary>
      protected abstract IDisposable InvokeCore(IScheduler scheduler);

      /// <summary>Orders work items by their due time.</summary>
      public int CompareTo(WorkItem/*!*/ other)
      {
          var byDueTime = Comparer<DateTimeOffset>.Default;
          return byDueTime.Compare(DueTime, other.DueTime);
      }

      /// <summary>Cancels the work item by disposing its inner disposable.</summary>
      public void Dispose()
      {
          _disposable.Dispose();
      }
  }
  /// <summary>
  /// Work item that carries the state and action of a scheduling call. The non-generic
  /// base type gives the scheduler queues a common element type.
  /// </summary>
  private sealed class WorkItem<TState> : WorkItem
  {
      private readonly TState _state;
      private readonly Func<IScheduler, TState, IDisposable> _action;

      public WorkItem(LocalScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
          : base(scheduler, dueTime)
      {
          _state = state;
          _action = action;
      }

      /// <summary>Invokes the stored action with the given scheduler and the stored state.</summary>
      protected override IDisposable InvokeCore(IScheduler scheduler)
      {
          return _action(scheduler, _state);
      }
  }
  434. }
  435. }