Jelajahi Sumber

Candidate fix for LocalScheduler work stealing issue.

Bart De Smet 10 tahun lalu
induk
melakukan
e0ca794fdc

+ 62 - 52
Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.TimerQueue.cs

@@ -9,6 +9,11 @@ namespace System.Reactive.Concurrency
 {
     public partial class LocalScheduler
     {
+        /// <summary>
+        /// Gate to protect local scheduler queues.
+        /// </summary>
+        private static readonly object _gate = new object();
+
         /// <summary>
         /// Gate to protect queues and to synchronize scheduling decisions and system clock
         /// change management.
@@ -42,16 +47,16 @@ namespace System.Reactive.Concurrency
         /// items. This queue is kept in order to be able to relocate short term items back
         /// to the long term queue in case a system clock change occurs.
         /// </summary>
-        private static readonly PriorityQueue<WorkItem/*!*/> s_shortTerm = new PriorityQueue<WorkItem/*!*/>();
+        private readonly PriorityQueue<WorkItem/*!*/> _shortTerm = new PriorityQueue<WorkItem/*!*/>();
 
         /// <summary>
         /// Set of disposable handles to all of the current short term work Schedule calls,
         /// allowing those to be cancelled upon a system clock change.
         /// </summary>
 #if !NO_HASHSET
-        private static readonly HashSet<IDisposable> s_shortTermWork = new HashSet<IDisposable>();
+        private readonly HashSet<IDisposable> _shortTermWork = new HashSet<IDisposable>();
 #else
-        private static readonly Dictionary<IDisposable, object> s_shortTermWork = new Dictionary<IDisposable, object>();
+        private readonly Dictionary<IDisposable, object> _shortTermWork = new Dictionary<IDisposable, object>();
 #endif
 
         /// <summary>
@@ -98,8 +103,11 @@ namespace System.Reactive.Concurrency
         /// </summary>
         private static readonly TimeSpan MAXSUPPORTEDTIMER = TimeSpan.FromMilliseconds((1L << 32) - 2);
 
+        /// <summary>
+        /// Creates a new local scheduler.
+        /// </summary>
         [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline", Justification = "We can't really lift this into a field initializer, and would end up checking for an initialization flag in every static method anyway (which is roughly what the JIT does in a thread-safe manner).")]
-        static LocalScheduler()
+        protected LocalScheduler()
         {
             //
             // Hook up for system clock change notifications. This doesn't do anything until the
@@ -111,12 +119,11 @@ namespace System.Reactive.Concurrency
         /// <summary>
         /// Enqueues absolute time scheduled work in the timer queue or the short term work list.
         /// </summary>
-        /// <param name="scheduler">Scheduler to run the work on. Typically "this" from the caller's perspective (LocalScheduler.Schedule), but parameter kept because we have a single (static) timer queue across all of Rx local schedulers.</param>
         /// <param name="state">State to pass to the action.</param>
         /// <param name="dueTime">Absolute time to run the work on. The timer queue is responsible to execute the work close to the specified time, also accounting for system clock changes.</param>
         /// <param name="action">Action to run, potentially recursing into the scheduler.</param>
         /// <returns>Disposable object to prevent the work from running.</returns>
-        private static IDisposable Enqueue<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
+        private IDisposable Enqueue<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
         {
             //
             // Work that's due in the past is sent to the underlying scheduler through the Schedule
@@ -129,10 +136,10 @@ namespace System.Reactive.Concurrency
             // - Optimize for the default behavior of LocalScheduler where a virtual call to Schedule
             //   for immediate execution calls into the abstract Schedule method with TimeSpan.Zero.
             //
-            var due = Scheduler.Normalize(dueTime - scheduler.Now);
+            var due = Scheduler.Normalize(dueTime - Now);
             if (due == TimeSpan.Zero)
             {
-                return scheduler.Schedule<TState>(state, TimeSpan.Zero, action);
+                return Schedule<TState>(state, TimeSpan.Zero, action);
             }
 
             //
@@ -146,7 +153,7 @@ namespace System.Reactive.Concurrency
             //
             SystemClock.AddRef();
 
-            var workItem = new WorkItem<TState>(scheduler, state, dueTime, action);
+            var workItem = new WorkItem<TState>(this, state, dueTime, action);
 
             if (due <= SHORTTERM)
             {
@@ -166,26 +173,26 @@ namespace System.Reactive.Concurrency
         /// the short term work is attempted to be cancelled and reevaluated.
         /// </summary>
         /// <param name="item">Work item to schedule in the short term. The caller is responsible to determine the work is indeed short term.</param>
-        private static void ScheduleShortTermWork(WorkItem/*!*/ item)
+        private void ScheduleShortTermWork(WorkItem/*!*/ item)
         {
-            lock (s_gate)
+            lock (_gate)
             {
-                s_shortTerm.Enqueue(item);
+                _shortTerm.Enqueue(item);
 
                 //
                 // We don't bother trying to dequeue the item or stop the timer upon cancellation,
                 // but always let the timer fire to do the queue maintenance. When the item is
                 // cancelled, it won't run (see WorkItem.Invoke). In the event of a system clock
-                // change, all outstanding work in s_shortTermWork is cancelled and the short
+                // change, all outstanding work in _shortTermWork is cancelled and the short
                 // term queue is reevaluated, potentially prompting rescheduling of short term
                 // work. Notice work is protected against double execution by the implementation
                 // of WorkItem.Invoke.
                 //
                 var d = new SingleAssignmentDisposable();
 #if !NO_HASHSET
-                s_shortTermWork.Add(d);
+                _shortTermWork.Add(d);
 #else
-                s_shortTermWork.Add(d, null);
+                _shortTermWork.Add(d, null);
 #endif
 
                 //
@@ -204,11 +211,11 @@ namespace System.Reactive.Concurrency
         /// <param name="scheduler">Recursive scheduler supplied by the underlying scheduler.</param>
         /// <param name="cancel">Disposable used to identify the work the timer was triggered for (see code for usage).</param>
         /// <returns>Empty disposable. Recursive work cancellation is wired through the original WorkItem.</returns>
-        private static IDisposable ExecuteNextShortTermWorkItem(IScheduler scheduler, IDisposable cancel)
+        private IDisposable ExecuteNextShortTermWorkItem(IScheduler scheduler, IDisposable cancel)
         {
             var next = default(WorkItem);
 
-            lock (s_gate)
+            lock (_gate)
             {
                 //
                 // Notice that even though we try to cancel all work in the short term queue upon a
@@ -223,9 +230,9 @@ namespace System.Reactive.Concurrency
                 // call to Dispose versus the underlying timer. It's also possible the underlying
                 // scheduler does a bad job at cancellation, so this measure helps for that too.
                 //
-                if (s_shortTermWork.Remove(cancel) && s_shortTerm.Count > 0)
+                if (_shortTermWork.Remove(cancel) && _shortTerm.Count > 0)
                 {
-                    next = s_shortTerm.Dequeue();
+                    next = _shortTerm.Dequeue();
                 }
             }
 
@@ -352,7 +359,7 @@ namespace System.Reactive.Concurrency
                         break;
 
                     var item = s_longTerm.Dequeue();
-                    ScheduleShortTermWork(item);
+                    item.Scheduler.ScheduleShortTermWork(item);
                 }
 
                 s_nextLongTermWorkItem = null;
@@ -366,42 +373,45 @@ namespace System.Reactive.Concurrency
         /// </summary>
         /// <param name="args">Currently not used.</param>
         /// <param name="sender">Currently not used.</param>
-        private static void SystemClockChanged(object sender, SystemClockChangedEventArgs args)
+        private void SystemClockChanged(object sender, SystemClockChangedEventArgs args)
         {
-            lock (s_gate)
+            lock (_gate)
             {
-                //
-                // Best-effort cancellation of short term work. A check for presence in the hash set
-                // is used to notice race conditions between cancellation and the timer firing (also
-                // guarded by the same gate object). See checks in ExecuteNextShortTermWorkItem.
-                //
+                lock (s_gate)
+                {
+                    //
+                    // Best-effort cancellation of short term work. A check for presence in the hash set
+                    // is used to notice race conditions between cancellation and the timer firing (also
+                    // guarded by the same gate object). See checks in ExecuteNextShortTermWorkItem.
+                    //
 #if !NO_HASHSET
-                foreach (var d in s_shortTermWork)
+                    foreach (var d in _shortTermWork)
 #else
-                foreach (var d in s_shortTermWork.Keys)
+                    foreach (var d in _shortTermWork.Keys)
 #endif
-                    d.Dispose();
+                        d.Dispose();
 
-                s_shortTermWork.Clear();
+                    _shortTermWork.Clear();
 
-                //
-                // Transition short term work to the long term queue for reevaluation by calling the
-                // EvaluateLongTermQueue method. We don't know which direction the clock was changed
-                // in, so we don't optimize for special cases, but always transition the whole queue.
-                // Notice the short term queue is bounded to SHORTTERM length.
-                //
-                while (s_shortTerm.Count > 0)
-                {
-                    var next = s_shortTerm.Dequeue();
-                    s_longTerm.Enqueue(next);
-                }
+                    //
+                    // Transition short term work to the long term queue for reevaluation by calling the
+                    // EvaluateLongTermQueue method. We don't know which direction the clock was changed
+                    // in, so we don't optimize for special cases, but always transition the whole queue.
+                    // Notice the short term queue is bounded to SHORTTERM length.
+                    //
+                    while (_shortTerm.Count > 0)
+                    {
+                        var next = _shortTerm.Dequeue();
+                        s_longTerm.Enqueue(next);
+                    }
 
-                //
-                // Reevaluate the queue and don't forget to null out the current timer to force the
-                // method to create a new timer for the new first long term item.
-                //
-                s_nextLongTermWorkItem = null;
-                EvaluateLongTermQueue(null);
+                    //
+                    // Reevaluate the queue and don't forget to null out the current timer to force the
+                    // method to create a new timer for the new first long term item.
+                    //
+                    s_nextLongTermWorkItem = null;
+                    EvaluateLongTermQueue(null);
+                }
             }
         }
 
@@ -414,12 +424,12 @@ namespace System.Reactive.Concurrency
         /// </remarks>
         abstract class WorkItem : IComparable<WorkItem>, IDisposable
         {
-            private readonly IScheduler _scheduler;
+            private readonly LocalScheduler _scheduler;
             private readonly DateTimeOffset _dueTime;
             private readonly SingleAssignmentDisposable _disposable;
             private int _hasRun;
 
-            public WorkItem(IScheduler scheduler, DateTimeOffset dueTime)
+            public WorkItem(LocalScheduler scheduler, DateTimeOffset dueTime)
             {
                 _scheduler = scheduler;
                 _dueTime = dueTime;
@@ -427,7 +437,7 @@ namespace System.Reactive.Concurrency
                 _hasRun = 0;
             }
 
-            public IScheduler Scheduler
+            public LocalScheduler Scheduler
             {
                 get { return _scheduler; }
             }
@@ -481,7 +491,7 @@ namespace System.Reactive.Concurrency
             private readonly TState _state;
             private readonly Func<IScheduler, TState, IDisposable> _action;
 
-            public WorkItem(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
+            public WorkItem(LocalScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
                 : base(scheduler, dueTime)
             {
                 _state = state;

+ 1 - 1
Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/LocalScheduler.cs

@@ -57,7 +57,7 @@ namespace System.Reactive.Concurrency
             if (action == null)
                 throw new ArgumentNullException("action");
 
-            return Enqueue(this, state, dueTime, action);
+            return Enqueue(state, dueTime, action);
         }
 
         /// <summary>