Browse Source

CurrentThreadScheduler fast path Schedule (#599)

David Karnok 7 năm trước cách đây
mục cha
commit
a18e4da071

+ 54 - 14
Rx.NET/Source/src/System.Reactive/Concurrency/CurrentThreadScheduler.cs

@@ -31,6 +31,9 @@ namespace System.Reactive.Concurrency
         [ThreadStatic]
         private static IStopwatch s_clock;
 
+        [ThreadStatic]
+        private static bool running;
+
         private static SchedulerQueue<TimeSpan> GetQueue() => s_threadLocalQueue;
 
         private static void SetQueue(SchedulerQueue<TimeSpan> newQueue)
@@ -61,7 +64,7 @@ namespace System.Reactive.Concurrency
         /// Gets a value that indicates whether the caller must call a Schedule method.
         /// </summary>
         [EditorBrowsable(EditorBrowsableState.Advanced)]
-        public static bool IsScheduleRequired => GetQueue() == null;
+        public static bool IsScheduleRequired => !running;
 
         /// <summary>
         /// Schedules an action to be executed after dueTime.
@@ -77,32 +80,69 @@ namespace System.Reactive.Concurrency
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
 
-            var dt = Time + Scheduler.Normalize(dueTime);
-
-            var si = new ScheduledItem<TimeSpan, TState>(this, state, action, dt);
-
-            var queue = GetQueue();
+            var queue = default(SchedulerQueue<TimeSpan>);
 
-            if (queue == null)
+            // There is no timed task and no task is currently running
+            if (!running)
             {
-                queue = new SchedulerQueue<TimeSpan>(4);
-                queue.Enqueue(si);
+                running = true;
 
-                SetQueue(queue);
+                if (dueTime > TimeSpan.Zero)
+                {
+                    ConcurrencyAbstractionLayer.Current.Sleep(dueTime);
+                }
+
+                // execute directly without queueing
+                IDisposable d;
                 try
                 {
-                    Trampoline.Run(queue);
+                    d = action(this, state);
                 }
-                finally
+                catch
                 {
                     SetQueue(null);
+                    running = false;
+                    throw;
                 }
+
+                // did recursive tasks arrive?
+                queue = GetQueue();
+
+                // yes, run those in the queue as well
+                if (queue != null)
+                {
+                    try
+                    {
+                        Trampoline.Run(queue);
+                    }
+                    finally
+                    {
+                        SetQueue(null);
+                        running = false;
+                    }
+                }
+                else
+                {
+                    running = false;
+                }
+
+                return d;
             }
-            else
+
+            queue = GetQueue();
+
+            // if there is a task running or there is a queue
+            if (queue == null)
             {
-                queue.Enqueue(si);
+                queue = new SchedulerQueue<TimeSpan>(4);
+                SetQueue(queue);
             }
 
+            var dt = Time + Scheduler.Normalize(dueTime);
+
+            // queue up more work
+            var si = new ScheduledItem<TimeSpan, TState>(this, state, action, dt);
+            queue.Enqueue(si);
             return si;
         }