4.x: Replace ObserveOn(IScheduler) with a lock-free algorithm (#508)

David Karnok, 7 years ago
parent
commit 526e132186
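
This commit replaces the implementation behind ObserveOn(IScheduler); the public operator surface is unchanged. As a minimal usage sketch (this console program is illustrative and not part of the commit), values are still simply re-routed onto the supplied scheduler, now handed off by the lock-free observer added below:

    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    class ObserveOnUsageSketch
    {
        static void Main()
        {
            // Values are produced synchronously but observed on the task pool;
            // the hand-off is done by the new lock-free ObserveOnObserverNew sink.
            Observable.Range(1, 5)
                .ObserveOn(TaskPoolScheduler.Default)
                .Subscribe(
                    x => Console.WriteLine($"OnNext({x}) on thread {Environment.CurrentManagedThreadId}"),
                    () => Console.WriteLine("OnCompleted"));

            Console.ReadLine(); // keep the process alive while the scheduled work runs
        }
    }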

+ 20 - 0
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs

@@ -26,6 +26,26 @@ namespace System.Reactive.Concurrency
             protected override IDisposable Run(ObserveOnObserver<TSource> sink) => _source.SubscribeSafe(sink);
         }
 
+        /// <summary>
+        /// The new ObserveOn operator that runs with an IScheduler in a lock-free manner.
+        /// </summary>
+        internal sealed class SchedulerNew : Producer<TSource, ObserveOnObserverNew<TSource>>
+        {
+            private readonly IObservable<TSource> _source;
+            private readonly IScheduler _scheduler;
+
+            public SchedulerNew(IObservable<TSource> source, IScheduler scheduler)
+            {
+                _source = source;
+                _scheduler = scheduler;
+            }
+
+            protected override ObserveOnObserverNew<TSource> CreateSink(IObserver<TSource> observer, IDisposable cancel) => new ObserveOnObserverNew<TSource>(_scheduler, observer, cancel);
+
+            [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
+            protected override IDisposable Run(ObserveOnObserverNew<TSource> sink) => _source.SubscribeSafe(sink);
+        }
+
         internal sealed class Context : Producer<TSource, Context._>
         {
             private readonly IObservable<TSource> _source;
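
The sink that SchedulerNew creates (ObserveOnObserverNew, added in ScheduledObserver.cs below) probes the scheduler once for long-running support via the public Scheduler.AsLongRunning helper and picks one of two drain strategies. A quick illustrative check; the printed values are my expectation for the current built-in schedulers, not something this commit asserts:

    using System;
    using System.Reactive.Concurrency;

    class LongRunningProbe
    {
        static void Main()
        {
            // Schedulers that can dedicate a thread expose ISchedulerLongRunning;
            // for these the sink uses the greedy DrainLongRunning loop.
            Console.WriteLine(TaskPoolScheduler.Default.AsLongRunning() != null);   // expected: True
            // Schedulers without that capability return null and get the
            // one-signal-per-tick recursive DrainShortRunning path instead.
            Console.WriteLine(ImmediateScheduler.Instance.AsLongRunning() != null); // expected: False
        }
    }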

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

@@ -102,7 +102,7 @@ namespace System.Reactive.Concurrency
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return new ObserveOn<TSource>.Scheduler(source, scheduler);
+            return new ObserveOn<TSource>.SchedulerNew(source, scheduler);
         }
 
         /// <summary>
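
This is the only call-site change: Synchronization.ObserveOn, which the ObserveOn(IScheduler) overload presumably funnels into (it is the only factory touched by the commit), now builds the lock-free producer instead of the old Scheduler one. The helper is public and can be called directly; its signature is untouched. A small sketch, illustrative only:

    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    class SynchronizationObserveOnSketch
    {
        static void Main()
        {
            var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(3);

            // Calling the public helper directly; after this commit it returns the
            // SchedulerNew producer, but the observable it hands back behaves the same.
            var observed = Synchronization.ObserveOn(source, NewThreadScheduler.Default);

            using (observed.Subscribe(Console.WriteLine, () => Console.WriteLine("done")))
            {
                Console.ReadLine();
            }
        }
    }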

+ 278 - 0
Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

@@ -338,4 +338,282 @@ namespace System.Reactive
         void EnsureActive();
         void EnsureActive(int count);
     }
+
+    /// <summary>
+    /// An ObserveOn operator implementation that uses lock-free
+    /// techniques to signal events to the downstream.
+    /// </summary>
+    /// <typeparam name="T">The element type of the sequence.</typeparam>
+    internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
+    {
+        readonly IObserver<T> downstream;
+
+        readonly IScheduler scheduler;
+
+        /// <summary>
+        /// If not null, the <see cref="scheduler"/> supports
+        /// long running tasks.
+        /// </summary>
+        readonly ISchedulerLongRunning longRunning;
+
+        readonly ConcurrentQueue<T> queue;
+
+        /// <summary>
+        /// The disposable of the upstream source.
+        /// </summary>
+        IDisposable upstream;
+
+        /// <summary>
+        /// The current task representing a running drain operation.
+        /// </summary>
+        IDisposable task;
+
+        /// <summary>
+        /// Indicates the work-in-progress state of this operator,
+        /// zero means no work is currently being done.
+        /// </summary>
+        int wip;
+
+        /// <summary>
+        /// If true, the upstream has terminated by calling OnCompleted or OnError.
+        /// </summary>
+        bool done;
+        /// <summary>
+        /// If <see cref="done"/> is true and this is non-null, the upstream
+        /// failed with an OnError.
+        /// </summary>
+        Exception error;
+
+        /// <summary>
+        /// Indicates a dispose has been requested.
+        /// </summary>
+        bool disposed;
+
+        public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream, IDisposable upstream)
+        {
+            this.downstream = downstream;
+            this.scheduler = scheduler;
+            this.longRunning = scheduler.AsLongRunning();
+            this.queue = new ConcurrentQueue<T>();
+            Volatile.Write(ref this.upstream, upstream);
+        }
+
+        public void Dispose()
+        {
+            Volatile.Write(ref disposed, true);
+            Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
+            Interlocked.Exchange(ref task, BooleanDisposable.True)?.Dispose();
+            Clear();
+        }
+
+        /// <summary>
+        /// Remove remaining elements from the queue upon
+        /// cancellation or failure.
+        /// </summary>
+        void Clear()
+        {
+            var q = queue;
+            while (q.TryDequeue(out var _)) ;
+        }
+
+        public void OnCompleted()
+        {
+            Volatile.Write(ref done, true);
+            Schedule();
+        }
+
+        public void OnError(Exception error)
+        {
+            this.error = error;
+            Volatile.Write(ref done, true);
+            Schedule();
+        }
+
+        public void OnNext(T value)
+        {
+            queue.Enqueue(value);
+            Schedule();
+        }
+
+        /// <summary>
+        /// Submits the drain task via the appropriate scheduler if
+        /// there is no drain currently running (i.e., wip was zero before this signal).
+        /// </summary>
+        void Schedule()
+        {
+            if (Interlocked.Increment(ref wip) == 1)
+            {
+                var oldTask = Volatile.Read(ref task);
+
+                var newTask = new SingleAssignmentDisposable();
+
+                if (oldTask != BooleanDisposable.True
+                    && Interlocked.CompareExchange(ref task, newTask, oldTask) == oldTask)
+                {
+
+                    var longRunning = this.longRunning;
+                    if (longRunning != null)
+                    {
+                        newTask.Disposable = longRunning.ScheduleLongRunning(this, DRAIN_LONG_RUNNING);
+                    }
+                    else
+                    {
+                        newTask.Disposable = scheduler.Schedule(this, DRAIN_SHORT_RUNNING);
+                    }
+                }
+
+                // If there was a cancellation, clear the queue
+                // of items. This doesn't have to be inside the
+                // wip != 0 (exclusive) mode as the queue
+                // is of a multi-consumer type.
+                if (Volatile.Read(ref disposed))
+                {
+                    Clear();
+                }
+            }
+        }
+
+        /// <summary>
+        /// The static action to be scheduled on a long running scheduler.
+        /// Avoids creating a delegate that captures <code>this</code>
+        /// whenever the signals have to be drained.
+        /// </summary>
+        static readonly Action<ObserveOnObserverNew<T>, ICancelable> DRAIN_LONG_RUNNING =
+            (self, cancel) => self.DrainLongRunning();
+
+        /// <summary>
+        /// The static function to be scheduled on a simple scheduler.
+        /// Avoids creating a delegate that captures <code>this</code>
+        /// whenever the signals have to be drained.
+        /// </summary>
+        static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DRAIN_SHORT_RUNNING =
+            (scheduler, self) => self.DrainShortRunning(scheduler);
+
+        /// <summary>
+        /// Emits at most one signal per run on a scheduler that doesn't
+        /// support long-running tasks.
+        /// </summary>
+        /// <param name="recursiveScheduler">The scheduler to use for scheduling the next signal emission if necessary.</param>
+        /// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
+        IDisposable DrainShortRunning(IScheduler recursiveScheduler)
+        {
+            DrainStep(queue, downstream, false);
+
+            if (Interlocked.Decrement(ref wip) != 0)
+            {
+                return recursiveScheduler.Schedule(this, DRAIN_SHORT_RUNNING);
+            }
+            return Disposable.Empty;
+        }
+
+        /// <summary>
+        /// Executes a drain step by checking the disposed state,
+        /// checking for the terminated state and for an
+        /// empty queue, issuing the appropriate signals to the
+        /// given downstream.
+        /// </summary>
+        /// <param name="q">The queue to use.</param>
+        /// <param name="downstream">The intended consumer of the events.</param>
+        /// <param name="delayError">Should the errors be delayed until all
+        /// queued items have been emitted to the downstream?</param>
+        /// <returns>True if the drain loop should stop.</returns>
+        bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delayError)
+        {
+            // Check if the operator has been disposed
+            if (Volatile.Read(ref disposed))
+            {
+                // cleanup residue items in the queue
+                Clear();
+                return true;
+            }
+
+            // Has the upstream called OnCompleted?
+            var d = Volatile.Read(ref done);
+
+            if (d && !delayError)
+            {
+                // done = true happens before setting error
+                // this is safe to be a plain read
+                var ex = error;
+                // if not null, there was an OnError call
+                if (ex != null)
+                {
+                    Volatile.Write(ref disposed, true);
+                    downstream.OnError(ex);
+                    return true;
+                }
+            }
+
+            // get the next item from the queue if any
+            var empty = !queue.TryDequeue(out var v);
+
+            // the upstream called OnCompleted and the queue is empty
+            // that means we are done, no further signals can happen
+            if (d && empty)
+            {
+                Volatile.Write(ref disposed, true);
+                // done = true happens before setting error
+                // this is safe to be a plain read
+                var ex = error;
+                // if not null, there was an OnError call
+                if (ex != null)
+                {
+                    downstream.OnError(ex);
+                }
+                else
+                {
+                    // otherwise, complete normally
+                    downstream.OnCompleted();
+                }
+                return true;
+            }
+            else
+            // the queue is empty and the upstream hasn't completed yet
+            if (empty)
+            {
+                return true;
+            }
+            // emit the item
+            downstream.OnNext(v);
+
+            // keep looping
+            return false;
+        }
+
+        /// <summary>
+        /// Emits as many signals as possible to the downstream observer
+        /// as this is executing on a long-running scheduler, so
+        /// it can occupy that thread as long as it needs to.
+        /// </summary>
+        void DrainLongRunning()
+        {
+            var missed = 1;
+
+            // read out fields upfront as the DrainStep uses atomics
+            // that would force the re-read of these constant values
+            // from memory, regardless of readonly, afaik
+            var q = queue;
+            var downstream = this.downstream;
+
+            for (; ; )
+            {
+                for (; ; )
+                {
+                    // delayError: true, because the test
+                    //      ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
+                    // expects something that almost looks like full delayError
+                    if (DrainStep(q, downstream, true))
+                    {
+                        break;
+                    }
+                }
+
+                missed = Interlocked.Add(ref wip, -missed);
+                if (missed == 0)
+                {
+                    break;
+                }
+            }
+        }
+    }
 }
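
The core of the new observer is the work-in-progress (wip) counter pattern: producers enqueue into a ConcurrentQueue and increment wip, the caller that moves wip from 0 to 1 schedules the single drain, and the drain keeps subtracting the signals it has accounted for until wip drops back to 0. A stripped-down, hypothetical sketch of just that mechanism follows (no error, completion, or disposal handling, and Task.Run standing in for the IScheduler; the class and member names are illustrative):

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    // Minimal illustration of the queue + work-in-progress counter technique.
    sealed class LockFreeDrain<T>
    {
        private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
        private readonly Action<T> _consumer;
        private int _wip;

        public LockFreeDrain(Action<T> consumer) => _consumer = consumer;

        public void Offer(T item)
        {
            _queue.Enqueue(item);
            if (Interlocked.Increment(ref _wip) == 1)
            {
                // We won the 0 -> 1 race: start exactly one drain "task".
                Task.Run(() => Drain());
            }
        }

        private void Drain()
        {
            var missed = 1;
            for (; ; )
            {
                while (_queue.TryDequeue(out var item))
                {
                    _consumer(item); // runs only on the single drain task, so calls are serialized
                }

                // Subtract the signals we have seen; if more arrived meanwhile,
                // wip is still non-zero and we loop again instead of exiting.
                missed = Interlocked.Add(ref _wip, -missed);
                if (missed == 0)
                {
                    break;
                }
            }
        }
    }

Because a new drain is only started when the increment returns 1, and the running drain only exits after its decrement brings wip back to 0, at most one drain is active at a time and the consumer callback is never invoked concurrently; this is the same guarantee ObserveOnObserverNew gives its downstream observer without taking a lock.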