Browse Source

Merge branch 'master' into ToObservableImprovements

Daniel C. Weber 7 years ago
parent
commit
b6dcf1eb17

+ 1 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -18,6 +18,7 @@ namespace Benchmarks.System.Reactive
                 typeof(BufferCountBenchmark),
                 typeof(RangeBenchmark),
                 typeof(ToObservableBenchmark),
+                typeof(RepeatBenchmark),
                 typeof(AppendPrependBenchmark)
             });
 

+ 32 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/RepeatBenchmark.cs

@@ -0,0 +1,32 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Reactive.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    [MemoryDiagnoser]
+    public class RepeatBenchmark
+    {
+        [Params(1, 10, 100, 1000, 10000, 100000, 1000000)]
+        public int N;
+
+        public int _store;
+
+        [Benchmark]
+        public void Repeat_Infinite()
+        {
+            Observable.Repeat(1).Take(N).Subscribe(v => Volatile.Write(ref _store, v));
+        }
+
+        [Benchmark]
+        public void Repeat_Finite()
+        {
+            Observable.Repeat(1, N).Subscribe(v => Volatile.Write(ref _store, v));
+        }
+    }
+}

+ 122 - 33
Rx.NET/Source/src/System.Reactive/Linq/Observable/Repeat.cs

@@ -9,12 +9,12 @@ namespace System.Reactive.Linq.ObservableImpl
 {
     internal static class Repeat<TResult>
     {
-        internal sealed class Forever : Producer<TResult, Forever._>
+        internal sealed class ForeverRecursive : Producer<TResult, ForeverRecursive._>
         {
             private readonly TResult _value;
             private readonly IScheduler _scheduler;
 
-            public Forever(TResult value, IScheduler scheduler)
+            public ForeverRecursive(TResult value, IScheduler scheduler)
             {
                 _value = value;
                 _scheduler = scheduler;
@@ -22,35 +22,75 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);
 
-            protected override void Run(_ sink) => sink.Run(this);
+            protected override void Run(_ sink) => sink.Run(_scheduler);
 
             internal sealed class _ : IdentitySink<TResult>
             {
                 private readonly TResult _value;
 
+                IDisposable _task;
+
                 public _(TResult value, IObserver<TResult> observer)
                     : base(observer)
                 {
                     _value = value;
                 }
 
-                public void Run(Forever parent)
+                public void Run(IScheduler scheduler)
                 {
-                    var longRunning = parent._scheduler.AsLongRunning();
-                    if (longRunning != null)
-                    {
-                        SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.LoopInf(c)));
-                    }
-                    else
+                    var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
+                    Disposable.TrySetSingle(ref _task, first);
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+                    if (disposing)
                     {
-                        SetUpstream(parent._scheduler.Schedule(this, (@this, a) => @this.LoopRecInf(a)));
+                        Disposable.TryDispose(ref _task);
                     }
                 }
 
-                private void LoopRecInf(Action<_> recurse)
+                private IDisposable LoopRecInf(IScheduler scheduler)
                 {
                     ForwardOnNext(_value);
-                    recurse(this);
+
+                    var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
+                    Disposable.TrySetMultiple(ref _task, next);
+
+                    return Disposable.Empty;
+                }
+            }
+        }
+
+        internal sealed class ForeverLongRunning : Producer<TResult, ForeverLongRunning._>
+        {
+            private readonly TResult _value;
+            private readonly ISchedulerLongRunning _scheduler;
+
+            public ForeverLongRunning(TResult value, ISchedulerLongRunning scheduler)
+            {
+                _value = value;
+                _scheduler = scheduler;
+            }
+
+            protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);
+
+            protected override void Run(_ sink) => sink.Run(_scheduler);
+
+            internal sealed class _ : IdentitySink<TResult>
+            {
+                private readonly TResult _value;
+
+                public _(TResult value, IObserver<TResult> observer)
+                    : base(observer)
+                {
+                    _value = value;
+                }
+
+                public void Run(ISchedulerLongRunning longRunning)
+                {
+                    SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.LoopInf(c)));
                 }
 
                 private void LoopInf(ICancelable cancel)
@@ -66,66 +106,115 @@ namespace System.Reactive.Linq.ObservableImpl
             }
         }
 
-        internal sealed class Count : Producer<TResult, Count._>
+        internal sealed class CountRecursive : Producer<TResult, CountRecursive._>
         {
             private readonly TResult _value;
             private readonly IScheduler _scheduler;
             private readonly int _repeatCount;
 
-            public Count(TResult value, int repeatCount, IScheduler scheduler)
+            public CountRecursive(TResult value, int repeatCount, IScheduler scheduler)
             {
                 _value = value;
                 _scheduler = scheduler;
                 _repeatCount = repeatCount;
             }
 
-            protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);
+            protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, _repeatCount, observer);
 
