Bart De Smet 8 лет назад
Родитель
Сommit
c8cf043bce

+ 100 - 78
Rx.NET/Source/src/System.Reactive/Linq/Observable/Repeat.cs

@@ -7,119 +7,141 @@ using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Repeat<TResult> : Producer<TResult>
+    internal static class Repeat<TResult>
     {
-        private readonly TResult _value;
-        private readonly int? _repeatCount;
-        private readonly IScheduler _scheduler;
-
-        public Repeat(TResult value, int? repeatCount, IScheduler scheduler)
+        internal sealed class Forever : Producer<TResult>
         {
-            _value = value;
-            _repeatCount = repeatCount;
-            _scheduler = scheduler;
-        }
-
-        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            var sink = new _(this, observer, cancel);
-            setSink(sink);
-            return sink.Run();
-        }
+            private readonly TResult _value;
+            private readonly IScheduler _scheduler;
 
-        class _ : Sink<TResult>
-        {
-            private readonly Repeat<TResult> _parent;
+            public Forever(TResult value, IScheduler scheduler)
+            {
+                _value = value;
+                _scheduler = scheduler;
+            }
 
-            public _(Repeat<TResult> parent, IObserver<TResult> observer, IDisposable cancel)
-                : base(observer, cancel)
+            protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                _parent = parent;
+                var sink = new _(_value, observer, cancel);
+                setSink(sink);
+                return sink.Run(this);
             }
 
-            public IDisposable Run()
+            private sealed class _ : Sink<TResult>
             {
-                var longRunning = _parent._scheduler.AsLongRunning();
-                if (longRunning != null)
+                private readonly TResult _value;
+
+                public _(TResult value, IObserver<TResult> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    return Run(longRunning);
+                    _value = value;
                 }
-                else
+
+                public IDisposable Run(Forever parent)
                 {
-                    return Run(_parent._scheduler);
+                    var longRunning = parent._scheduler.AsLongRunning();
+                    if (longRunning != null)
+                    {
+                        return longRunning.ScheduleLongRunning(LoopInf);
+                    }
+                    else
+                    {
+                        return parent._scheduler.Schedule(LoopRecInf);
+                    }
                 }
-            }
 
-            private IDisposable Run(IScheduler scheduler)
-            {
-                if (_parent._repeatCount == null)
+                private void LoopRecInf(Action recurse)
                 {
-                    return scheduler.Schedule(LoopRecInf);
+                    base._observer.OnNext(_value);
+                    recurse();
                 }
-                else
+
+                private void LoopInf(ICancelable cancel)
                 {
-                    return scheduler.Schedule(_parent._repeatCount.Value, LoopRec);
+                    var value = _value;
+                    while (!cancel.IsDisposed)
+                        base._observer.OnNext(value);
+
+                    base.Dispose();
                 }
             }
+        }
+
+        internal sealed class Count : Producer<TResult>
+        {
+            private readonly TResult _value;
+            private readonly IScheduler _scheduler;
+            private readonly int _repeatCount;
 
-            private void LoopRecInf(Action recurse)
+            public Count(TResult value, int repeatCount, IScheduler scheduler)
             {
-                base._observer.OnNext(_parent._value);
-                recurse();
+                _value = value;
+                _scheduler = scheduler;
+                _repeatCount = repeatCount;
             }
 
-            private void LoopRec(int n, Action<int> recurse)
+            protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                if (n > 0)
-                {
-                    base._observer.OnNext(_parent._value);
-                    n--;
-                }
-
-                if (n == 0)
-                {
-                    base._observer.OnCompleted();
-                    base.Dispose();
-                    return;
-                }
-
-                recurse(n);
+                var sink = new _(_value, observer, cancel);
+                setSink(sink);
+                return sink.Run(this);
             }
 
-            private IDisposable Run(ISchedulerLongRunning scheduler)
+            private sealed class _ : Sink<TResult>
             {
-                if (_parent._repeatCount == null)
+                private readonly TResult _value;
+
+                public _(TResult value, IObserver<TResult> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    return scheduler.ScheduleLongRunning(LoopInf);
+                    _value = value;
                 }
-                else
+
+                public IDisposable Run(Count parent)
                 {
-                    return scheduler.ScheduleLongRunning(_parent._repeatCount.Value, Loop);
+                    var longRunning = parent._scheduler.AsLongRunning();
+                    if (longRunning != null)
+                    {
+                        return longRunning.ScheduleLongRunning(parent._repeatCount, Loop);
+                    }
+                    else
+                    {
+                        return parent._scheduler.Schedule(parent._repeatCount, LoopRec);
+                    }
                 }
-            }
 
-            private void LoopInf(ICancelable cancel)
-            {
-                var value = _parent._value;
-                while (!cancel.IsDisposed)
-                    base._observer.OnNext(value);
-
-                base.Dispose();
-            }
-
-            private void Loop(int n, ICancelable cancel)
-            {
-                var value = _parent._value;
-                while (n > 0 && !cancel.IsDisposed)
+                private void LoopRec(int n, Action<int> recurse)
                 {
-                    base._observer.OnNext(value);
-                    n--;
+                    if (n > 0)
+                    {
+                        base._observer.OnNext(_value);
+                        n--;
+                    }
+
+                    if (n == 0)
+                    {
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                        return;
+                    }
+
+                    recurse(n);
                 }
 
-                if (!cancel.IsDisposed)
-                    base._observer.OnCompleted();
+                private void Loop(int n, ICancelable cancel)
+                {
+                    var value = _value;
+                    while (n > 0 && !cancel.IsDisposed)
+                    {
+                        base._observer.OnNext(value);
+                        n--;
+                    }
+
+                    if (!cancel.IsDisposed)
+                        base._observer.OnCompleted();
 
-                base.Dispose();
+                    base.Dispose();
+                }
             }
         }
     }

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -190,22 +190,22 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> Repeat<TResult>(TResult value)
         {
-            return new Repeat<TResult>(value, null, SchedulerDefaults.Iteration);
+            return new Repeat<TResult>.Forever(value, SchedulerDefaults.Iteration);
         }
 
         public virtual IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler)
         {
-            return new Repeat<TResult>(value, null, scheduler);
+            return new Repeat<TResult>.Forever(value, scheduler);
         }
 
         public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount)
         {
-            return new Repeat<TResult>(value, repeatCount, SchedulerDefaults.Iteration);
+            return new Repeat<TResult>.Count(value, repeatCount, SchedulerDefaults.Iteration);
         }
 
         public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler)
         {
-            return new Repeat<TResult>(value, repeatCount, scheduler);
+            return new Repeat<TResult>.Count(value, repeatCount, scheduler);
         }
 
         #endregion