浏览代码

Fix wrong usage of ISchedulerLongRunning in ObserveOn

akarnokd 6 年之前
父节点
当前提交
a678705260

+ 20 - 0
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs

@@ -28,6 +28,26 @@ namespace System.Reactive.Concurrency
             protected override void Run(ObserveOnObserverNew<TSource> sink) => sink.Run(_source);
         }
 
+        /// <summary>
+        /// The new ObserveOn operator run with an ISchedulerLongRunning in a mostly lock-free manner.
+        /// </summary>
+        internal sealed class SchedulerLongRunning : Producer<TSource, ObserveOnObserverLongRunning<TSource>>
+        {
+            private readonly IObservable<TSource> _source;
+            private readonly ISchedulerLongRunning _scheduler;
+
+            public SchedulerLongRunning(IObservable<TSource> source, ISchedulerLongRunning scheduler)
+            {
+                _source = source;
+                _scheduler = scheduler;
+            }
+
+            protected override ObserveOnObserverLongRunning<TSource> CreateSink(IObserver<TSource> observer) => new ObserveOnObserverLongRunning<TSource>(_scheduler, observer);
+
+            [Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
+            protected override void Run(ObserveOnObserverLongRunning<TSource> sink) => sink.Run(_source);
+        }
+
         internal sealed class Context : Producer<TSource, Context._>
         {
             private readonly IObservable<TSource> _source;

+ 5 - 0
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

@@ -181,6 +181,11 @@ namespace System.Reactive.Concurrency
                 throw new ArgumentNullException(nameof(scheduler));
             }
 
+            var longRunning = scheduler.AsLongRunning();
+            if (longRunning != null)
+            {
+                return new ObserveOn<TSource>.SchedulerLongRunning(source, longRunning);
+            }
             return new ObserveOn<TSource>.Scheduler(source, scheduler);
         }
 

+ 179 - 66
Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

@@ -381,11 +381,6 @@ namespace System.Reactive
     {
         private readonly IScheduler _scheduler;
 
-        /// <summary>
-        /// If not null, the <see cref="_scheduler"/> supports
-        /// long running tasks.
-        /// </summary>
-        private readonly ISchedulerLongRunning _longRunning;
         private readonly ConcurrentQueue<T> _queue;
 
         /// <summary>
@@ -418,7 +413,6 @@ namespace System.Reactive
         public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream) : base(downstream)
         {
             _scheduler = scheduler;
-            _longRunning = scheduler.AsLongRunning();
             _queue = new ConcurrentQueue<T>();
         }
 
@@ -480,15 +474,7 @@ namespace System.Reactive
 
                 if (Disposable.TrySetMultiple(ref _task, newTask))
                 {
-                    var longRunning = _longRunning;
-                    if (longRunning != null)
-                    {
-                        newTask.Disposable = longRunning.ScheduleLongRunning(this, DrainLongRunningAction);
-                    }
-                    else
-                    {
-                        newTask.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc);
-                    }
+                    newTask.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc);
                 }
 
                 // If there was a cancellation, clear the queue
@@ -502,14 +488,6 @@ namespace System.Reactive
             }
         }
 
-        /// <summary>
-        /// The static action to be scheduled on a long running scheduler.
-        /// Avoids creating a delegate that captures <code>this</code>
-        /// whenever the signals have to be drained.
-        /// </summary>
-        private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DrainLongRunningAction =
-            (self, cancel) => self.DrainLongRunning();
-
         /// <summary>
         /// The static action to be scheduled on a simple scheduler.
         /// Avoids creating a delegate that captures <code>this</code>
