瀏覽代碼

Reimplement Timeout(TimeSpan) with lock-free methods (#546)

David Karnok 7 年之前
父節點
當前提交
795a430f92
共有 1 個文件被更改,包括 45 次插入60 次删除
  1. 45 60
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

+ 45 - 60
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

@@ -4,6 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -34,9 +35,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly IObservable<TSource> _other;
                 private readonly IScheduler _scheduler;
 
-                private readonly object _gate = new object();
-                private SerialDisposable _subscription = new SerialDisposable();
-                private SerialDisposable _timer = new SerialDisposable();
+                long _index;
+
+                IDisposable _mainDisposable;
+                IDisposable _otherDisposable;
+                IDisposable _timerDisposable;
 
                 public _(Relative parent, IObserver<TSource> observer, IDisposable cancel)
                     : base(observer, cancel)
@@ -46,100 +49,82 @@ namespace System.Reactive.Linq.ObservableImpl
                     _scheduler = parent._scheduler;
                 }
 
-                private ulong _id;
-                private bool _switched;
-
                 public IDisposable Run(IObservable<TSource> source)
                 {
-                    var original = new SingleAssignmentDisposable();
 
-                    _subscription.Disposable = original;
+                    CreateTimer(0L);
 
-                    _id = 0UL;
-                    _switched = false;
-
-                    CreateTimer();
-
-                    original.Disposable = source.SubscribeSafe(this);
+                    Disposable.SetSingle(ref _mainDisposable, source.SubscribeSafe(this));
 
-                    return StableCompositeDisposable.Create(_subscription, _timer);
+                    return this;
                 }
 
-                private void CreateTimer()
+                protected override void Dispose(bool disposing)
                 {
-                    _timer.Disposable = _scheduler.Schedule(_id, _dueTime, Timeout);
+                    if (disposing)
+                    {
+                        Disposable.TryDispose(ref _mainDisposable);
+                        Disposable.TryDispose(ref _otherDisposable);
+                        Disposable.TryDispose(ref _timerDisposable);
+                    }
+                    base.Dispose(disposing);
                 }
 
-                private IDisposable Timeout(IScheduler _, ulong myid)
+                private void CreateTimer(long idx)
                 {
-                    var timerWins = false;
-
-                    lock (_gate)
+                    if (Disposable.TrySetMultiple(ref _timerDisposable, null))
                     {
-                        _switched = (_id == myid);
-                        timerWins = _switched;
-                    }
 
-                    if (timerWins)
-                        _subscription.Disposable = _other.SubscribeSafe(GetForwarder());
+                        var d = _scheduler.Schedule((idx, instance: this), _dueTime, (_, state) => { state.instance.Timeout(state.idx); return Disposable.Empty; });
 
-                    return Disposable.Empty;
+                        Disposable.TrySetMultiple(ref _timerDisposable, d);
+                    }
                 }
 
-                public override void OnNext(TSource value)
+                private void Timeout(long idx)
                 {
-                    var onNextWins = false;
-
-                    lock (_gate)
+                    if (Volatile.Read(ref _index) == idx && Interlocked.CompareExchange(ref _index, long.MaxValue, idx) == idx)
                     {
-                        onNextWins = !_switched;
-                        if (onNextWins)
-                        {
-                            _id = unchecked(_id + 1);
-                        }
+                        Disposable.TryDispose(ref _mainDisposable);
+
+                        var d = _other.Subscribe(GetForwarder());
+
+                        Disposable.SetSingle(ref _otherDisposable, d);
                     }
+                }
 
-                    if (onNextWins)
+                public override void OnNext(TSource value)
+                {
+                    var idx = Volatile.Read(ref _index);
+                    if (idx != long.MaxValue && Interlocked.CompareExchange(ref _index, idx + 1, idx) == idx)
                     {
+                        // Do not swap in the BooleanDisposable.True here
+                        // As we'll need _timerDisposable to store the next timer
+                        // BD.True would cancel it immediately and break the operation
+                        Volatile.Read(ref _timerDisposable)?.Dispose();
+
                         ForwardOnNext(value);
-                        CreateTimer();
+
+                        CreateTimer(idx + 1);
                     }
                 }
 
                 public override void OnError(Exception error)
                 {
-                    var onErrorWins = false;
-
-                    lock (_gate)
+                    if (Interlocked.Exchange(ref _index, long.MaxValue) != long.MaxValue)
                     {
-                        onErrorWins = !_switched;
-                        if (onErrorWins)
-                        {
-                            _id = unchecked(_id + 1);
-                        }
-                    }
+                        Disposable.TryDispose(ref _timerDisposable);
 
-                    if (onErrorWins)
-                    {
                         ForwardOnError(error);
                     }
                 }
 
                 public override void OnCompleted()
                 {
-                    var onCompletedWins = false;
-
-                    lock (_gate)
+                    if (Interlocked.Exchange(ref _index, long.MaxValue) != long.MaxValue)
                     {
-                        onCompletedWins = !_switched;
-                        if (onCompletedWins)
-                        {
-                            _id = unchecked(_id + 1);
-                        }
-                    }
+                        Disposable.TryDispose(ref _timerDisposable);
 
-                    if (onCompletedWins)
-                    {
                         ForwardOnCompleted();
                     }
                 }