Przeglądaj źródła

Merge pull request #742 from akarnokd/ObserveOnQueueAccess

4.x: Upgrade the ObserveOn operator to IdentitySink, fix queue usage
Daniel C. Weber 7 lat temu
rodzic
commit
89cde7a4dd

+ 39 - 37
Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

@@ -377,9 +377,8 @@ namespace System.Reactive
     /// techniques to signal events to the downstream.
     /// </summary>
     /// <typeparam name="T">The element type of the sequence.</typeparam>
-    internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
+    internal sealed class ObserveOnObserverNew<T> : IdentitySink<T>
     {
-        private readonly IObserver<T> _downstream;
         private readonly IScheduler _scheduler;
 
         /// <summary>
@@ -388,7 +387,6 @@ namespace System.Reactive
         /// </summary>
         private readonly ISchedulerLongRunning _longRunning;
         private readonly ConcurrentQueue<T> _queue;
-        private IDisposable _run;
 
         /// <summary>
         /// The current task representing a running drain operation.
@@ -417,54 +415,55 @@ namespace System.Reactive
         /// </summary>
         private bool _disposed;
 
-        public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream)
+        public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream) : base(downstream)
         {
-            _downstream = downstream;
             _scheduler = scheduler;
             _longRunning = scheduler.AsLongRunning();
             _queue = new ConcurrentQueue<T>();
         }
 
-        public void Run(IObservable<T> source)
-        {
-            Disposable.SetSingle(ref _run, source.SubscribeSafe(this));
-        }
-
-        public void Dispose()
+        protected override void Dispose(bool disposing)
         {
             Volatile.Write(ref _disposed, true);
-            Disposable.TryDispose(ref _task);
-            Disposable.TryDispose(ref _run);
-            Clear();
+
+            base.Dispose(disposing);
+            if (disposing)
+            {
+                Disposable.TryDispose(ref _task);
+                Clear(_queue);
+            }
         }
 
         /// <summary>
         /// Remove remaining elements from the queue upon
         /// cancellation or failure.
         /// </summary>
-        private void Clear()
+        /// <param name="q">The queue to use. The argument ensures that the
+        /// _queue field is not re-read from memory unnecessarily
+        /// due to the memory barriers inside TryDequeue mandating it
+        /// despite the field is read-only.</param>
+        private void Clear(ConcurrentQueue<T> q)
         {
-            var q = _queue;
             while (q.TryDequeue(out var _))
             {
                 ;
             }
         }
 
-        public void OnCompleted()
+        public override void OnCompleted()
         {
             Volatile.Write(ref _done, true);
             Schedule();
         }
 
-        public void OnError(Exception error)
+        public override void OnError(Exception error)
         {
             _error = error;
             Volatile.Write(ref _done, true);
             Schedule();
         }
 
-        public void OnNext(T value)
+        public override void OnNext(T value)
         {
             _queue.Enqueue(value);
             Schedule();
@@ -485,11 +484,11 @@ namespace System.Reactive
                     var longRunning = _longRunning;
                     if (longRunning != null)
                     {
-                        newTask.Disposable = longRunning.ScheduleLongRunning(this, DRAIN_LONG_RUNNING);
+                        newTask.Disposable = longRunning.ScheduleLongRunning(this, DrainLongRunningAction);
                     }
                     else
                     {
-                        newTask.Disposable = _scheduler.Schedule(this, DRAIN_SHORT_RUNNING);
+                        newTask.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc);
                     }
                 }
 
@@ -499,7 +498,7 @@ namespace System.Reactive
                 // is of a multi-consumer type.
                 if (Volatile.Read(ref _disposed))
                 {
-                    Clear();
+                    Clear(_queue);
                 }
             }
         }
@@ -509,7 +508,7 @@ namespace System.Reactive
         /// Avoids creating a delegate that captures <code>this</code>
         /// whenever the signals have to be drained.
         /// </summary>
