瀏覽代碼

4.x: Improve Generate() internals (#674)

David Karnok 7 年之前
父節點
當前提交
262544b76f
共有 1 個文件被更改,包括 54 次插入47 次删除
  1. 54 47
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs

+ 54 - 47
Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs

@@ -28,36 +28,38 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);
 
-            protected override void Run(_ sink) => sink.Run();
+            protected override void Run(_ sink) => sink.Run(_scheduler);
 
             internal sealed class _ : IdentitySink<TResult>
             {
-                // CONSIDER: This sink has a parent reference that can be considered for removal.
-
-                private readonly NoTime _parent;
+                readonly Func<TState, bool> _condition;
+                readonly Func<TState, TState> _iterate;
+                readonly Func<TState, TResult> _resultSelector;
 
                 public _(NoTime parent, IObserver<TResult> observer)
                     : base(observer)
                 {
-                    _parent = parent;
+                    _condition = parent._condition;
+                    _iterate = parent._iterate;
+                    _resultSelector = parent._resultSelector;
+
+                    _state = parent._initialState;
+                    _first = true;
                 }
 
                 private TState _state;
                 private bool _first;
 
-                public void Run()
+                public void Run(IScheduler _scheduler)
                 {
-                    _state = _parent._initialState;
-                    _first = true;
-
-                    var longRunning = _parent._scheduler.AsLongRunning();
+                    var longRunning = _scheduler.AsLongRunning();
                     if (longRunning != null)
                     {
                         SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.Loop(c)));
                     }
                     else
                     {
-                        SetUpstream(_parent._scheduler.Schedule(this, (@this, a) => @this.LoopRec(a)));
+                        SetUpstream(_scheduler.Schedule(this, (@this, a) => @this.LoopRec(a)));
                     }
                 }
 
@@ -75,14 +77,14 @@ namespace System.Reactive.Linq.ObservableImpl
                             }
                             else
                             {
-                                _state = _parent._iterate(_state);
+                                _state = _iterate(_state);
                             }
 
-                            hasResult = _parent._condition(_state);
+                            hasResult = _condition(_state);
 
                             if (hasResult)
                             {
-                                result = _parent._resultSelector(_state);
+                                result = _resultSelector(_state);
                             }
                         }
                         catch (Exception exception)
@@ -119,14 +121,14 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                         else
                         {
-                            _state = _parent._iterate(_state);
+                            _state = _iterate(_state);
                         }
 
-                        hasResult = _parent._condition(_state);
+                        hasResult = _condition(_state);
 
                         if (hasResult)
                         {
-                            result = _parent._resultSelector(_state);
+                            result = _resultSelector(_state);
                         }
                     }
                     catch (Exception exception)
@@ -169,31 +171,34 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);
 
-            protected override void Run(_ sink) => sink.Run();
+            protected override void Run(_ sink) => sink.Run(_scheduler, _initialState);
 
             internal sealed class _ : IdentitySink<TResult>
             {
-                // CONSIDER: This sink has a parent reference that can be considered for removal.
-
-                private readonly Absolute _parent;
+                readonly Func<TState, bool> _condition;
+                readonly Func<TState, TState> _iterate;
+                readonly Func<TState, TResult> _resultSelector;
+                readonly Func<TState, DateTimeOffset> _timeSelector;
 
                 public _(Absolute parent, IObserver<TResult> observer)
                     : base(observer)
                 {
-                    _parent = parent;
+                    _condition = parent._condition;
+                    _iterate = parent._iterate;
+                    _resultSelector = parent._resultSelector;
+                    _timeSelector = parent._timeSelector;
+
+                    _first = true;
                 }
 
                 private bool _first;
                 private bool _hasResult;
+
                 private TResult _result;
 
-                public void Run()
+                public void Run(IScheduler outerScheduler, TState initialState)
                 {
-                    _first = true;
-                    _hasResult = false;
-                    _result = default(TResult);
-
-                    SetUpstream(_parent._scheduler.Schedule((@this: this, _parent._initialState), (scheduler, tuple) => [email protected](scheduler, tuple._initialState)));
+                    SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => [email protected](scheduler, tuple.initialState)));
                 }
 
                 private IDisposable InvokeRec(IScheduler self, TState state)
@@ -213,15 +218,15 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                         else
                         {
-                            state = _parent._iterate(state);
+                            state = _iterate(state);
                         }
 
-                        _hasResult = _parent._condition(state);
+                        _hasResult = _condition(state);
 
                         if (_hasResult)
                         {
-                            _result = _parent._resultSelector(state);
-                            time = _parent._timeSelector(state);
+                            _result = _resultSelector(state);
+                            time = _timeSelector(state);
                         }
                     }
                     catch (Exception exception)
@@ -262,31 +267,33 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);
 
-            protected override void Run(_ sink) => sink.Run();
+            protected override void Run(_ sink) => sink.Run(_scheduler, _initialState);
 
             internal sealed class _ : IdentitySink<TResult>
             {
-                // CONSIDER: This sink has a parent reference that can be considered for removal.
-
-                private readonly Relative _parent;
+                private readonly Func<TState, bool> _condition;
+                private readonly Func<TState, TState> _iterate;
+                private readonly Func<TState, TResult> _resultSelector;
+                private readonly Func<TState, TimeSpan> _timeSelector;
 
                 public _(Relative parent, IObserver<TResult> observer)
                     : base(observer)
                 {
-                    _parent = parent;
+                    _condition = parent._condition;
+                    _iterate = parent._iterate;
+                    _resultSelector = parent._resultSelector;
+                    _timeSelector = parent._timeSelector;
+
+                    _first = true;
                 }
 
                 private bool _first;
                 private bool _hasResult;
                 private TResult _result;
 
-                public void Run()
+                public void Run(IScheduler outerScheduler, TState initialState)
                 {
-                    _first = true;
-                    _hasResult = false;
-                    _result = default(TResult);
-
-                    SetUpstream(_parent._scheduler.Schedule((@this: this, _parent._initialState), (scheduler, tuple) => [email protected](scheduler, tuple._initialState)));
+                    SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => [email protected](scheduler, tuple.initialState)));
                 }
 
                 private IDisposable InvokeRec(IScheduler self, TState state)
@@ -306,15 +313,15 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                         else
                         {
-                            state = _parent._iterate(state);
+                            state = _iterate(state);
                         }
 
-                        _hasResult = _parent._condition(state);
+                        _hasResult = _condition(state);
 
                         if (_hasResult)
                         {
-                            _result = _parent._resultSelector(state);
-                            time = _parent._timeSelector(state);
+                            _result = _resultSelector(state);
+                            time = _timeSelector(state);
                         }
                     }
                     catch (Exception exception)