| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- using System.Collections.Generic;
- using System.Reactive.Disposables;
- using System.Reactive.PlatformServices;
- using System.Threading;
- namespace System.Reactive.Concurrency
- {
- public partial class LocalScheduler
- {
- /// <summary>
- /// Gate to protect local scheduler queues.
- /// </summary>
- private static readonly object _gate = new object();
- /// <summary>
- /// Gate to protect queues and to synchronize scheduling decisions and system clock
- /// change management.
- /// </summary>
- private static readonly object s_gate = new object();
- /// <summary>
- /// Long term work queue. Contains work that's due beyond SHORTTERM, computed at the
- /// time of enqueueing.
- /// </summary>
- private static readonly PriorityQueue<WorkItem/*!*/> s_longTerm = new PriorityQueue<WorkItem/*!*/>();
- /// <summary>
- /// Disposable resource for the long term timer that will reevaluate and dispatch the
- /// first item in the long term queue. A serial disposable is used to make "dispose
- /// current and assign new" logic easier. The disposable itself is never disposed.
- /// </summary>
- private static readonly SerialDisposable s_nextLongTermTimer = new SerialDisposable();
- /// <summary>
- /// Item at the head of the long term queue for which the current long term timer is
- /// running. Used to detect changes in the queue and decide whether we should replace
- /// or can continue using the current timer (because no earlier long term work was
- /// added to the queue).
- /// </summary>
- private static WorkItem s_nextLongTermWorkItem = null;
- /// <summary>
- /// Short term work queue. Contains work that's due soon, computed at the time of
- /// enqueueing or upon reevaluation of the long term queue causing migration of work
- /// items. This queue is kept in order to be able to relocate short term items back
- /// to the long term queue in case a system clock change occurs.
- /// </summary>
- private readonly PriorityQueue<WorkItem/*!*/> _shortTerm = new PriorityQueue<WorkItem/*!*/>();
- /// <summary>
- /// Set of disposable handles to all of the current short term work Schedule calls,
- /// allowing those to be cancelled upon a system clock change.
- /// </summary>
- private readonly HashSet<IDisposable> _shortTermWork = new HashSet<IDisposable>();
- /// <summary>
- /// Threshold where an item is considered to be short term work or gets moved from
- /// long term to short term.
- /// </summary>
- private static readonly TimeSpan SHORTTERM = TimeSpan.FromSeconds(10);
- /// <summary>
- /// Maximum error ratio for timer drift. We've seen machines with 10s drift on a
- /// daily basis, which is in the order 10E-4, so we allow for extra margin here.
- /// This value is used to calculate early arrival for the long term queue timer
- /// that will reevaluate work for the short term queue.
- ///
- /// Example: -------------------------------...---------------------*-----$
- /// ^ ^
- /// | |
- /// early due
- /// 0.999 1.0
- ///
- /// We also make the gap between early and due at least LONGTOSHORT so we have
- /// enough time to transition work to short term and as a courtesy to the
- /// destination scheduler to manage its queues etc.
- /// </summary>
- private const int MAXERRORRATIO = 1000;
- /// <summary>
- /// Minimum threshold for the long term timer to fire before the queue is reevaluated
- /// for short term work. This value is chosen to be less than SHORTTERM in order to
- /// ensure the timer fires and has work to transition to the short term queue.
- /// </summary>
- private static readonly TimeSpan LONGTOSHORT = TimeSpan.FromSeconds(5);
- /// <summary>
- /// Threshold used to determine when a short term timer has fired too early compared
- /// to the absolute due time. This provides a last chance protection against early
- /// completion of scheduled work, which can happen in case of time adjustment in the
- /// operating system (cf. GetSystemTimeAdjustment).
- /// </summary>
- private static readonly TimeSpan RETRYSHORT = TimeSpan.FromMilliseconds(50);
- /// <summary>
- /// Longest interval supported by timers in the BCL.
- /// </summary>
- private static readonly TimeSpan MAXSUPPORTEDTIMER = TimeSpan.FromMilliseconds((1L << 32) - 2);
- /// <summary>
- /// Creates a new local scheduler.
- /// </summary>
- [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline", Justification = "We can't really lift this into a field initializer, and would end up checking for an initialization flag in every static method anyway (which is roughly what the JIT does in a thread-safe manner).")]
- protected LocalScheduler()
- {
- //
- // Hook up for system clock change notifications. This doesn't do anything until the
- // AddRef method is called (which can throw).
- //
- SystemClock.Register(this);
- }
- /// <summary>
- /// Enqueues absolute time scheduled work in the timer queue or the short term work list.
- /// </summary>
- /// <param name="state">State to pass to the action.</param>
- /// <param name="dueTime">Absolute time to run the work on. The timer queue is responsible to execute the work close to the specified time, also accounting for system clock changes.</param>
- /// <param name="action">Action to run, potentially recursing into the scheduler.</param>
- /// <returns>Disposable object to prevent the work from running.</returns>
- private IDisposable Enqueue<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
- {
- //
- // Work that's due in the past is sent to the underlying scheduler through the Schedule
- // overload for execution at TimeSpan.Zero. We don't go to the overload for immediate
- // scheduling in order to:
- //
- // - Preserve the time-based nature of the call as surfaced to the underlying scheduler,
- // as it may use different queuing strategies.
- //
- // - Optimize for the default behavior of LocalScheduler where a virtual call to Schedule
- // for immediate execution calls into the abstract Schedule method with TimeSpan.Zero.
- //
- var due = Scheduler.Normalize(dueTime - Now);
- if (due == TimeSpan.Zero)
- {
- return Schedule<TState>(state, TimeSpan.Zero, action);
- }
- //
- // We're going down the path of queueing up work or scheduling it, so we need to make
- // sure we can get system clock change notifications. If not, the call below is expected
- // to throw NotSupportedException. WorkItem.Invoke decreases the ref count again to allow
- // the system clock monitor to stop if there's no work left. Notice work items always
- // reach an execution stage since we don't dequeue items but merely mark them as cancelled
- // through WorkItem.Dispose. Double execution is also prevented, so the ref count should
- // correctly balance out.
- //
- SystemClock.AddRef();
- var workItem = new WorkItem<TState>(this, state, dueTime, action);
- if (due <= SHORTTERM)
- {
- ScheduleShortTermWork(workItem);
- }
- else
- {
- ScheduleLongTermWork(workItem);
- }
- return workItem;
- }
- /// <summary>
- /// Schedule work that's due in the short term. This leads to relative scheduling calls to the
- /// underlying scheduler for short TimeSpan values. If the system clock changes in the meantime,
- /// the short term work is attempted to be cancelled and reevaluated.
- /// </summary>
- /// <param name="item">Work item to schedule in the short term. The caller is responsible to determine the work is indeed short term.</param>
- private void ScheduleShortTermWork(WorkItem/*!*/ item)
- {
- lock (_gate)
- {
- _shortTerm.Enqueue(item);
- //
- // We don't bother trying to dequeue the item or stop the timer upon cancellation,
- // but always let the timer fire to do the queue maintenance. When the item is
- // cancelled, it won't run (see WorkItem.Invoke). In the event of a system clock
- // change, all outstanding work in _shortTermWork is cancelled and the short
- // term queue is reevaluated, potentially prompting rescheduling of short term
- // work. Notice work is protected against double execution by the implementation
- // of WorkItem.Invoke.
- //
- var d = new SingleAssignmentDisposable();
- _shortTermWork.Add(d);
- //
- // We normalize the time delta again (possibly redundant), because we can't assume
- // the underlying scheduler implementations is valid and deals with negative values
- // (though it should).
- //
- var dueTime = Scheduler.Normalize(item.DueTime - item.Scheduler.Now);
- d.Disposable = item.Scheduler.Schedule(d, dueTime, ExecuteNextShortTermWorkItem);
- }
- }
- /// <summary>
- /// Callback to process the next short term work item.
- /// </summary>
- /// <param name="scheduler">Recursive scheduler supplied by the underlying scheduler.</param>
- /// <param name="cancel">Disposable used to identify the work the timer was triggered for (see code for usage).</param>
- /// <returns>Empty disposable. Recursive work cancellation is wired through the original WorkItem.</returns>
- private IDisposable ExecuteNextShortTermWorkItem(IScheduler scheduler, IDisposable cancel)
- {
- var next = default(WorkItem);
- lock (_gate)
- {
- //
- // Notice that even though we try to cancel all work in the short term queue upon a
- // system clock change, cancellation may not be honored immediately and there's a
- // small chance this code runs for work that has been cancelled. Because the handler
- // doesn't execute the work that triggered the time-based Schedule call, but always
- // runs the work from the short term queue in order, we need to make sure we're not
- // stealing items in the queue. We can do so by remembering the object identity of
- // the disposable and check whether it still exists in the short term work list. If
- // not, a system clock change handler has gotten rid of it as part of reevaluating
- // the short term queue, but we still ended up here because the inherent race in the
- // call to Dispose versus the underlying timer. It's also possible the underlying
- // scheduler does a bad job at cancellation, so this measure helps for that too.
- //
- if (_shortTermWork.Remove(cancel) && _shortTerm.Count > 0)
- {
- next = _shortTerm.Dequeue();
- }
- }
- if (next != null)
- {
- //
- // If things don't make sense and we're way too early to run the work, this is our
- // final chance to prevent us from running before the due time. This situation can
- // arise when Windows applies system clock adjustment (see SetSystemTimeAdjustment)
- // and as a result the clock is ticking slower. If the clock is ticking faster due
- // to such an adjustment, too bad :-). We try to minimize the window for the final
- // relative time based scheduling such that 10%+ adjustments to the clock rate
- // have only "little" impact (range of 100s of ms). On an absolute time scale, we
- // don't provide stronger guarantees.
- //
- if (next.DueTime - next.Scheduler.Now >= RETRYSHORT)
- {
- ScheduleShortTermWork(next);
- }
- else
- {
- //
- // Invocation happens on the recursive scheduler supplied to the function. We
- // are already running on the target scheduler, so we should stay on board.
- // Not doing so would have unexpected behavior for e.g. NewThreadScheduler,
- // causing a whole new thread to be allocated because of a top-level call to
- // the Schedule method rather than a recursive one.
- //
- // Notice if work got cancelled, the call to Invoke will not propagate to user
- // code because of the IsDisposed check inside.
- //
- next.Invoke(scheduler);
- }
- }
- //
- // No need to return anything better here. We already handed out the original WorkItem
- // object upon the call to Enqueue (called itself by Schedule). The disposable inside
- // the work item allows a cancellation request to chase the underlying computation.
- //
- return Disposable.Empty;
- }
- /// <summary>
- /// Schedule work that's due on the long term. This leads to the work being queued up for
- /// eventual transitioning to the short term work list.
- /// </summary>
- /// <param name="item">Work item to schedule on the long term. The caller is responsible to determine the work is indeed long term.</param>
- private static void ScheduleLongTermWork(WorkItem/*!*/ item)
- {
- lock (s_gate)
- {
- s_longTerm.Enqueue(item);
- //
- // In case we're the first long-term item in the queue now, the timer will have
- // to be updated.
- //
- UpdateLongTermProcessingTimer();
- }
- }
- /// <summary>
- /// Updates the long term timer which is responsible to transition work from the head of the
- /// long term queue to the short term work list.
- /// </summary>
- /// <remarks>Should be called under the scheduler lock.</remarks>
- private static void UpdateLongTermProcessingTimer()
- {
- /*
- * CALLERS - Ensure this is called under the lock!
- *
- lock (s_gate) */
- {
- if (s_longTerm.Count == 0)
- return;
- //
- // To avoid setting the timer all over again for the first work item if it hasn't changed,
- // we keep track of the next long term work item that will be processed by the timer.
- //
- var next = s_longTerm.Peek();
- if (next == s_nextLongTermWorkItem)
- return;
- //
- // We need to arrive early in order to accommodate for potential drift. The relative amount
- // of drift correction is kept in MAXERRORRATIO. At the very least, we want to be LONGTOSHORT
- // early to make the final jump from long term to short term, giving the target scheduler
- // enough time to process the item through its queue. LONGTOSHORT is chosen such that the
- // error due to drift is negligible.
- //
- var due = Scheduler.Normalize(next.DueTime - next.Scheduler.Now);
- var remainder = TimeSpan.FromTicks(Math.Max(due.Ticks / MAXERRORRATIO, LONGTOSHORT.Ticks));
- var dueEarly = due - remainder;
- //
- // Limit the interval to maximum supported by underlying Timer.
- //
- var dueCapped = TimeSpan.FromTicks(Math.Min(dueEarly.Ticks, MAXSUPPORTEDTIMER.Ticks));
- s_nextLongTermWorkItem = next;
- s_nextLongTermTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(EvaluateLongTermQueue, null, dueCapped);
- }
- }
- /// <summary>
- /// Evaluates the long term queue, transitioning short term work to the short term list,
- /// and adjusting the new long term processing timer accordingly.
- /// </summary>
- /// <param name="state">Ignored.</param>
- private static void EvaluateLongTermQueue(object state)
- {
- lock (s_gate)
- {
- var next = default(WorkItem);
- while (s_longTerm.Count > 0)
- {
- next = s_longTerm.Peek();
- var due = Scheduler.Normalize(next.DueTime - next.Scheduler.Now);
- if (due >= SHORTTERM)
- break;
- var item = s_longTerm.Dequeue();
- item.Scheduler.ScheduleShortTermWork(item);
- }
- s_nextLongTermWorkItem = null;
- UpdateLongTermProcessingTimer();
- }
- }
- /// <summary>
- /// Callback invoked when a system clock change is observed in order to adjust and reevaluate
- /// the internal scheduling queues.
- /// </summary>
- /// <param name="args">Currently not used.</param>
- /// <param name="sender">Currently not used.</param>
- internal void SystemClockChanged(object sender, SystemClockChangedEventArgs args)
- {
- lock (_gate)
- {
- lock (s_gate)
- {
- //
- // Best-effort cancellation of short term work. A check for presence in the hash set
- // is used to notice race conditions between cancellation and the timer firing (also
- // guarded by the same gate object). See checks in ExecuteNextShortTermWorkItem.
- //
- foreach (var d in _shortTermWork)
- {
- d.Dispose();
- }
- _shortTermWork.Clear();
- //
- // Transition short term work to the long term queue for reevaluation by calling the
- // EvaluateLongTermQueue method. We don't know which direction the clock was changed
- // in, so we don't optimize for special cases, but always transition the whole queue.
- // Notice the short term queue is bounded to SHORTTERM length.
- //
- while (_shortTerm.Count > 0)
- {
- var next = _shortTerm.Dequeue();
- s_longTerm.Enqueue(next);
- }
- //
- // Reevaluate the queue and don't forget to null out the current timer to force the
- // method to create a new timer for the new first long term item.
- //
- s_nextLongTermWorkItem = null;
- EvaluateLongTermQueue(null);
- }
- }
- }
- /// <summary>
- /// Represents a work item in the absolute time scheduler.
- /// </summary>
- /// <remarks>
- /// This type is very similar to ScheduledItem, but we need a different Invoke signature to allow customization
- /// of the target scheduler (e.g. when called in a recursive scheduling context, see ExecuteNextShortTermWorkItem).
- /// </remarks>
- abstract class WorkItem : IComparable<WorkItem>, IDisposable
- {
- private readonly LocalScheduler _scheduler;
- private readonly DateTimeOffset _dueTime;
- private readonly SingleAssignmentDisposable _disposable;
- private int _hasRun;
- public WorkItem(LocalScheduler scheduler, DateTimeOffset dueTime)
- {
- _scheduler = scheduler;
- _dueTime = dueTime;
- _disposable = new SingleAssignmentDisposable();
- _hasRun = 0;
- }
- public LocalScheduler Scheduler
- {
- get { return _scheduler; }
- }
- public DateTimeOffset DueTime
- {
- get { return _dueTime; }
- }
- public void Invoke(IScheduler scheduler)
- {
- //
- // Protect against possible maltreatment of the scheduler queues or races in
- // execution of a work item that got relocated across system clock changes.
- // Under no circumstance whatsoever we should run work twice. The monitor's
- // ref count should also be subject to this policy.
- //
- if (Interlocked.Exchange(ref _hasRun, 1) == 0)
- {
- try
- {
- if (!_disposable.IsDisposed)
- _disposable.Disposable = InvokeCore(scheduler);
- }
- finally
- {
- SystemClock.Release();
- }
- }
- }
- protected abstract IDisposable InvokeCore(IScheduler scheduler);
- public int CompareTo(WorkItem/*!*/ other)
- {
- return Comparer<DateTimeOffset>.Default.Compare(this._dueTime, other._dueTime);
- }
- public void Dispose()
- {
- _disposable.Dispose();
- }
- }
- /// <summary>
- /// Represents a work item that closes over scheduler invocation state. Subtyping is
- /// used to have a common type for the scheduler queues.
- /// </summary>
- sealed class WorkItem<TState> : WorkItem
- {
- private readonly TState _state;
- private readonly Func<IScheduler, TState, IDisposable> _action;
- public WorkItem(LocalScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
- : base(scheduler, dueTime)
- {
- _state = state;
- _action = action;
- }
- protected override IDisposable InvokeCore(IScheduler scheduler)
- {
- return _action(scheduler, _state);
- }
- }
- }
- }
|