Browse Source

4.x: Improve the performance of ToObservable()

akarnokd 7 years ago
parent
commit
6b4f148d41

+ 2 - 1
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -15,7 +15,8 @@ namespace Benchmarks.System.Reactive
                 typeof(ZipBenchmark),
                 typeof(CombineLatestBenchmark),
                 typeof(SwitchBenchmark),
-                typeof(BufferCountBenchmark)
+                typeof(BufferCountBenchmark),
+                typeof(ToObservableBenchmark)
             });
 
             switcher.Run();

+ 30 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/ToObservableBenchmark.cs

@@ -0,0 +1,30 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    [MemoryDiagnoser]
+    public class ToObservableBenchmark
+    {
+        [Params(1, 10, 100, 1000, 10000, 100000, 1000000)]
+        public int N;
+
+        int _store;
+
+        [Benchmark]
+        public void Exact()
+        {
+            Enumerable.Range(1, N)
+                .ToObservable()
+                .Subscribe(v => Volatile.Write(ref _store, v));
+        }
+    }
+}

+ 86 - 48
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToObservable.cs

@@ -5,15 +5,16 @@
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class ToObservable<TSource> : Producer<TSource, ToObservable<TSource>._>
+    internal sealed class ToObservableRecursive<TSource> : Producer<TSource, ToObservableRecursive<TSource>._>
     {
         private readonly IEnumerable<TSource> _source;
         private readonly IScheduler _scheduler;
 
-        public ToObservable(IEnumerable<TSource> source, IScheduler scheduler)
+        public ToObservableRecursive(IEnumerable<TSource> source, IScheduler scheduler)
         {
             _source = source;
             _scheduler = scheduler;
@@ -21,21 +22,24 @@ namespace System.Reactive.Linq.ObservableImpl
 
         protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
 
-        protected override void Run(_ sink) => sink.Run(this);
+        protected override void Run(_ sink) => sink.Run(_source, _scheduler);
 
         internal sealed class _ : IdentitySink<TSource>
         {
+            IEnumerator<TSource> _enumerator;
+
+            volatile bool _disposed;
+
             public _(IObserver<TSource> observer)
                 : base(observer)
             {
             }
 
-            public void Run(ToObservable<TSource> parent)
+            public void Run(IEnumerable<TSource> source, IScheduler scheduler)
             {
-                var e = default(IEnumerator<TSource>);
                 try
                 {
-                    e = parent._source.GetEnumerator();
+                    _enumerator = source.GetEnumerator();
                 }
                 catch (Exception exception)
                 {
@@ -44,61 +48,44 @@ namespace System.Reactive.Linq.ObservableImpl
                     return;
                 }
 
-                var longRunning = parent._scheduler.AsLongRunning();
-                if (longRunning != null)
-                {
-                    //
-                    // Long-running schedulers have the contract they should *never* prevent
-                    // the work from starting, such that the scheduled work has the chance
-                    // to observe the cancellation and perform proper clean-up. In this case,
-                    // we're sure Loop will be entered, allowing us to dispose the enumerator.
-                    //
-                    SetUpstream(longRunning.ScheduleLongRunning((@this: this, e), (tuple, cancelable) => [email protected](tuple.e, cancelable)));
-                }
-                else
-                {
-                    //
-                    // We never allow the scheduled work to be cancelled. Instead, the flag
-                    // is used to have LoopRec bail out and perform proper clean-up of the
-                    // enumerator.
-                    //
-                    var flag = new BooleanDisposable();
-                    parent._scheduler.Schedule(new State(this, flag, e), (state, action) => state.sink.LoopRec(state, action));
-                    SetUpstream(flag);
-                }
+                //
+                // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
+                // is used to have LoopRec bail out and perform proper clean-up of the
+                // enumerator.
+                //
+                scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
             }
 
-            private struct State
+            protected override void Dispose(bool disposing)
             {
-                public readonly _ sink;
-                public readonly ICancelable flag;
-                public readonly IEnumerator<TSource> enumerator;
-
-                public State(_ sink, ICancelable flag, IEnumerator<TSource> enumerator)
+                base.Dispose(disposing);
+                if (disposing)
                 {
-                    this.sink = sink;
-                    this.flag = flag;
-                    this.enumerator = enumerator;
+                    _disposed = true;
                 }
             }
 
-            private void LoopRec(State state, Action<State> recurse)
+            private IDisposable LoopRec(IScheduler scheduler)
             {
                 var hasNext = false;
                 var ex = default(Exception);
                 var current = default(TSource);
 
-                if (state.flag.IsDisposed)
+                var enumerator = _enumerator;
+
+                if (_disposed)
                 {
-                    state.enumerator.Dispose();
-                    return;
+                    _enumerator.Dispose();
+                    _enumerator = null;
+
+                    return Disposable.Empty;
                 }
 
                 try
                 {
-                    hasNext = state.enumerator.MoveNext();
+                    hasNext = enumerator.MoveNext();
                     if (hasNext)
-                        current = state.enumerator.Current;
+                        current = enumerator.Current;
                 }
                 catch (Exception exception)
                 {
@@ -107,22 +94,73 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 if (ex != null)
                 {
-                    state.enumerator.Dispose();
+                    enumerator.Dispose();
+                    _enumerator = null;
 
                     ForwardOnError(ex);
-                    return;
+                    return Disposable.Empty;
                 }
 
                 if (!hasNext)
                 {
-                    state.enumerator.Dispose();
+                    enumerator.Dispose();
+                    _enumerator = null;
 
                     ForwardOnCompleted();
-                    return;
+                    return Disposable.Empty;
                 }
 
                 ForwardOnNext(current);
-                recurse(state);
+
+                //
+                // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
+                // is used to have LoopRec bail out and perform proper clean-up of the
+                // enumerator.
+                //
+                scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+
+                return Disposable.Empty;
+            }
+        }
+    }
+
+    internal sealed class ToObservableLongRunning<TSource> : Producer<TSource, ToObservableLongRunning<TSource>._>
+    {
+        private readonly IEnumerable<TSource> _source;
+        private readonly ISchedulerLongRunning _scheduler;
+
+        public ToObservableLongRunning(IEnumerable<TSource> source, ISchedulerLongRunning scheduler)
+        {
+            _source = source;
+            _scheduler = scheduler;
+        }
+
+        protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
+
+        protected override void Run(_ sink) => sink.Run(_source, _scheduler);
+
+        internal sealed class _ : IdentitySink<TSource>
+        {
+            public _(IObserver<TSource> observer)
+                : base(observer)
+            {
+            }
+
+            public void Run(IEnumerable<TSource> source, ISchedulerLongRunning scheduler)
+            {
+                var e = default(IEnumerator<TSource>);
+                try
+                {
+                    e = source.GetEnumerator();
+                }
+                catch (Exception exception)
+                {
+                    ForwardOnError(exception);
+
+                    return;
+                }
+
+                SetUpstream(scheduler.ScheduleLongRunning((@this: this, e), (tuple, cancelable) => [email protected](tuple.e, cancelable)));
             }
 
             private void Loop(IEnumerator<TSource> enumerator, ICancelable cancel)

+ 27 - 3
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Conversions.cs

@@ -26,10 +26,18 @@ namespace System.Reactive.Linq
 
         private static IDisposable Subscribe_<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
         {
+            var longRunning = scheduler.AsLongRunning();
+            if (longRunning != null)
+            {
+                //
+                // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
+                //
+                return new ToObservableLongRunning<TSource>(source, longRunning).Subscribe/*Unsafe*/(observer);
+            }
             //
             // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
             //
-            return new ToObservable<TSource>(source, scheduler).Subscribe/*Unsafe*/(observer);
+            return new ToObservableRecursive<TSource>(source, scheduler).Subscribe/*Unsafe*/(observer);
         }
 
         #endregion
@@ -73,12 +81,28 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source)
         {
-            return new ToObservable<TSource>(source, SchedulerDefaults.Iteration);
+            return ToObservable_(source, SchedulerDefaults.Iteration);
         }
 
         public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
         {
-            return new ToObservable<TSource>(source, scheduler);
+            return ToObservable_(source, scheduler);
+        }
+
+        private static IObservable<TSource> ToObservable_<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
+        {
+            var longRunning = scheduler.AsLongRunning();
+            if (longRunning != null)
+            {
+                //
+                // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
+                //
+                return new ToObservableLongRunning<TSource>(source, longRunning);
+            }
+            //
+            // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
+            //
+            return new ToObservableRecursive<TSource>(source, scheduler);
         }
 
         #endregion