Browse Source

4.x: Improve the performance of Range() (#684)

* 4.x: Improve the performance of Range()

* Add missing dispose override
David Karnok 7 years ago
parent
commit
b232955404

+ 2 - 1
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -15,7 +15,8 @@ namespace Benchmarks.System.Reactive
                 typeof(ZipBenchmark),
                 typeof(ZipBenchmark),
                 typeof(CombineLatestBenchmark),
                 typeof(CombineLatestBenchmark),
                 typeof(SwitchBenchmark),
                 typeof(SwitchBenchmark),
-                typeof(BufferCountBenchmark)
+                typeof(BufferCountBenchmark),
+                typeof(RangeBenchmark)
             });
             });
 
 
             switcher.Run();
             switcher.Run();

+ 26 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/RangeBenchmark.cs

@@ -0,0 +1,26 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Reactive.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    [MemoryDiagnoser]
+    public class RangeBenchmark
+    {
+        [Params(1, 10, 100, 1000, 10000, 100000, 1000000)]
+        public int N;
+
+        int _store;
+
+        [Benchmark]
+        public void Range()
+        {
+            Observable.Range(1, N).Subscribe(v => Volatile.Write(ref _store, v));
+        }
+    }
+}

+ 76 - 30
Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs

@@ -7,71 +7,117 @@ using System.Reactive.Disposables;
 
 
 namespace System.Reactive.Linq.ObservableImpl
 namespace System.Reactive.Linq.ObservableImpl
 {
 {
-    internal sealed class Range : Producer<int, Range._>
+    internal sealed class RangeRecursive : Producer<int, RangeRecursive.RangeSink>
     {
     {
         private readonly int _start;
         private readonly int _start;
         private readonly int _count;
         private readonly int _count;
         private readonly IScheduler _scheduler;
         private readonly IScheduler _scheduler;
 
 
-        public Range(int start, int count, IScheduler scheduler)
+        public RangeRecursive(int start, int count, IScheduler scheduler)
         {
         {
             _start = start;
             _start = start;
             _count = count;
             _count = count;
             _scheduler = scheduler;
             _scheduler = scheduler;
         }
         }
 
 
-        protected override _ CreateSink(IObserver<int> observer) => new _(this, observer);
+        protected override RangeSink CreateSink(IObserver<int> observer) => new RangeSink(_start, _count, observer);
 
 
-        protected override void Run(_ sink) => sink.Run(_scheduler);
+        protected override void Run(RangeSink sink) => sink.Run(_scheduler);
 
 
-        internal sealed class _ : IdentitySink<int>
+        internal sealed class RangeSink : IdentitySink<int>
         {
         {
-            private readonly int _start;
-            private readonly int _count;
+            readonly int _end;
 
 
-            public _(Range parent, IObserver<int> observer)
+            int _index;
+
+            IDisposable _task;
+
+            public RangeSink(int start, int count, IObserver<int> observer)
                 : base(observer)
                 : base(observer)
             {
             {
-                _start = parent._start;
-                _count = parent._count;
+                _index = start;
+                _end = start + count;
             }
             }
 
 
             public void Run(IScheduler scheduler)
             public void Run(IScheduler scheduler)
             {
             {
-                var longRunning = scheduler.AsLongRunning();
-                if (longRunning != null)
+                var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                Disposable.TrySetSingle(ref _task, first);
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                base.Dispose(disposing);
+                if (disposing)
                 {
                 {
-                    SetUpstream(longRunning.ScheduleLongRunning(0, Loop));
+                    Disposable.TryDispose(ref _task);
                 }
                 }
-                else
+            }   
+
+            private IDisposable LoopRec(IScheduler scheduler)
+            {
+                var idx = _index;
+                if (idx != _end)
+                {
+                    _index = idx + 1;
+                    ForwardOnNext(idx);
+                    var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                    Disposable.TrySetMultiple(ref _task, next);
+                } else
                 {
                 {
-                    SetUpstream(scheduler.Schedule(0, LoopRec));
+                    ForwardOnCompleted();
                 }
                 }
+                return Disposable.Empty;
             }
             }
+        }
+    }
+
+    internal sealed class RangeLongRunning : Producer<int, RangeLongRunning.RangeSink>
+    {
+        private readonly int _start;
+        private readonly int _count;
+        private readonly ISchedulerLongRunning _scheduler;
 
 
-            private void Loop(int i, ICancelable cancel)
+        public RangeLongRunning(int start, int count, ISchedulerLongRunning scheduler)
+        {
+            _start = start;
+            _count = count;
+            _scheduler = scheduler;
+        }
+
+        protected override RangeSink CreateSink(IObserver<int> observer) => new RangeSink(_start, _count, observer);
+
+        protected override void Run(RangeSink sink) => sink.Run(_scheduler);
+
+        internal sealed class RangeSink : IdentitySink<int>
+        {
+            readonly int _end;
+
+            int _index;
+
+            public RangeSink(int start, int count, IObserver<int> observer)
+                : base(observer)
             {
             {
-                while (!cancel.IsDisposed && i < _count)
-                {
-                    ForwardOnNext(_start + i);
-                    i++;
-                }
+                _index = start;
+                _end = start + count;
+            }
 
 
-                if (!cancel.IsDisposed)
-                    ForwardOnCompleted();
+            public void Run(ISchedulerLongRunning scheduler)
+            {
+                SetUpstream(scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel)));
             }
             }
 
 
-            private void LoopRec(int i, Action<int> recurse)
+            private void Loop(ICancelable cancel)
             {
             {
-                if (i < _count)
+                var idx = _index;
+                var end = _end;
+                while (!cancel.IsDisposed && idx != end)
                 {
                 {
-                    ForwardOnNext(_start + i);
-                    recurse(i + 1);
+                    ForwardOnNext(idx++);
                 }
                 }
-                else
-                {
+
+                if (!cancel.IsDisposed)
                     ForwardOnCompleted();
                     ForwardOnCompleted();
-                }
             }
             }
         }
         }
     }
     }

+ 6 - 1
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -338,7 +338,12 @@ namespace System.Reactive.Linq
 
 
         private static IObservable<int> Range_(int start, int count, IScheduler scheduler)
         private static IObservable<int> Range_(int start, int count, IScheduler scheduler)
         {
         {
-            return new Range(start, count, scheduler);
+            var longRunning = scheduler.AsLongRunning();
+            if (longRunning != null)
+            {
+                return new RangeLongRunning(start, count, longRunning);
+            }
+            return new RangeRecursive(start, count, scheduler);
         }
         }
 
 
         #endregion
         #endregion