瀏覽代碼

Review Delay-operator (#655)

* Save the _sourceSubscription field in the various Sinks of Delay, use IdentitySink features.

* Save the allocation of a SingleAssignmentDisposable.

* Save some closure allocations and allow delegate caching.

* Don't set fields to their default values in Run.
Daniel C. Weber 7 年之前
父節點
當前提交
7eb1014910
共有 1 個文件被更改,包括 29 次插入48 次删除
  1. 29 48
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

+ 29 - 48
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -46,8 +46,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     _scheduler = parent._scheduler;
                 }
 
-                private IDisposable _sourceSubscription;
-
                 protected IStopwatch _watch;
                 protected TimeSpan _delay;
                 protected bool _ready;
@@ -62,30 +60,21 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void Run(TParent parent)
                 {
-                    _active = false;
-                    _running = false;
-                    _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
-                    _hasCompleted = false;
-                    _completeAt = default(TimeSpan);
-                    _hasFailed = false;
-                    _exception = default(Exception);
-
                     _watch = _scheduler.StartStopwatch();
 
                     RunCore(parent);
 
-                    Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this));
+                    base.Run(parent._source);
                 }
 
                 protected override void Dispose(bool disposing)
                 {
+                    base.Dispose(disposing);
+
                     if (disposing)
                     {
-                        Disposable.TryDispose(ref _sourceSubscription);
                         Disposable.TryDispose(ref _cancelable);
                     }
-
-                    base.Dispose(disposing);
                 }
 
                 protected abstract void RunCore(TParent parent);
@@ -112,7 +101,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnError(Exception error)
                 {
-                    Disposable.TryDispose(ref _sourceSubscription);
+                    DisposeUpstream();
 
                     var shouldRun = false;
 
@@ -134,7 +123,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnCompleted()
                 {
-                    Disposable.TryDispose(ref _sourceSubscription);
+                    DisposeUpstream();
 
                     var shouldRun = false;
 
@@ -276,11 +265,9 @@ namespace System.Reactive.Linq.ObservableImpl
                     _scheduler = parent._scheduler;
                 }
 
-                private IDisposable _sourceSubscription;
-
                 protected IStopwatch _watch;
                 protected TimeSpan _delay;
-                protected Queue<System.Reactive.TimeInterval<TSource>> _queue;
+                protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
 
                 private CancellationTokenSource _stop;
                 private bool _hasCompleted;
@@ -290,27 +277,21 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void Run(TParent parent)
                 {
-                    _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
-                    _hasCompleted = false;
-                    _completeAt = default(TimeSpan);
-                    _hasFailed = false;
-                    _exception = default(Exception);
-
                     _watch = _scheduler.StartStopwatch();
 
                     RunCore(parent);
 
-                    Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this));
+                    base.Run(parent._source);
                 }
 
                 protected override void Dispose(bool disposing)
                 {
+                    base.Dispose(disposing);
+
                     if (disposing)
                     {
-                        Disposable.TryDispose(ref _sourceSubscription);
                         Disposable.TryDispose(ref _cancelable);
                     }
-                    base.Dispose(disposing);
                 }
 
                 protected abstract void RunCore(TParent parent);
@@ -325,7 +306,6 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnNext(TSource value)
                 {
-
                     lock (_gate)
                     {
                         var next = _watch.Elapsed.Add(_delay);
@@ -338,7 +318,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnError(Exception error)
                 {
-                    Disposable.TryDispose(ref _sourceSubscription);
+                    DisposeUpstream();
 
                     lock (_gate)
                     {
@@ -353,8 +333,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnCompleted()
                 {
-                    Disposable.TryDispose(ref _sourceSubscription);
-
+                    DisposeUpstream();
 
                     lock (_gate)
                     {
@@ -490,10 +469,10 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _ready = false;
 
-                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start));
+                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
                 }
 
-                private void Start()
+                private IDisposable Start()
                 {
                     var next = default(TimeSpan);
                     var shouldRun = false;
@@ -526,6 +505,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     {
                         Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule((Base<Absolute>.S)this, next, (@this, a) => DrainQueue(a)));
                     }
+
+                    return Disposable.Empty;
                 }
             }
 
@@ -541,10 +522,10 @@ namespace System.Reactive.Linq.ObservableImpl
                     // ScheduleDrain might have already set a newer disposable
                     // using TrySetSerial would cancel it, stopping the emission
                     // and hang the consumer
-                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start));
+                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
                 }
 
-                private void Start()
+                private IDisposable Start()
                 {
                     lock (_gate)
                     {
@@ -561,6 +542,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
 
                     ScheduleDrain();
+
+                    return Disposable.Empty;
                 }
             }
         }
@@ -674,9 +657,9 @@ namespace System.Reactive.Linq.ObservableImpl
                         return;
                     }
 
-                    var d = new SingleAssignmentDisposable();
-                    _delays.Add(d);
-                    d.Disposable = delay.SubscribeSafe(new DelayObserver(this, value, d));
+                    var observer = new DelayObserver(this, value);
+                    _delays.Add(observer);
+                    observer.SetResource(delay.SubscribeSafe(observer));
                 }
 
                 public override void OnError(Exception error)
@@ -706,31 +689,29 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                 }
 
-                private sealed class DelayObserver : IObserver<TDelay>
+                private sealed class DelayObserver : SafeObserver<TDelay>
                 {
                     private readonly _ _parent;
                     private readonly TSource _value;
-                    private readonly IDisposable _self;
 
-                    public DelayObserver(_ parent, TSource value, IDisposable self)
+                    public DelayObserver(_ parent, TSource value)
                     {
                         _parent = parent;
                         _value = value;
-                        _self = self;
                     }
 
-                    public void OnNext(TDelay value)
+                    public override void OnNext(TDelay value)
                     {
                         lock (_parent._gate)
                         {
                             _parent.ForwardOnNext(_value);
 
-                            _parent._delays.Remove(_self);
+                            _parent._delays.Remove(this);
                             _parent.CheckDone();
                         }
                     }
 
-                    public void OnError(Exception error)
+                    public override void OnError(Exception error)
                     {
                         lock (_parent._gate)
                         {
@@ -738,13 +719,13 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                     }
 
-                    public void OnCompleted()
+                    public override void OnCompleted()
                     {
                         lock (_parent._gate)
                         {
                             _parent.ForwardOnNext(_value);
 
-                            _parent._delays.Remove(_self);
+                            _parent._delays.Remove(this);
                             _parent.CheckDone();
                         }
                     }