Explorar o código

4.x: Fix Delay hanging due to wrong disposable-chain management (#560)

David Karnok %!s(int64=7) %!d(string=hai) anos
pai
achega
dfe52723b1
Modificáronse 1 ficheiros con 47 adicións e 24 borrados
  1. 47 24
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

+ 47 - 24
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -36,7 +36,7 @@ namespace System.Reactive.Linq.ObservableImpl
             internal abstract class S : _
             {
                 protected readonly object _gate = new object();
-                protected readonly SerialDisposable _cancelable = new SerialDisposable();
+                protected IDisposable _cancelable;
 
                 protected readonly IScheduler _scheduler;
 
@@ -74,22 +74,31 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     RunCore(parent);
 
-                    var sourceSubscription = new SingleAssignmentDisposable();
-                    _sourceSubscription = sourceSubscription;
-                    sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
+                    Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this));
 
-                    return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
+                    return this;
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _sourceSubscription);
+                        Disposable.TryDispose(ref _cancelable);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 protected abstract void RunCore(TParent parent);
 
                 public override void OnNext(TSource value)
                 {
-                    var next = _watch.Elapsed.Add(_delay);
                     var shouldRun = false;
 
                     lock (_gate)
                     {
+                        var next = _watch.Elapsed.Add(_delay);
+
                         _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
 
                         shouldRun = _ready && !_active;
@@ -98,13 +107,13 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (shouldRun)
                     {
-                        _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
+                        Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(_delay, DrainQueue));
                     }
                 }
 
                 public override void OnError(Exception error)
                 {
-                    _sourceSubscription.Dispose();
+                    Disposable.TryDispose(ref _sourceSubscription);
 
                     var shouldRun = false;
 
@@ -126,13 +135,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnCompleted()
                 {
-                    _sourceSubscription.Dispose();
+                    Disposable.TryDispose(ref _sourceSubscription);
 
-                    var next = _watch.Elapsed.Add(_delay);
                     var shouldRun = false;
 
                     lock (_gate)
                     {
+                        var next = _watch.Elapsed.Add(_delay);
+
                         _completeAt = next;
                         _hasCompleted = true;
 
@@ -142,7 +152,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (shouldRun)
                     {
-                        _cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
+                        Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(_delay, DrainQueue));
                     }
                 }
 
@@ -256,7 +266,7 @@ namespace System.Reactive.Linq.ObservableImpl
             protected abstract class L : _
             {
                 protected readonly object _gate = new object();
-                protected readonly SerialDisposable _cancelable = new SerialDisposable();
+                protected IDisposable _cancelable;
                 private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);
 
                 private readonly IScheduler _scheduler;
@@ -291,11 +301,19 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     RunCore(parent);
 
-                    var sourceSubscription = new SingleAssignmentDisposable();
-                    _sourceSubscription = sourceSubscription;
-                    sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
+                    Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this));
 
-                    return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
+                    return this;
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _sourceSubscription);
+                        Disposable.TryDispose(ref _cancelable);
+                    }
+                    base.Dispose(disposing);
                 }
 
                 protected abstract void RunCore(TParent parent);
@@ -303,17 +321,18 @@ namespace System.Reactive.Linq.ObservableImpl
                 protected void ScheduleDrain()
                 {
                     _stop = new CancellationTokenSource();
-                    _cancelable.Disposable = Disposable.Create(_stop.Cancel);
+                    Disposable.TrySetSerial(ref _cancelable, Disposable.Create(_stop.Cancel));
 
                     _scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
                 }
 
                 public override void OnNext(TSource value)
                 {
-                    var next = _watch.Elapsed.Add(_delay);
 
                     lock (_gate)
                     {
+                        var next = _watch.Elapsed.Add(_delay);
+
                         _queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));
 
                         _evt.Release();
@@ -322,7 +341,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnError(Exception error)
                 {
-                    _sourceSubscription.Dispose();
+                    Disposable.TryDispose(ref _sourceSubscription);
 
                     lock (_gate)
                     {
@@ -337,12 +356,13 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnCompleted()
                 {
-                    _sourceSubscription.Dispose();
+                    Disposable.TryDispose(ref _sourceSubscription);
 
-                    var next = _watch.Elapsed.Add(_delay);
 
                     lock (_gate)
                     {
+                        var next = _watch.Elapsed.Add(_delay);
+
                         _completeAt = next;
                         _hasCompleted = true;
 
@@ -473,7 +493,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _ready = false;
 
-                    _cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start);
+                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start));
                 }
 
                 private void Start()
@@ -507,7 +527,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (shouldRun)
                     {
-                        _cancelable.Disposable = _scheduler.Schedule(next, DrainQueue);
+                        Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(next, DrainQueue));
                     }
                 }
             }
@@ -521,7 +541,10 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 protected override void RunCore(Absolute parent)
                 {
-                    _cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start);
+                    // ScheduleDrain might have already set a newer disposable
+                    // using TrySetSerial would cancel it, stopping the emission
+                    // and hang the consumer
+                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start));
                 }
 
                 private void Start()