@@ -526,7 +504,7 @@ namespace System.Reactive
         /// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
         private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
         {
-            DrainStep(_queue, false);
+            DrainStep(_queue);
 
             if (Interlocked.Decrement(ref _wip) != 0)
             {
@@ -550,23 +528,20 @@ namespace System.Reactive
         /// In addition, the DrainStep is invoked from the DrainLongRunning's loop
         /// so reading _queue inside this method would still incur the same barrier
         /// overhead otherwise.</param>
-        /// <param name="delayError">Should the errors be delayed until all
-        /// queued items have been emitted to the downstream?</param>
-        /// <returns>True if the drain loop should stop.</returns>
-        private bool DrainStep(ConcurrentQueue<T> q, bool delayError)
+        private void DrainStep(ConcurrentQueue<T> q)
         {
             // Check if the operator has been disposed
             if (Volatile.Read(ref _disposed))
             {
                 // cleanup residue items in the queue
                 Clear(q);
-                return true;
+                return;
             }
 
             // Has the upstream call OnCompleted?
             var d = Volatile.Read(ref _done);
 
-            if (d && !delayError)
+            if (d)
             {
                 // done = true happens before setting error
                 // this is safe to be a plain read
@@ -576,7 +551,7 @@ namespace System.Reactive
                 {
                     Volatile.Write(ref _disposed, true);
                     ForwardOnError(ex);
-                    return true;
+                    return;
                 }
             }
 
@@ -588,65 +563,203 @@ namespace System.Reactive
             if (d && empty)
             {
                 Volatile.Write(ref _disposed, true);
-                // done = true happens before setting error
-                // this is safe to be a plain read
-                var ex = _error;
-                // if not null, there was an OnError call
-                if (ex != null)
-                {
-                    ForwardOnError(ex);
-                }
-                else
-                {
-                    // otherwise, complete normally
-                    ForwardOnCompleted();
-                }
-                return true;
+                // otherwise, complete normally
+                ForwardOnCompleted();
+                return;
             }
             
             // the queue is empty and the upstream hasn't completed yet
             if (empty)
             {
-                return true;
+                return;
             }
             // emit the item
             ForwardOnNext(v);
+        }
+    }
+
+    /// <summary>
+    /// Signals events on a ISchedulerLongRunning by blocking the emission thread while waiting
+    /// for them from the upstream.
+    /// </summary>
+    /// <typeparam name="TSource">The element type of the sequence.</typeparam>
+    internal sealed class ObserveOnObserverLongRunning<TSource> : IdentitySink<TSource>
+    {
+        /// <summary>
+        /// This will run a suspending drain task, hogging the backing thread
+        /// until the sequence terminates or gets disposed.
+        /// </summary>
+        private readonly ISchedulerLongRunning _scheduler;
+
+        /// <summary>
+        /// The queue for holding the OnNext items, terminal signals have their own fields.
+        /// </summary>
+        private readonly ConcurrentQueue<TSource> _queue;
+
+        /// <summary>
+        /// Protects the suspension and resumption of the long running drain task.
+        /// </summary>
+        private readonly object _suspendGuard;
+
+        /// <summary>
+        /// The work-in-progress counter. If it jumps from 0 to 1, the drain task is resumed,
+        /// if it reaches 0 again, the drain task is suspended.
+        /// </summary>
+        private long _wip;
+
+        /// <summary>
+        /// Set to true if the upstream terminated.
+        /// </summary>
+        private bool _done;
+
+        /// <summary>
+        /// Set to a non-null Exception if the upstream terminated with OnError.
+        /// </summary>
+        private Exception _error;
+
+        /// <summary>
+        /// Indicates the sequence has been disposed and the drain task should quit.
+        /// </summary>
+        private bool _disposed;
+
+        /// <summary>
+        /// Makes sure the drain task is scheduled only once, when the first signal
+        /// from upstream arrives.
+        /// </summary>
+        private int _runDrainOnce;
+
+        /// <summary>
+        /// The disposable tracking the drain task.
+        /// </summary>
+        private IDisposable _drainTask;
 
-            // keep looping
-            return false;
+        public ObserveOnObserverLongRunning(ISchedulerLongRunning scheduler, IObserver<TSource> observer) : base(observer)
+        {
+            _scheduler = scheduler;
+            _queue = new ConcurrentQueue<TSource>();
+            _suspendGuard = new object();
+        }
+
+        public override void OnCompleted()
+        {
+            Volatile.Write(ref _done, true);
+            Schedule();
+        }
+
+        public override void OnError(Exception error)
+        {
+            _error = error;
+            Volatile.Write(ref _done, true);
+            Schedule();
+        }
+
+        public override void OnNext(TSource value)
+        {
+            _queue.Enqueue(value);
+            Schedule();
+        }
+
+        private void Schedule()
+        {
+            // Schedule the suspending drain once
+            if (Volatile.Read(ref _runDrainOnce) == 0
+                && Interlocked.CompareExchange(ref _runDrainOnce, 1, 0) == 0)
+            {
+                Disposable.SetSingle(ref _drainTask, _scheduler.ScheduleLongRunning(this, DrainLongRunning));
+            }
+
+            // Indicate more work is to be done by the drain loop
+            if (Interlocked.Increment(ref _wip) == 1L)
+            {
+                // resume the drain loop waiting on the guard
+                lock (_suspendGuard)
+                {
+                    Monitor.Pulse(_suspendGuard);
+                }
+            }
         }
 
         /// <summary>
-        /// Emits as many signals as possible to the downstream observer
-        /// as this is executing a long-running scheduler so
-        /// it can occupy that thread as long as it needs to.
+        /// Static reference to the Drain method, saves allocation.
         /// </summary>
-        private void DrainLongRunning()
+        private static readonly Action<ObserveOnObserverLongRunning<TSource>, ICancelable> DrainLongRunning = (self, cancelable) => self.Drain();
+
+        protected override void Dispose(bool disposing)
         {
-            var missed = 1;
+            // Indicate the drain task should quit
+            Volatile.Write(ref _disposed, true);
+            // Resume the drain task in case it sleeps
+            lock (_suspendGuard)
+            {
+                Monitor.Pulse(_suspendGuard);
+            }
+            // Cancel the drain task handle.
+            Disposable.TryDispose(ref _drainTask);
+            base.Dispose(disposing);
+        }
 
-            // read out fields upfront as the DrainStep uses atomics
-            // that would force the re-read of these constant values
-            // from memory, regardless of readonly, afaik
+        private void Drain()
+        {
             var q = _queue;
-
             for (; ; )
             {
-                for (; ; )
+                // If the sequence was disposed, clear the queue and quit
+                if (Volatile.Read(ref _disposed))
+                {
+                    while (q.TryDequeue(out var _)) ;
+                    break;
+                }
+
+                // Has the upstream terminated?
+                var isDone = Volatile.Read(ref _done);
+                // Do we have an item in the queue
+                var hasValue = q.TryDequeue(out var item);
+
+                // If the upstream has terminated and no further items are in the queue
+                if (isDone && !hasValue)
                 {
-                    // delayError: true - because of 
-                    //      ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
-                    // expects something that almost looks like full delayError
-                    if (DrainStep(q, true))
+                    // Find out if the upstream terminated with an error and signal accordingly.
+                    var e = _error;
+                    if (e != null)
+                    {
+                        ForwardOnError(e);
+                    }
+                    else
                     {
-                        break;
+                        ForwardOnCompleted();
                     }
+                    break;
                 }
 
-                missed = Interlocked.Add(ref _wip, -missed);
-                if (missed == 0)
+                // There was an item, signal it.
+                if (hasValue)
                 {
-                    break;
+                    ForwardOnNext(item);
+                    // Consume the item and try the next item if the work-in-progress
+                    // indicator is still not zero
+                    if (Interlocked.Decrement(ref _wip) != 0L)
+                    {
+                        continue;
+                    }
+                }
+
+                // If we run out of work and the sequence is not disposed
+                if (Volatile.Read(ref _wip) == 0L && !Volatile.Read(ref _disposed))
+                {
+                    var g = _suspendGuard;
+                    // try sleeping, if we can't even enter the lock, the producer
+                    // side is currently trying to resume us
+                    if (Monitor.TryEnter(g))
+                    {
+                        // Make sure again there is still no work and the sequence is not disposed
+                        if (Volatile.Read(ref _wip) == 0L && !Volatile.Read(ref _disposed))
+                        { 
+                            // wait for a Pulse(g)
+                            Monitor.Wait(g);
+                        }
+                        // Unlock
+                        Monitor.Exit(g);
+                    }
                 }
             }
         }

+ 48 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ObserveOnTest.cs

@@ -658,6 +658,54 @@ namespace ReactiveTests.Tests
 
             Assert.True(cde.Wait(5000), "Timeout!");
         }
+
+        [Fact]
+        public void ObserveOn_LongRunning_SameThread()
+        {
+            var scheduler = TaskPoolScheduler.Default;
+
+            Assert.NotNull(scheduler.AsLongRunning());
+
+            var N = 1_000_000;
+            var threads = new HashSet<long>();
+            var cde = new CountdownEvent(1);
+
+            Observable.Range(1, N)
+                .ObserveOn(scheduler)
+                .Subscribe(
+                    v => threads.Add(Thread.CurrentThread.ManagedThreadId), 
+                    e => cde.Signal(), 
+                    () => cde.Signal()
+                );
+
+            Assert.True(cde.Wait(5000), "Timeout!");
+
+            Assert.Equal(1, threads.Count);
+        }
+
+        [Fact]
+        public void ObserveOn_LongRunning_DisableOptimizations()
+        {
+            var scheduler = TaskPoolScheduler.Default.DisableOptimizations();
+
+            Assert.Null(scheduler.AsLongRunning());
+
+            var N = 1_000_000;
+            var threads = new HashSet<long>();
+            var cde = new CountdownEvent(1);
+
+            Observable.Range(1, N)
+                .ObserveOn(scheduler)
+                .Subscribe(
+                    v => threads.Add(Thread.CurrentThread.ManagedThreadId),
+                    e => cde.Signal(),
+                    () => cde.Signal()
+                );
+
+            Assert.True(cde.Wait(5000), "Timeout!");
+
+            Assert.True(threads.Count >= 1);
+        }
     }
 
     internal class MyCtx : SynchronizationContext