LocalScheduler.TimerQueue.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.PlatformServices;
  7. using System.Threading;
  8. namespace System.Reactive.Concurrency
  9. {
  10. public partial class LocalScheduler
  11. {
  12. /// <summary>
  13. /// Gate to protect local scheduler queues.
  14. /// </summary>
  15. private static readonly object _gate = new object();
  16. /// <summary>
  17. /// Gate to protect queues and to synchronize scheduling decisions and system clock
  18. /// change management.
  19. /// </summary>
  20. private static readonly object s_gate = new object();
  21. /// <summary>
  22. /// Long term work queue. Contains work that's due beyond SHORTTERM, computed at the
  23. /// time of enqueueing.
  24. /// </summary>
  25. private static readonly PriorityQueue<WorkItem/*!*/> s_longTerm = new PriorityQueue<WorkItem/*!*/>();
  26. /// <summary>
  27. /// Disposable resource for the long term timer that will reevaluate and dispatch the
  28. /// first item in the long term queue. A serial disposable is used to make "dispose
  29. /// current and assign new" logic easier. The disposable itself is never disposed.
  30. /// </summary>
  31. private static readonly SerialDisposable s_nextLongTermTimer = new SerialDisposable();
  32. /// <summary>
  33. /// Item at the head of the long term queue for which the current long term timer is
  34. /// running. Used to detect changes in the queue and decide whether we should replace
  35. /// or can continue using the current timer (because no earlier long term work was
  36. /// added to the queue).
  37. /// </summary>
  38. private static WorkItem s_nextLongTermWorkItem = null;
  39. /// <summary>
  40. /// Short term work queue. Contains work that's due soon, computed at the time of
  41. /// enqueueing or upon reevaluation of the long term queue causing migration of work
  42. /// items. This queue is kept in order to be able to relocate short term items back
  43. /// to the long term queue in case a system clock change occurs.
  44. /// </summary>
  45. private readonly PriorityQueue<WorkItem/*!*/> _shortTerm = new PriorityQueue<WorkItem/*!*/>();
  46. /// <summary>
  47. /// Set of disposable handles to all of the current short term work Schedule calls,
  48. /// allowing those to be cancelled upon a system clock change.
  49. /// </summary>
  50. #if !NO_HASHSET
  51. private readonly HashSet<IDisposable> _shortTermWork = new HashSet<IDisposable>();
  52. #else
  53. private readonly Dictionary<IDisposable, object> _shortTermWork = new Dictionary<IDisposable, object>();
  54. #endif
  55. /// <summary>
  56. /// Threshold where an item is considered to be short term work or gets moved from
  57. /// long term to short term.
  58. /// </summary>
  59. private static readonly TimeSpan SHORTTERM = TimeSpan.FromSeconds(10);
  60. /// <summary>
  61. /// Maximum error ratio for timer drift. We've seen machines with 10s drift on a
  62. /// daily basis, which is in the order 10E-4, so we allow for extra margin here.
  63. /// This value is used to calculate early arrival for the long term queue timer
  64. /// that will reevaluate work for the short term queue.
  65. ///
  66. /// Example: -------------------------------...---------------------*-----$
  67. /// ^ ^
  68. /// | |
  69. /// early due
  70. /// 0.999 1.0
  71. ///
  72. /// We also make the gap between early and due at least LONGTOSHORT so we have
  73. /// enough time to transition work to short term and as a courtesy to the
  74. /// destination scheduler to manage its queues etc.
  75. /// </summary>
  76. private const int MAXERRORRATIO = 1000;
  77. /// <summary>
  78. /// Minimum threshold for the long term timer to fire before the queue is reevaluated
  79. /// for short term work. This value is chosen to be less than SHORTTERM in order to
  80. /// ensure the timer fires and has work to transition to the short term queue.
  81. /// </summary>
  82. private static readonly TimeSpan LONGTOSHORT = TimeSpan.FromSeconds(5);
  83. /// <summary>
  84. /// Threshold used to determine when a short term timer has fired too early compared
  85. /// to the absolute due time. This provides a last chance protection against early
  86. /// completion of scheduled work, which can happen in case of time adjustment in the
  87. /// operating system (cf. GetSystemTimeAdjustment).
  88. /// </summary>
  89. private static readonly TimeSpan RETRYSHORT = TimeSpan.FromMilliseconds(50);
  90. /// <summary>
  91. /// Longest interval supported by timers in the BCL.
  92. /// </summary>
  93. private static readonly TimeSpan MAXSUPPORTEDTIMER = TimeSpan.FromMilliseconds((1L << 32) - 2);
  94. /// <summary>
  95. /// Creates a new local scheduler.
  96. /// </summary>
  97. [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline", Justification = "We can't really lift this into a field initializer, and would end up checking for an initialization flag in every static method anyway (which is roughly what the JIT does in a thread-safe manner).")]
  98. protected LocalScheduler()
  99. {
  100. //
  101. // Hook up for system clock change notifications. This doesn't do anything until the
  102. // AddRef method is called (which can throw).
  103. //
  104. SystemClock.Register(this);
  105. }
  106. /// <summary>
  107. /// Enqueues absolute time scheduled work in the timer queue or the short term work list.
  108. /// </summary>
  109. /// <param name="state">State to pass to the action.</param>
  110. /// <param name="dueTime">Absolute time to run the work on. The timer queue is responsible to execute the work close to the specified time, also accounting for system clock changes.</param>
  111. /// <param name="action">Action to run, potentially recursing into the scheduler.</param>
  112. /// <returns>Disposable object to prevent the work from running.</returns>
  113. private IDisposable Enqueue<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
  114. {
  115. //
  116. // Work that's due in the past is sent to the underlying scheduler through the Schedule
  117. // overload for execution at TimeSpan.Zero. We don't go to the overload for immediate
  118. // scheduling in order to:
  119. //
  120. // - Preserve the time-based nature of the call as surfaced to the underlying scheduler,
  121. // as it may use different queuing strategies.
  122. //
  123. // - Optimize for the default behavior of LocalScheduler where a virtual call to Schedule
  124. // for immediate execution calls into the abstract Schedule method with TimeSpan.Zero.
  125. //
  126. var due = Scheduler.Normalize(dueTime - Now);
  127. if (due == TimeSpan.Zero)
  128. {
  129. return Schedule<TState>(state, TimeSpan.Zero, action);
  130. }
  131. //
  132. // We're going down the path of queueing up work or scheduling it, so we need to make
  133. // sure we can get system clock change notifications. If not, the call below is expected
  134. // to throw NotSupportedException. WorkItem.Invoke decreases the ref count again to allow
  135. // the system clock monitor to stop if there's no work left. Notice work items always
  136. // reach an execution stage since we don't dequeue items but merely mark them as cancelled
  137. // through WorkItem.Dispose. Double execution is also prevented, so the ref count should
  138. // correctly balance out.
  139. //
  140. SystemClock.AddRef();
  141. var workItem = new WorkItem<TState>(this, state, dueTime, action);
  142. if (due <= SHORTTERM)
  143. {
  144. ScheduleShortTermWork(workItem);
  145. }
  146. else
  147. {
  148. ScheduleLongTermWork(workItem);
  149. }
  150. return workItem;
  151. }
  152. /// <summary>
  153. /// Schedule work that's due in the short term. This leads to relative scheduling calls to the
  154. /// underlying scheduler for short TimeSpan values. If the system clock changes in the meantime,
  155. /// the short term work is attempted to be cancelled and reevaluated.
  156. /// </summary>
  157. /// <param name="item">Work item to schedule in the short term. The caller is responsible to determine the work is indeed short term.</param>
  158. private void ScheduleShortTermWork(WorkItem/*!*/ item)
  159. {
  160. lock (_gate)
  161. {
  162. _shortTerm.Enqueue(item);
  163. //
  164. // We don't bother trying to dequeue the item or stop the timer upon cancellation,
  165. // but always let the timer fire to do the queue maintenance. When the item is
  166. // cancelled, it won't run (see WorkItem.Invoke). In the event of a system clock
  167. // change, all outstanding work in _shortTermWork is cancelled and the short
  168. // term queue is reevaluated, potentially prompting rescheduling of short term
  169. // work. Notice work is protected against double execution by the implementation
  170. // of WorkItem.Invoke.
  171. //
  172. var d = new SingleAssignmentDisposable();
  173. #if !NO_HASHSET
  174. _shortTermWork.Add(d);
  175. #else
  176. _shortTermWork.Add(d, null);
  177. #endif
  178. //
  179. // We normalize the time delta again (possibly redundant), because we can't assume
  180. // the underlying scheduler implementations is valid and deals with negative values
  181. // (though it should).
  182. //
  183. var dueTime = Scheduler.Normalize(item.DueTime - item.Scheduler.Now);
  184. d.Disposable = item.Scheduler.Schedule(d, dueTime, ExecuteNextShortTermWorkItem);
  185. }
  186. }
  187. /// <summary>
  188. /// Callback to process the next short term work item.
  189. /// </summary>
  190. /// <param name="scheduler">Recursive scheduler supplied by the underlying scheduler.</param>
  191. /// <param name="cancel">Disposable used to identify the work the timer was triggered for (see code for usage).</param>
  192. /// <returns>Empty disposable. Recursive work cancellation is wired through the original WorkItem.</returns>
  193. private IDisposable ExecuteNextShortTermWorkItem(IScheduler scheduler, IDisposable cancel)
  194. {
  195. var next = default(WorkItem);
  196. lock (_gate)
  197. {
  198. //
  199. // Notice that even though we try to cancel all work in the short term queue upon a
  200. // system clock change, cancellation may not be honored immediately and there's a
  201. // small chance this code runs for work that has been cancelled. Because the handler
  202. // doesn't execute the work that triggered the time-based Schedule call, but always
  203. // runs the work from the short term queue in order, we need to make sure we're not
  204. // stealing items in the queue. We can do so by remembering the object identity of
  205. // the disposable and check whether it still exists in the short term work list. If
  206. // not, a system clock change handler has gotten rid of it as part of reevaluating
  207. // the short term queue, but we still ended up here because the inherent race in the
  208. // call to Dispose versus the underlying timer. It's also possible the underlying
  209. // scheduler does a bad job at cancellation, so this measure helps for that too.
  210. //
  211. if (_shortTermWork.Remove(cancel) && _shortTerm.Count > 0)
  212. {
  213. next = _shortTerm.Dequeue();
  214. }
  215. }
  216. if (next != null)
  217. {
  218. //
  219. // If things don't make sense and we're way too early to run the work, this is our
  220. // final chance to prevent us from running before the due time. This situation can
  221. // arise when Windows applies system clock adjustment (see SetSystemTimeAdjustment)
  222. // and as a result the clock is ticking slower. If the clock is ticking faster due
  223. // to such an adjustment, too bad :-). We try to minimize the window for the final
  224. // relative time based scheduling such that 10%+ adjustments to the clock rate
  225. // have only "little" impact (range of 100s of ms). On an absolute time scale, we
  226. // don't provide stronger guarantees.
  227. //
  228. if (next.DueTime - next.Scheduler.Now >= RETRYSHORT)
  229. {
  230. ScheduleShortTermWork(next);
  231. }
  232. else
  233. {
  234. //
  235. // Invocation happens on the recursive scheduler supplied to the function. We
  236. // are already running on the target scheduler, so we should stay on board.
  237. // Not doing so would have unexpected behavior for e.g. NewThreadScheduler,
  238. // causing a whole new thread to be allocated because of a top-level call to
  239. // the Schedule method rather than a recursive one.
  240. //
  241. // Notice if work got cancelled, the call to Invoke will not propagate to user
  242. // code because of the IsDisposed check inside.
  243. //
  244. next.Invoke(scheduler);
  245. }
  246. }
  247. //
  248. // No need to return anything better here. We already handed out the original WorkItem
  249. // object upon the call to Enqueue (called itself by Schedule). The disposable inside
  250. // the work item allows a cancellation request to chase the underlying computation.
  251. //
  252. return Disposable.Empty;
  253. }
  254. /// <summary>
  255. /// Schedule work that's due on the long term. This leads to the work being queued up for
  256. /// eventual transitioning to the short term work list.
  257. /// </summary>
  258. /// <param name="item">Work item to schedule on the long term. The caller is responsible to determine the work is indeed long term.</param>
  259. private static void ScheduleLongTermWork(WorkItem/*!*/ item)
  260. {
  261. lock (s_gate)
  262. {
  263. s_longTerm.Enqueue(item);
  264. //
  265. // In case we're the first long-term item in the queue now, the timer will have
  266. // to be updated.
  267. //
  268. UpdateLongTermProcessingTimer();
  269. }
  270. }
  271. /// <summary>
  272. /// Updates the long term timer which is responsible to transition work from the head of the
  273. /// long term queue to the short term work list.
  274. /// </summary>
  275. /// <remarks>Should be called under the scheduler lock.</remarks>
  276. private static void UpdateLongTermProcessingTimer()
  277. {
  278. /*
  279. * CALLERS - Ensure this is called under the lock!
  280. *
  281. lock (s_gate) */
  282. {
  283. if (s_longTerm.Count == 0)
  284. return;
  285. //
  286. // To avoid setting the timer all over again for the first work item if it hasn't changed,
  287. // we keep track of the next long term work item that will be processed by the timer.
  288. //
  289. var next = s_longTerm.Peek();
  290. if (next == s_nextLongTermWorkItem)
  291. return;
  292. //
  293. // We need to arrive early in order to accommodate for potential drift. The relative amount
  294. // of drift correction is kept in MAXERRORRATIO. At the very least, we want to be LONGTOSHORT
  295. // early to make the final jump from long term to short term, giving the target scheduler
  296. // enough time to process the item through its queue. LONGTOSHORT is chosen such that the
  297. // error due to drift is negligible.
  298. //
  299. var due = Scheduler.Normalize(next.DueTime - next.Scheduler.Now);
  300. var remainder = TimeSpan.FromTicks(Math.Max(due.Ticks / MAXERRORRATIO, LONGTOSHORT.Ticks));
  301. var dueEarly = due - remainder;
  302. //
  303. // Limit the interval to maximum supported by underlying Timer.
  304. //
  305. var dueCapped = TimeSpan.FromTicks(Math.Min(dueEarly.Ticks, MAXSUPPORTEDTIMER.Ticks));
  306. s_nextLongTermWorkItem = next;
  307. s_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(EvaluateLongTermQueue, null, dueCapped);
  308. }
  309. }
  310. /// <summary>
  311. /// Evaluates the long term queue, transitioning short term work to the short term list,
  312. /// and adjusting the new long term processing timer accordingly.
  313. /// </summary>
  314. /// <param name="state">Ignored.</param>
  315. private static void EvaluateLongTermQueue(object state)
  316. {
  317. lock (s_gate)
  318. {
  319. var next = default(WorkItem);
  320. while (s_longTerm.Count > 0)
  321. {
  322. next = s_longTerm.Peek();
  323. var due = Scheduler.Normalize(next.DueTime - next.Scheduler.Now);
  324. if (due >= SHORTTERM)
  325. break;
  326. var item = s_longTerm.Dequeue();
  327. item.Scheduler.ScheduleShortTermWork(item);
  328. }
  329. s_nextLongTermWorkItem = null;
  330. UpdateLongTermProcessingTimer();
  331. }
  332. }
  333. /// <summary>
  334. /// Callback invoked when a system clock change is observed in order to adjust and reevaluate
  335. /// the internal scheduling queues.
  336. /// </summary>
  337. /// <param name="args">Currently not used.</param>
  338. /// <param name="sender">Currently not used.</param>
  339. internal void SystemClockChanged(object sender, SystemClockChangedEventArgs args)
  340. {
  341. lock (_gate)
  342. {
  343. lock (s_gate)
  344. {
  345. //
  346. // Best-effort cancellation of short term work. A check for presence in the hash set
  347. // is used to notice race conditions between cancellation and the timer firing (also
  348. // guarded by the same gate object). See checks in ExecuteNextShortTermWorkItem.
  349. //
  350. #if !NO_HASHSET
  351. foreach (var d in _shortTermWork)
  352. #else
  353. foreach (var d in _shortTermWork.Keys)
  354. #endif
  355. d.Dispose();
  356. _shortTermWork.Clear();
  357. //
  358. // Transition short term work to the long term queue for reevaluation by calling the
  359. // EvaluateLongTermQueue method. We don't know which direction the clock was changed
  360. // in, so we don't optimize for special cases, but always transition the whole queue.
  361. // Notice the short term queue is bounded to SHORTTERM length.
  362. //
  363. while (_shortTerm.Count > 0)
  364. {
  365. var next = _shortTerm.Dequeue();
  366. s_longTerm.Enqueue(next);
  367. }
  368. //
  369. // Reevaluate the queue and don't forget to null out the current timer to force the
  370. // method to create a new timer for the new first long term item.
  371. //
  372. s_nextLongTermWorkItem = null;
  373. EvaluateLongTermQueue(null);
  374. }
  375. }
  376. }
  377. /// <summary>
  378. /// Represents a work item in the absolute time scheduler.
  379. /// </summary>
  380. /// <remarks>
  381. /// This type is very similar to ScheduledItem, but we need a different Invoke signature to allow customization
  382. /// of the target scheduler (e.g. when called in a recursive scheduling context, see ExecuteNextShortTermWorkItem).
  383. /// </remarks>
  384. abstract class WorkItem : IComparable<WorkItem>, IDisposable
  385. {
  386. private readonly LocalScheduler _scheduler;
  387. private readonly DateTimeOffset _dueTime;
  388. private readonly SingleAssignmentDisposable _disposable;
  389. private int _hasRun;
  390. public WorkItem(LocalScheduler scheduler, DateTimeOffset dueTime)
  391. {
  392. _scheduler = scheduler;
  393. _dueTime = dueTime;
  394. _disposable = new SingleAssignmentDisposable();
  395. _hasRun = 0;
  396. }
  397. public LocalScheduler Scheduler
  398. {
  399. get { return _scheduler; }
  400. }
  401. public DateTimeOffset DueTime
  402. {
  403. get { return _dueTime; }
  404. }
  405. public void Invoke(IScheduler scheduler)
  406. {
  407. //
  408. // Protect against possible maltreatment of the scheduler queues or races in
  409. // execution of a work item that got relocated across system clock changes.
  410. // Under no circumstance whatsoever we should run work twice. The monitor's
  411. // ref count should also be subject to this policy.
  412. //
  413. if (Interlocked.Exchange(ref _hasRun, 1) == 0)
  414. {
  415. try
  416. {
  417. if (!_disposable.IsDisposed)
  418. _disposable.Disposable = InvokeCore(scheduler);
  419. }
  420. finally
  421. {
  422. SystemClock.Release();
  423. }
  424. }
  425. }
  426. protected abstract IDisposable InvokeCore(IScheduler scheduler);
  427. public int CompareTo(WorkItem/*!*/ other)
  428. {
  429. return Comparer<DateTimeOffset>.Default.Compare(this._dueTime, other._dueTime);
  430. }
  431. public void Dispose()
  432. {
  433. _disposable.Dispose();
  434. }
  435. }
  436. /// <summary>
  437. /// Represents a work item that closes over scheduler invocation state. Subtyping is
  438. /// used to have a common type for the scheduler queues.
  439. /// </summary>
  440. sealed class WorkItem<TState> : WorkItem
  441. {
  442. private readonly TState _state;
  443. private readonly Func<IScheduler, TState, IDisposable> _action;
  444. public WorkItem(LocalScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
  445. : base(scheduler, dueTime)
  446. {
  447. _state = state;
  448. _action = action;
  449. }
  450. protected override IDisposable InvokeCore(IScheduler scheduler)
  451. {
  452. return _action(scheduler, _state);
  453. }
  454. }
  455. }
  456. }