浏览代码

- Add extra loop for the prepend values, that saves the allocation and generation of the first array

- Move the state directly to the sink, since we do not intend to loop in parallel.
- Some reorders
Peter Wehrfritz 7 年之前
父节点
当前提交
900f162b5d
共有 1 个文件被更改,包括 69 次插入84 次删除
  1. 69 84
      Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs

+ 69 - 84
Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs

@@ -185,9 +185,11 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : IdentitySink<TSource>
             {
                 private readonly IObservable<TSource> _source;
-                private readonly TSource[] _prepends;
                 private readonly TSource[] _appends;
                 private readonly IScheduler _scheduler;
+
+                private Node<TSource> _currentPrependNode;
+                private int _currentAppendIndex;
                 private volatile bool _disposed;
 
                 public _(Recursive parent, IObserver<TSource> observer)
@@ -195,11 +197,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _source = parent._source;
                     _scheduler = parent.Scheduler;
-
-                    if (parent._prepends != null)
-                    {
-                        _prepends = parent._prepends.ToArray();
-                    }
+                    _currentPrependNode = parent._prepends;
 
                     if (parent._appends != null)
                     {
@@ -209,37 +207,35 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public void Run()
                 {
-                    if (_prepends != null)
+                    if (_currentPrependNode == null)
+                    {
+                        SetUpstream(_source.SubscribeSafe(this));
+                    }
+                    else
                     {
-                        Action<_ > continueWith = s => s.SetUpstream(s._source.SubscribeSafe(s));
                         //
                         // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
                         // is used to have LoopRec bail out and perform proper clean-up of the
                         // enumerator.
                         //
-                        _scheduler.Schedule(new State(this, _prepends, continueWith), (innerScheduler, state) => state._sink.LoopRec(innerScheduler, state));
-                    }
-                    else
-                    {
-                        SetUpstream(_source.SubscribeSafe(this));
+                        _scheduler.Schedule(this, (innerScheduler, @this) => @this.PrependValues(innerScheduler));
                     }
                 }
 
                 public override void OnCompleted()
                 {
-                    if (_appends != null)
+                    if (_appends == null)
+                    {
+                        ForwardOnCompleted();
+                    }
+                    else
                     {
-                        Action<_> continueWith = s => s.ForwardOnCompleted();
                         //
                         // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
                         // is used to have LoopRec bail out and perform proper clean-up of the
                         // enumerator.
                         //
-                        _scheduler.Schedule(new State(this, _appends, continueWith), (innerScheduler, state) => state._sink.LoopRec(innerScheduler, state));
-                    }
-                    else
-                    {
-                        ForwardOnCompleted();
+                        _scheduler.Schedule(this, (innerScheduler, @this) => @this.AppendValues(innerScheduler));
                     }
                 }
 
@@ -253,37 +249,49 @@ namespace System.Reactive.Linq.ObservableImpl
                     base.Dispose(disposing);
                 }
 
-                private struct State
+                private IDisposable PrependValues(IScheduler scheduler)
                 {
-                    public readonly _ _sink;
-                    public readonly TSource[] _array;
-                    public readonly Action<_> _continue;
-                    public int _current;
+                    if (_disposed)
+                    {
+                        return Disposable.Empty;
+                    }
+
+                    var current = _currentPrependNode.Value;
+                    ForwardOnNext(current);
 
-                    public State(_ sink, TSource[] array, Action<_> c)
+                    _currentPrependNode = _currentPrependNode.Parent;
+                    if (_currentPrependNode == null)
                     {
-                        _sink = sink;
-                        _continue = c;
-                        _array = array;
-                        _current = 0;
+                        SetUpstream(_source.SubscribeSafe(this));
+                    }
+                    else
+                    { 
+                        //
+                        // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
+                        // is used to have LoopRec bail out and perform proper clean-up of the
+                        // enumerator.
+                        //
+                        scheduler.Schedule(this, (innerScheduler, @this) => @this.PrependValues(innerScheduler));
                     }
+
+                    return Disposable.Empty;
                 }
 
-                private IDisposable LoopRec(IScheduler scheduler, State state)
+                private IDisposable AppendValues(IScheduler scheduler)
                 {
                     if (_disposed)
                     {
                         return Disposable.Empty;
                     }
 
-                    var current = state._array[state._current];
+                    var current = _appends[_currentAppendIndex];
                     ForwardOnNext(current);
 
-                    state._current++;
+                    _currentAppendIndex++;
 
-                    if (state._current == state._array.Length)
+                    if (_currentAppendIndex == _appends.Length)
                     {
-                        state._continue(state._sink);
+                        ForwardOnCompleted();
                     }
                     else
                     { 
@@ -292,7 +300,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         // is used to have LoopRec bail out and perform proper clean-up of the
                         // enumerator.
                         //
-                        scheduler.Schedule(state, (innerScheduler, s) => s._sink.LoopRec(innerScheduler, s));
+                        scheduler.Schedule(this, (innerScheduler, @this) => @this.AppendValues(innerScheduler));
                     }
 
                     return Disposable.Empty;
@@ -341,9 +349,10 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : IdentitySink<TSource>
             {
                 private readonly IObservable<TSource> _source;
-                private readonly TSource[] _prepends;
+                private readonly Node<TSource> _prepends; 
                 private readonly TSource[] _appends;
                 private readonly ISchedulerLongRunning _scheduler;
+
                 private IDisposable _schedulerDisposable;
 
                 public _(LongRunning parent, IObserver<TSource> observer)
@@ -352,11 +361,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     _source = parent._source;
                     _scheduler = parent._longRunningScheduler;
 
-                    if (parent._prepends != null)
-                    {
-                        _prepends = parent._prepends.ToArray();
-                    }
-
+                    _prepends = parent._prepends;
                     if (parent._appends != null)
                     {
                         _appends = parent._appends.ToReverseArray();
@@ -365,27 +370,27 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public void Run()
                 {
-                    if (_prepends != null)
+                    if (_prepends == null)
                     {
-                        var disposable = Schedule(_prepends, s => s.SetUpstream(s._source.SubscribeSafe(s)));
-                        Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
+                        SetUpstream(_source.SubscribeSafe(this));
                     }
                     else
                     {
-                        SetUpstream(_source.SubscribeSafe(this));
+                        var disposable = _scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.PrependValues(cancel));
+                        Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
                     }
                 }
 
                 public override void OnCompleted()
                 {
-                    if (_appends != null)
+                    if (_appends == null)
                     {
-                        var disposable = Schedule(_appends, s => s.ForwardOnCompleted());
-                        Disposable.TrySetSerial(ref _schedulerDisposable, disposable);
+                        ForwardOnCompleted();
                     }
                     else
                     {
-                        ForwardOnCompleted();
+                        var disposable = _scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.AppendValues(cancel));
+                        Disposable.TrySetSerial(ref _schedulerDisposable, disposable);
                     }
                 }
 
@@ -399,34 +404,26 @@ namespace System.Reactive.Linq.ObservableImpl
                     base.Dispose(disposing);
                 }
 
-                private IDisposable Schedule(TSource[] array, Action<_> continueWith)
-                {
-                    //
-                    // Long-running schedulers have the contract they should *never* prevent
-                    // the work from starting, such that the scheduled work has the chance
-                    // to observe the cancellation and perform proper clean-up. In this case,
-                    // we're sure Loop will be entered, allowing us to dispose the enumerator.
-                    //
-                    return _scheduler.ScheduleLongRunning(new State(this, array, continueWith), Loop);
-                }
-
-                private struct State
+                private void PrependValues(ICancelable cancel)
                 {
-                    public readonly _ _sink;
-                    public readonly TSource[] _array;
-                    public readonly Action<_> _continue;
+                    var current = _prepends;
 
-                    public State(_ sink, TSource[] array, Action<_> c)
+                    while (!cancel.IsDisposed)
                     {
-                        _sink = sink;
-                        _continue = c;
-                        _array = array;
+                        ForwardOnNext(current.Value);
+                        current = current.Parent;
+
+                        if (current == null)
+                        {
+                            SetUpstream(_source.SubscribeSafe(this));
+                            break;
+                        }
                     }
                 }
 
-                private void Loop(State state, ICancelable cancel)
+                private void AppendValues(ICancelable cancel)
                 {
-                    var array = state._array;
+                    var array = _appends;
                     var i = 0;
 
                     while (!cancel.IsDisposed)
@@ -436,7 +433,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                         if (i == array.Length)
                         {
-                            state._continue(state._sink);
+                            ForwardOnCompleted();
                             break;
                         }
                     }
@@ -475,18 +472,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
             }
 
-            public T[] ToArray()
-            {
-                var array = new T[Count];
-                var current = this;
-                for (var i = 0; i < Count; i++)
-                {
-                    array[i] = current.Value;
-                    current = current.Parent;
-                }
-                return array;
-            }
-
             public T[] ToReverseArray()
             {
                 var array = new T[Count];