-            protected override void Run(_ sink) => sink.Run(this);
+            protected override void Run(_ sink) => sink.Run(_scheduler);
 
             internal sealed class _ : IdentitySink<TResult>
             {
                 private readonly TResult _value;
 
-                public _(TResult value, IObserver<TResult> observer)
+                int _remaining;
+
+                IDisposable _task;
+
+                public _(TResult value, int repeatCount, IObserver<TResult> observer)
                     : base(observer)
                 {
                     _value = value;
+                    _remaining = repeatCount;
                 }
 
-                public void Run(Count parent)
+                public void Run(IScheduler scheduler)
                 {
-                    var longRunning = parent._scheduler.AsLongRunning();
-                    if (longRunning != null)
-                    {
-                        SetUpstream(longRunning.ScheduleLongRunning(parent._repeatCount, Loop));
-                    }
-                    else
+                    var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                    Disposable.TrySetSingle(ref _task, first);
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    base.Dispose(disposing);
+                    if (disposing)
                     {
-                        SetUpstream(parent._scheduler.Schedule(parent._repeatCount, LoopRec));
+                        Disposable.TryDispose(ref _task);
                     }
                 }
 
-                private void LoopRec(int n, Action<int> recurse)
+                private IDisposable LoopRec(IScheduler scheduler)
                 {
-                    if (n > 0)
+                    var remaining = _remaining;
+                    if (remaining > 0)
                     {
                         ForwardOnNext(_value);
-                        n--;
+                        _remaining = --remaining;
                     }
 
-                    if (n == 0)
+                    if (remaining == 0)
                     {
                         ForwardOnCompleted();
-                        return;
                     }
+                    else
+                    {
+                        var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                        Disposable.TrySetMultiple(ref _task, next);
+                    }
+                    return Disposable.Empty;
+                }
+            }
+        }
 
-                    recurse(n);
+        internal sealed class CountLongRunning : Producer<TResult, CountLongRunning._>
+        {
+            private readonly TResult _value;
+            private readonly ISchedulerLongRunning _scheduler;
+            private readonly int _repeatCount;
+
+            public CountLongRunning(TResult value, int repeatCount, ISchedulerLongRunning scheduler)
+            {
+                _value = value;
+                _scheduler = scheduler;
+                _repeatCount = repeatCount;
+            }
+
+            protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, _repeatCount, observer);
+
+            protected override void Run(_ sink) => sink.Run(_scheduler);
+
+            internal sealed class _ : IdentitySink<TResult>
+            {
+                private readonly TResult _value;
+
+                int _remaining;
+
+                public _(TResult value, int remaining, IObserver<TResult> observer)
+                    : base(observer)
+                {
+                    _value = value;
+                    _remaining = remaining;
+                }
+
+                public void Run(ISchedulerLongRunning longRunning)
+                {
+                    SetUpstream(longRunning.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel)));
                 }
 
-                private void Loop(int n, ICancelable cancel)
+                private void Loop(ICancelable cancel)
                 {
                     var value = _value;
+                    var n = _remaining;
                     while (n > 0 && !cancel.IsDisposed)
                     {
                         ForwardOnNext(value);

+ 24 - 4
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -347,22 +347,42 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> Repeat<TResult>(TResult value)
         {
-            return new Repeat<TResult>.Forever(value, SchedulerDefaults.Iteration);
+            return Repeat_(value, SchedulerDefaults.Iteration);
         }
 
         public virtual IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler)
         {
-            return new Repeat<TResult>.Forever(value, scheduler);
+            return Repeat_(value, scheduler);
+        }
+
+        private IObservable<TResult> Repeat_<TResult>(TResult value, IScheduler scheduler)
+        {
+            var longRunning = scheduler.AsLongRunning();
+            if (longRunning != null)
+            {
+                return new Repeat<TResult>.ForeverLongRunning(value, longRunning);
+            }
+            return new Repeat<TResult>.ForeverRecursive(value, scheduler);
         }
 
         public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount)
         {
-            return new Repeat<TResult>.Count(value, repeatCount, SchedulerDefaults.Iteration);
+            return Repeat_(value, repeatCount, SchedulerDefaults.Iteration);
         }
 
         public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler)
         {
-            return new Repeat<TResult>.Count(value, repeatCount, scheduler);
+            return Repeat_(value, repeatCount, scheduler);
+        }
+
+        private IObservable<TResult> Repeat_<TResult>(TResult value, int repeatCount, IScheduler scheduler)
+        {
+            var longRunning = scheduler.AsLongRunning();
+            if (longRunning != null)
+            {
+                return new Repeat<TResult>.CountLongRunning(value, repeatCount, longRunning);
+            }
+            return new Repeat<TResult>.CountRecursive(value, repeatCount, scheduler);
         }
 
         #endregion