Browse Source

Use a light recurring scheduling, similar to the `ToObservable()` class

Peter Wehrfritz 7 years ago
parent
commit
694b719fad
1 changed files with 44 additions and 34 deletions
  1. 44 34
      Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs

+ 44 - 34
Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs

@@ -169,6 +169,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly TSource[] _appends;
                 private readonly TSource[] _appends;
                 private readonly IScheduler _scheduler;
                 private readonly IScheduler _scheduler;
                 private IDisposable _schedulerDisposable;
                 private IDisposable _schedulerDisposable;
+                private volatile bool _disposed;
 
 
                 public _(AppendPrependMultiple<TSource> parent, IObserver<TSource> observer)
                 public _(AppendPrependMultiple<TSource> parent, IObserver<TSource> observer)
                     : base(observer)
                     : base(observer)
@@ -217,8 +218,10 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 {
                     if (disposing)
                     if (disposing)
                     {
                     {
+                        _disposed = true;
                         Disposable.TryDispose(ref _schedulerDisposable);
                         Disposable.TryDispose(ref _schedulerDisposable);
                     }
                     }
+
                     base.Dispose(disposing);
                     base.Dispose(disposing);
                 }
                 }
 
 
@@ -233,42 +236,41 @@ namespace System.Reactive.Linq.ObservableImpl
                         // to observe the cancellation and perform proper clean-up. In this case,
                         // to observe the cancellation and perform proper clean-up. In this case,
                         // we're sure Loop will be entered, allowing us to dispose the enumerator.
                         // we're sure Loop will be entered, allowing us to dispose the enumerator.
                         //
                         //
-                        return longRunning.ScheduleLongRunning(new State(null, this, array, continueWith), Loop);
+                        return longRunning.ScheduleLongRunning(new State(this, array, continueWith), Loop);
+                    }
+                    else
+                    {
+                        //
+                        // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
+                        // is used to have LoopRec bail out and perform proper clean-up of the
+                        // enumerator.
+                        //
+                        _scheduler.Schedule(new State(this, array, continueWith), (innerScheduler, state) => state._sink.LoopRec(innerScheduler, state));
+                        return Disposable.Empty;
                     }
                     }
-
-                    //
-                    // We never allow the scheduled work to be cancelled. Instead, the flag
-                    // is used to have LoopRec bail out and perform proper clean-up of the
-                    // enumerator.
-                    //
-                    var flag = new BooleanDisposable();
-                    _scheduler.Schedule(new State(flag, this, array, continueWith), LoopRec);
-                    return flag;
                 }
                 }
 
 
                 private struct State
                 private struct State
                 {
                 {
                     public readonly _ _sink;
                     public readonly _ _sink;
-                    public readonly ICancelable _flag;
                     public readonly TSource[] _array;
                     public readonly TSource[] _array;
                     public readonly Action<_> _continue;
                     public readonly Action<_> _continue;
                     public int _current;
                     public int _current;
 
 
-                    public State(ICancelable flag, _ sink, TSource[] array, Action<_> c)
+                    public State(_ sink, TSource[] array, Action<_> c)
                     {
                     {
                         _sink = sink;
                         _sink = sink;
-                        _flag = flag;
                         _continue = c;
                         _continue = c;
                         _array = array;
                         _array = array;
                         _current = 0;
                         _current = 0;
                     }
                     }
                 }
                 }
 
 
-                private void LoopRec(State state, Action<State> recurse)
+                private IDisposable LoopRec(IScheduler scheduler, State state)
                 {
                 {
-                    if (state._flag.IsDisposed)
+                    if (_disposed)
                     {
                     {
-                        return;
+                        return Disposable.Empty;
                     }
                     }
 
 
                     var current = state._array[state._current];
                     var current = state._array[state._current];
@@ -279,10 +281,18 @@ namespace System.Reactive.Linq.ObservableImpl
                     if (state._current == state._array.Length)
                     if (state._current == state._array.Length)
                     {
                     {
                         state._continue(state._sink);
                         state._continue(state._sink);
-                        return;
+                    }
+                    else
+                    { 
+                        //
+                        // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
+                        // is used to have LoopRec bail out and perform proper clean-up of the
+                        // enumerator.
+                        //
+                        scheduler.Schedule(state, (innerScheduler, s) => s._sink.LoopRec(innerScheduler, s));
                     }
                     }
 
 
-                    recurse(state);
+                    return Disposable.Empty;
                 }
                 }
 
 
                 private void Loop(State state, ICancelable cancel)
                 private void Loop(State state, ICancelable cancel)
@@ -307,9 +317,9 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
         private sealed class Node<T>
         private sealed class Node<T>
         {
         {
-            private readonly Node<T> _parent;
-            private readonly T _value;
-            private readonly int _count;
+            public readonly Node<T> Parent;
+            public readonly T Value;
+            public readonly int Count;
 
 
             public Node(T value)
             public Node(T value)
                 : this(null, value)
                 : this(null, value)
@@ -318,44 +328,44 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
             public Node(Node<T> parent, T value)
             public Node(Node<T> parent, T value)
             {
             {
-                _parent = parent;
-                _value = value;
+                Parent = parent;
+                Value = value;
 
 
                 if (parent == null)
                 if (parent == null)
                 {
                 {
-                    _count = 1;
+                    Count = 1;
                 }
                 }
                 else
                 else
                 {
                 {
-                    if (parent._count == int.MaxValue)
+                    if (parent.Count == int.MaxValue)
                     {
                     {
                         throw new NotSupportedException($"Consecutive appends or prepends with a count of more than int.MaxValue ({int.MaxValue}) are not supported.");
                         throw new NotSupportedException($"Consecutive appends or prepends with a count of more than int.MaxValue ({int.MaxValue}) are not supported.");
                     }
                     }
 
 
-                    _count = parent._count + 1;
+                    Count = parent.Count + 1;
                 }
                 }
             }
             }
 
 
             public T[] ToArray()
             public T[] ToArray()
             {
             {
-                var array = new T[_count];
+                var array = new T[Count];
                 var current = this;
                 var current = this;
-                for (var i = 0; i < _count; i++)
+                for (var i = 0; i < Count; i++)
                 {
                 {
-                    array[i] = current._value;
-                    current = current._parent;
+                    array[i] = current.Value;
+                    current = current.Parent;
                 }
                 }
                 return array;
                 return array;
             }
             }
 
 
             public T[] ToReverseArray()
             public T[] ToReverseArray()
             {
             {
-                var array = new T[_count];
+                var array = new T[Count];
                 var current = this;
                 var current = this;
-                for (var i = _count - 1; i >= 0; i--)
+                for (var i = Count - 1; i >= 0; i--)
                 {
                 {
-                    array[i] = current._value;
-                    current = current._parent;
+                    array[i] = current.Value;
+                    current = current.Parent;
                 }
                 }
                 return array;
                 return array;
             }
             }