Procházet zdrojové kódy

Merge pull request #697 from akarnokd/TakeLastImprovements

4.x: TakeLast() improve structure and recursive scheduling
Daniel C. Weber před 7 roky
rodič
revize
0d4e94c330

+ 34 - 26
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLast.cs

@@ -29,15 +29,15 @@ namespace System.Reactive.Linq.ObservableImpl
 
             internal sealed class _ : IdentitySink<TSource>
             {
-                // CONSIDER: This sink has a parent reference that can be considered for removal.
-
-                private readonly Count _parent;
-                private Queue<TSource> _queue;
+                private readonly int _count;
+                private readonly IScheduler _loopScheduler;
+                private readonly Queue<TSource> _queue;
 
                 public _(Count parent, IObserver<TSource> observer)
                     : base(observer)
                 {
-                    _parent = parent;
+                    _count = parent._count;
+                    _loopScheduler = parent._loopScheduler;
                     _queue = new Queue<TSource>();
                 }
 
@@ -56,7 +56,8 @@ namespace System.Reactive.Linq.ObservableImpl
                 public override void OnNext(TSource value)
                 {
                     _queue.Enqueue(value);
-                    if (_queue.Count > _parent._count)
+
+                    if (_queue.Count > _count)
                     {
                         _queue.Dequeue();
                     }
@@ -66,28 +67,32 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     DisposeUpstream();
 
-                    var longRunning = _parent._loopScheduler.AsLongRunning();
+                    var longRunning = _loopScheduler.AsLongRunning();
                     if (longRunning != null)
                     {
                         Disposable.SetSingle(ref _loopDisposable, longRunning.ScheduleLongRunning(this, (@this, c) => @this.Loop(c)));
                     }
                     else
                     {
-                        Disposable.SetSingle(ref _loopDisposable, _parent._loopScheduler.Schedule(this, (@this, a) => @this.LoopRec(a)));
+                        var first = _loopScheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                        Disposable.TrySetSingle(ref _loopDisposable, first);
                     }
                 }
 
-                private void LoopRec(Action<_> recurse)
+                private IDisposable LoopRec(IScheduler scheduler)
                 {
                     if (_queue.Count > 0)
                     {
                         ForwardOnNext(_queue.Dequeue());
-                        recurse(this);
+
+                        var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                        Disposable.TrySetMultiple(ref _loopDisposable, next);
                     }
                     else
                     {
                         ForwardOnCompleted();
                     }
+                    return Disposable.Empty;
                 }
 
                 private void Loop(ICancelable cancel)
@@ -131,37 +136,36 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
 
-            protected override void Run(_ sink) => sink.Run();
+            protected override void Run(_ sink) => sink.Run(_source, _scheduler);
 
             internal sealed class _ : IdentitySink<TSource>
             {
-                // CONSIDER: This sink has a parent reference that can be considered for removal.
-
-                private readonly Time _parent;
+                private readonly TimeSpan _duration;
+                private readonly IScheduler _loopScheduler;
                 private Queue<System.Reactive.TimeInterval<TSource>> _queue;
 
                 public _(Time parent, IObserver<TSource> observer)
                     : base(observer)
                 {
-                    _parent = parent;
+                    _duration = parent._duration;
+                    _loopScheduler = parent._loopScheduler;
                     _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
                 }
 
-                private IDisposable _sourceDisposable;
                 private IDisposable _loopDisposable;
                 private IStopwatch _watch;
 
-                public void Run()
+                public void Run(IObservable<TSource> source, IScheduler scheduler)
                 {
-                    _watch = _parent._scheduler.StartStopwatch();
-                    Disposable.SetSingle(ref _sourceDisposable, _parent._source.SubscribeSafe(this));
+                    _watch = scheduler.StartStopwatch();
+                    base.Run(source);
                 }
 
                 protected override void Dispose(bool disposing)
                 {
                     if (disposing)
                     {
-                        Disposable.TryDispose(ref _sourceDisposable);
+                        Disposable.TryDispose(ref _loopDisposable);
                     }
                     base.Dispose(disposing);
                 }
@@ -175,33 +179,37 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnCompleted()
                 {
-                    Disposable.TryDispose(ref _sourceDisposable);
+                    DisposeUpstream();
 
                     var now = _watch.Elapsed;
                     Trim(now);
 
-                    var longRunning = _parent._loopScheduler.AsLongRunning();
+                    var longRunning = _loopScheduler.AsLongRunning();
                     if (longRunning != null)
                     {
                         Disposable.SetSingle(ref _loopDisposable, longRunning.ScheduleLongRunning(this, (@this, c) => @this.Loop(c)));
                     }
                     else
                     {
-                        Disposable.SetSingle(ref _loopDisposable, _parent._loopScheduler.Schedule(this, (@this, a) => @this.LoopRec(a)));
+                        var first = _loopScheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                        Disposable.TrySetSingle(ref _loopDisposable, first);
                     }
                 }
 
-                private void LoopRec(Action<_> recurse)
+                private IDisposable LoopRec(IScheduler scheduler)
                 {
                     if (_queue.Count > 0)
                     {
                         ForwardOnNext(_queue.Dequeue().Value);
-                        recurse(this);
+
+                        var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                        Disposable.TrySetMultiple(ref _loopDisposable, next);
                     }
                     else
                     {
                         ForwardOnCompleted();
                     }
+                    return Disposable.Empty;
                 }
 
                 private void Loop(ICancelable cancel)
@@ -228,7 +236,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private void Trim(TimeSpan now)
                 {
-                    while (_queue.Count > 0 && now - _queue.Peek().Interval >= _parent._duration)
+                    while (_queue.Count > 0 && now - _queue.Peek().Interval >= _duration)
                     {
                         _queue.Dequeue();
                     }