-        private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DRAIN_LONG_RUNNING =
+        private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DrainLongRunningAction =
             (self, cancel) => self.DrainLongRunning();
 
         /// <summary>
@@ -517,7 +516,7 @@ namespace System.Reactive
         /// Avoids creating a delegate that captures <code>this</code>
         /// whenever the signals have to be drained.
         /// </summary>
-        private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DRAIN_SHORT_RUNNING =
+        private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DrainShortRunningFunc =
             (scheduler, self) => self.DrainShortRunning(scheduler);
 
         /// <summary>
@@ -528,13 +527,13 @@ namespace System.Reactive
         /// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
         private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
         {
-            DrainStep(_queue, _downstream, false);
+            DrainStep(_queue, false);
 
             if (Interlocked.Decrement(ref _wip) != 0)
             {
                 // Don't return the disposable of Schedule() because that may chain together
                 // a long string of ScheduledItems causing StackOverflowException upon Dispose()
-                var d = recursiveScheduler.Schedule(this, DRAIN_SHORT_RUNNING);
+                var d = recursiveScheduler.Schedule(this, DrainShortRunningFunc);
                 Disposable.TrySetMultiple(ref _task, d);
             }
             return Disposable.Empty;
@@ -546,18 +545,22 @@ namespace System.Reactive
         /// empty queue, issuing the appropriate signals to the
         /// given downstream.
         /// </summary>
-        /// <param name="q">The queue to use.</param>
-        /// <param name="downstream">The intended consumer of the events.</param>
+        /// <param name="q">The queue to use. The argument ensures that the
+        /// _queue field is not re-read from memory due to the memory barriers
+        /// inside TryDequeue mandating it despite the field is read-only.
+        /// In addition, the DrainStep is invoked from the DrainLongRunning's loop
+        /// so reading _queue inside this method would still incur the same barrier
+        /// overhead otherwise.</param>
         /// <param name="delayError">Should the errors be delayed until all
         /// queued items have been emitted to the downstream?</param>
         /// <returns>True if the drain loop should stop.</returns>
-        private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delayError)
+        private bool DrainStep(ConcurrentQueue<T> q, bool delayError)
         {
             // Check if the operator has been disposed
             if (Volatile.Read(ref _disposed))
             {
                 // cleanup residue items in the queue
-                Clear();
+                Clear(q);
                 return true;
             }
 
@@ -573,13 +576,13 @@ namespace System.Reactive
                 if (ex != null)
                 {
                     Volatile.Write(ref _disposed, true);
-                    downstream.OnError(ex);
+                    ForwardOnError(ex);
                     return true;
                 }
             }
 
             // get the next item from the queue if any
-            var empty = !_queue.TryDequeue(out var v);
+            var empty = !q.TryDequeue(out var v);
 
             // the upstream called OnComplete and the queue is empty
             // that means we are done, no further signals can happen
@@ -592,12 +595,12 @@ namespace System.Reactive
                 // if not null, there was an OnError call
                 if (ex != null)
                 {
-                    downstream.OnError(ex);
+                    ForwardOnError(ex);
                 }
                 else
                 {
                     // otherwise, complete normally
-                    downstream.OnCompleted();
+                    ForwardOnCompleted();
                 }
                 return true;
             }
@@ -608,7 +611,7 @@ namespace System.Reactive
                 return true;
             }
             // emit the item
-            downstream.OnNext(v);
+            ForwardOnNext(v);
 
             // keep looping
             return false;
@@ -627,7 +630,6 @@ namespace System.Reactive
             // that would force the re-read of these constant values
             // from memory, regardless of readonly, afaik
             var q = _queue;
-            var downstream = _downstream;
 
             for (; ; )
             {
@@ -636,7 +638,7 @@ namespace System.Reactive
                     // delayError: true - because of 
                     //      ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
                     // expects something that almost looks like full delayError
-                    if (DrainStep(q, downstream, true))
+                    if (DrainStep(q, true))
                     {
                         break;
                     }