Browse Source

Use SerialDisposableValue where straightforward.

Daniel Weber 5 years ago
parent
commit
0097047480

+ 3 - 3
Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs

@@ -66,7 +66,7 @@ namespace System.Reactive.Concurrency
         /// <summary>
         /// <summary>
         /// Disposable that always holds the timer to dispatch the first element in the queue.
         /// Disposable that always holds the timer to dispatch the first element in the queue.
         /// </summary>
         /// </summary>
-        private IDisposable? _nextTimer;
+        private SerialDisposableValue _nextTimer;
 
 
         /// <summary>
         /// <summary>
         /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
         /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
@@ -265,7 +265,7 @@ namespace System.Reactive.Concurrency
                 if (!_disposed)
                 if (!_disposed)
                 {
                 {
                     _disposed = true;
                     _disposed = true;
-                    Disposable.Dispose(ref _nextTimer);
+                    _nextTimer.Dispose();
                     _evt.Release();
                     _evt.Release();
                 }
                 }
             }
             }
@@ -334,7 +334,7 @@ namespace System.Reactive.Concurrency
                             _nextItem = next;
                             _nextItem = next;
 
 
                             var due = next.DueTime - _stopwatch.Elapsed;
                             var due = next.DueTime - _stopwatch.Elapsed;
-                            Disposable.TrySetSerial(ref _nextTimer, ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due));
+                            _nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due);
                         }
                         }
                     }
                     }
 
 

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs

@@ -20,7 +20,7 @@ namespace System.Reactive.Concurrency
             private readonly TaskPoolScheduler _scheduler;
             private readonly TaskPoolScheduler _scheduler;
             private readonly Func<IScheduler, TState, IDisposable> _action;
             private readonly Func<IScheduler, TState, IDisposable> _action;
 
 
-            private IDisposable _cancel;
+            private SerialDisposableValue _cancel;
 
 
             public ScheduledWorkItem(TaskPoolScheduler scheduler, TState state, Func<IScheduler, TState, IDisposable> action)
             public ScheduledWorkItem(TaskPoolScheduler scheduler, TState state, Func<IScheduler, TState, IDisposable> action)
             {
             {
@@ -30,7 +30,7 @@ namespace System.Reactive.Concurrency
 
 
                 var cancelable = new CancellationDisposable();
                 var cancelable = new CancellationDisposable();
 
 
-                Disposable.SetSingle(ref _cancel, cancelable);
+                _cancel.Disposable = cancelable;
 
 
                 scheduler._taskFactory.StartNew(
                 scheduler._taskFactory.StartNew(
                     thisObject =>
                     thisObject =>
@@ -60,7 +60,7 @@ namespace System.Reactive.Concurrency
                         // exceptions at stage 2. If the exception isn't handled at the Rx level, it
                         // exceptions at stage 2. If the exception isn't handled at the Rx level, it
                         // propagates by means of a rethrow, falling back to behavior in 3.
                         // propagates by means of a rethrow, falling back to behavior in 3.
                         //
                         //
-                        Disposable.TrySetSerial(ref @this._cancel, @this._action(@this._scheduler, @this._state));
+                        @this._cancel.Disposable = @this._action(@this._scheduler, @this._state);
                     },
                     },
                     this,
                     this,
                     cancelable.Token);
                     cancelable.Token);
@@ -68,7 +68,7 @@ namespace System.Reactive.Concurrency
 
 
             public void Dispose()
             public void Dispose()
             {
             {
-                Disposable.Dispose(ref _cancel);
+                _cancel.Dispose();
             }
             }
         }
         }
 
 

+ 5 - 5
Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

@@ -25,7 +25,7 @@ namespace System.Reactive
         private readonly IObserver<T> _observer;
         private readonly IObserver<T> _observer;
         private readonly IScheduler _scheduler;
         private readonly IScheduler _scheduler;
         private readonly ISchedulerLongRunning? _longRunning;
         private readonly ISchedulerLongRunning? _longRunning;
-        private IDisposable? _disposable;
+        private SerialDisposableValue _disposable;
 
 
         public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
         public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
         {
         {
@@ -70,11 +70,11 @@ namespace System.Reactive
                     {
                     {
                         _dispatcherJob = _longRunning!.ScheduleLongRunning(Dispatch); // NB: Only reachable when long-running.
                         _dispatcherJob = _longRunning!.ScheduleLongRunning(Dispatch); // NB: Only reachable when long-running.
 
 
-                        Disposable.TrySetSerial(ref _disposable, StableCompositeDisposable.Create
+                        _disposable.Disposable = StableCompositeDisposable.Create
                         (
                         (
                             _dispatcherJob,
                             _dispatcherJob,
                             _dispatcherEventRelease!
                             _dispatcherEventRelease!
-                        ));
+                        );
                     }
                     }
                 }
                 }
             }
             }
@@ -198,7 +198,7 @@ namespace System.Reactive
 
 
             if (isOwner)
             if (isOwner)
             {
             {
-                Disposable.TrySetSerial(ref _disposable, _scheduler.Schedule<object?>(null, Run));
+                _disposable.Disposable = _scheduler.Schedule<object?>(null, Run);
             }
             }
         }
         }
 
 
@@ -317,7 +317,7 @@ namespace System.Reactive
 
 
             if (disposing)
             if (disposing)
             {
             {
-                Disposable.Dispose(ref _disposable);
+                _disposable.Dispose();
             }
             }
         }
         }
     }
     }

+ 5 - 5
Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs

@@ -39,7 +39,7 @@ namespace System.Reactive.PlatformServices
     public class PeriodicTimerSystemClockMonitor : INotifySystemClockChanged
     public class PeriodicTimerSystemClockMonitor : INotifySystemClockChanged
     {
     {
         private readonly TimeSpan _period;
         private readonly TimeSpan _period;
-        private IDisposable? _timer;
+        private SerialDisposableValue _timer;
 
 
         /// <summary>
         /// <summary>
         /// Use the Unix milliseconds for the current time
         /// Use the Unix milliseconds for the current time
@@ -78,13 +78,13 @@ namespace System.Reactive.PlatformServices
             {
             {
                 _systemClockChanged -= value;
                 _systemClockChanged -= value;
 
 
-                Disposable.TrySetSerial(ref _timer, Disposable.Empty);
+                _timer.Disposable = Disposable.Empty;
             }
             }
         }
         }
 
 
         private void NewTimer()
         private void NewTimer()
         {
         {
-            Disposable.TrySetSerial(ref _timer, Disposable.Empty);
+            _timer.Disposable = Disposable.Empty;
 
 
             var n = 0L;
             var n = 0L;
             for (; ; )
             for (; ; )
@@ -92,13 +92,13 @@ namespace System.Reactive.PlatformServices
                 var now = SystemClock.UtcNow.ToUnixTimeMilliseconds();
                 var now = SystemClock.UtcNow.ToUnixTimeMilliseconds();
                 Interlocked.Exchange(ref _lastTimeUnixMillis, now);
                 Interlocked.Exchange(ref _lastTimeUnixMillis, now);
 
 
-                Disposable.TrySetSerial(ref _timer, ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(TimeChanged, _period));
+                _timer.Disposable = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(TimeChanged, _period);
 
 
                 if (Math.Abs(SystemClock.UtcNow.ToUnixTimeMilliseconds() - now) <= SyncMaxDelta)
                 if (Math.Abs(SystemClock.UtcNow.ToUnixTimeMilliseconds() - now) <= SyncMaxDelta)
                 {
                 {
                     break;
                     break;
                 }
                 }
-                if (Volatile.Read(ref _timer) == Disposable.Empty)
+                if (_timer.Disposable == Disposable.Empty)
                 {
                 {
                     break;
                     break;
                 }
                 }

+ 9 - 9
Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs

@@ -270,7 +270,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly IScheduler _scheduler;
                 private readonly IScheduler _scheduler;
                 private readonly object _gate = new object();
                 private readonly object _gate = new object();
                 private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>();
                 private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>();
-                private IDisposable? _timerSerial;
+                private SerialDisposableValue _timerSerial;
 
 
                 public _(TimeSliding parent, IObserver<IList<TSource>> observer)
                 public _(TimeSliding parent, IObserver<IList<TSource>> observer)
                     : base(observer)
                     : base(observer)
@@ -299,7 +299,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 {
                     if (disposing)
                     if (disposing)
                     {
                     {
-                        Disposable.Dispose(ref _timerSerial);
+                        _timerSerial.Dispose();
                     }
                     }
                     base.Dispose(disposing);
                     base.Dispose(disposing);
                 }
                 }
@@ -314,7 +314,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 {
                     var m = new SingleAssignmentDisposable();
                     var m = new SingleAssignmentDisposable();
 
 
-                    Disposable.TrySetSerial(ref _timerSerial, m);
+                    _timerSerial.Disposable = m;
 
 
                     var isSpan = false;
                     var isSpan = false;
                     var isShift = false;
                     var isShift = false;
@@ -529,7 +529,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     _parent = parent;
                     _parent = parent;
                 }
                 }
 
 
-                private IDisposable? _timerSerial;
+                private SerialDisposableValue _timerSerial;
                 private int _n;
                 private int _n;
                 private int _windowId;
                 private int _windowId;
 
 
@@ -547,7 +547,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 {
                     if (disposing)
                     if (disposing)
                     {
                     {
-                        Disposable.Dispose(ref _timerSerial);
+                        _timerSerial.Dispose();
                     }
                     }
 
 
                     base.Dispose(disposing);
                     base.Dispose(disposing);
@@ -556,7 +556,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private void CreateTimer(int id)
                 private void CreateTimer(int id)
                 {
                 {
                     var m = new SingleAssignmentDisposable();
                     var m = new SingleAssignmentDisposable();
-                    Disposable.TrySetSerial(ref _timerSerial, m);
+                    _timerSerial.Disposable = m;
 
 
                     m.Disposable = _parent._scheduler.ScheduleAction((@this: this, id), _parent._timeSpan, static tuple => [email protected](tuple.id));
                     m.Disposable = _parent._scheduler.ScheduleAction((@this: this, id), _parent._timeSpan, static tuple => [email protected](tuple.id));
                 }
                 }
@@ -654,7 +654,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;
                 private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;
 
 
                 private List<TSource> _buffer = new List<TSource>();
                 private List<TSource> _buffer = new List<TSource>();
-                private IDisposable? _bufferClosingSerialDisposable;
+                private SerialDisposableValue _bufferClosingSerialDisposable;
 
 
                 public _(Selector parent, IObserver<IList<TSource>> observer)
                 public _(Selector parent, IObserver<IList<TSource>> observer)
                     : base(observer)
                     : base(observer)
@@ -673,7 +673,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 {
                     if (disposing)
                     if (disposing)
                     {
                     {
-                        Disposable.Dispose(ref _bufferClosingSerialDisposable);
+                        _bufferClosingSerialDisposable.Dispose();
                     }
                     }
                     base.Dispose(disposing);
                     base.Dispose(disposing);
                 }
                 }
@@ -695,7 +695,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     }
 
 
                     var closingObserver = new BufferClosingObserver(this);
                     var closingObserver = new BufferClosingObserver(this);
-                    Disposable.TrySetSerial(ref _bufferClosingSerialDisposable, closingObserver);
+                    _bufferClosingSerialDisposable.Disposable = closingObserver;
                     closingObserver.SetResource(bufferClose.SubscribeSafe(closingObserver));
                     closingObserver.SetResource(bufferClose.SubscribeSafe(closingObserver));
                 }
                 }
 
 

+ 3 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Switch.cs

@@ -28,7 +28,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             {
             }
             }
 
 
-            private IDisposable? _innerSerialDisposable;
+            private SerialDisposableValue _innerSerialDisposable;
             private bool _isStopped;
             private bool _isStopped;
             private ulong _latest;
             private ulong _latest;
             private bool _hasLatest;
             private bool _hasLatest;
@@ -37,7 +37,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             {
                 if (disposing)
                 if (disposing)
                 {
                 {
-                    Disposable.Dispose(ref _innerSerialDisposable);
+                    _innerSerialDisposable.Dispose();
                 }
                 }
 
 
                 base.Dispose(disposing);
                 base.Dispose(disposing);
@@ -55,7 +55,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 var innerObserver = new InnerObserver(this, id);
                 var innerObserver = new InnerObserver(this, id);
 
 
-                Disposable.TrySetSerial(ref _innerSerialDisposable, innerObserver);
+                _innerSerialDisposable.Disposable = innerObserver;
                 innerObserver.SetResource(value.SubscribeSafe(innerObserver));
                 innerObserver.SetResource(value.SubscribeSafe(innerObserver));
             }
             }
 
 

+ 12 - 12
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs

@@ -39,14 +39,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
             private TSource? _value;
             private TSource? _value;
             private bool _hasValue;
             private bool _hasValue;
-            private IDisposable? _serialCancelable;
+            private SerialDisposableValue _serialCancelable;
             private ulong _id;
             private ulong _id;
 
 
             protected override void Dispose(bool disposing)
             protected override void Dispose(bool disposing)
             {
             {
                 if (disposing)
                 if (disposing)
                 {
                 {
-                    Disposable.Dispose(ref _serialCancelable);
+                    _serialCancelable.Dispose();
                 }
                 }
 
 
                 base.Dispose(disposing);
                 base.Dispose(disposing);
@@ -64,8 +64,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     currentid = _id;
                     currentid = _id;
                 }
                 }
 
 
-                Disposable.TrySetSerial(ref _serialCancelable, null);
-                Disposable.TrySetSerial(ref _serialCancelable, _scheduler.ScheduleAction((@this: this, currentid), _dueTime, static tuple => [email protected](tuple.currentid)));
+                _serialCancelable.Disposable = null;
+                _serialCancelable.Disposable = _scheduler.ScheduleAction((@this: this, currentid), _dueTime, static tuple => [email protected](tuple.currentid));
             }
             }
 
 
             private void Propagate(ulong currentid)
             private void Propagate(ulong currentid)
@@ -83,7 +83,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
             public override void OnError(Exception error)
             public override void OnError(Exception error)
             {
             {
-                Disposable.Dispose(ref _serialCancelable);
+                _serialCancelable.Dispose();
 
 
                 lock (_gate)
                 lock (_gate)
                 {
                 {
@@ -96,7 +96,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
             public override void OnCompleted()
             public override void OnCompleted()
             {
             {
-                Disposable.Dispose(ref _serialCancelable);
+                _serialCancelable.Dispose();
 
 
                 lock (_gate)
                 lock (_gate)
                 {
                 {
@@ -142,14 +142,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
             private TSource? _value;
             private TSource? _value;
             private bool _hasValue;
             private bool _hasValue;
-            private IDisposable? _serialCancelable;
+            private SerialDisposableValue _serialCancelable;
             private ulong _id;
             private ulong _id;
 
 
             protected override void Dispose(bool disposing)
             protected override void Dispose(bool disposing)
             {
             {
                 if (disposing)
                 if (disposing)
                 {
                 {
-                    Disposable.Dispose(ref _serialCancelable);
+                    _serialCancelable.Dispose();
                 }
                 }
 
 
                 base.Dispose(disposing);
                 base.Dispose(disposing);
@@ -182,17 +182,17 @@ namespace System.Reactive.Linq.ObservableImpl
                     currentid = _id;
                     currentid = _id;
                 }
                 }
 
 
-                Disposable.TrySetSerial(ref _serialCancelable, null);
+                _serialCancelable.Disposable = null;
 
 
                 var newInnerObserver = new ThrottleObserver(this, value, currentid);
                 var newInnerObserver = new ThrottleObserver(this, value, currentid);
                 newInnerObserver.SetResource(throttle.SubscribeSafe(newInnerObserver));
                 newInnerObserver.SetResource(throttle.SubscribeSafe(newInnerObserver));
 
 
-                Disposable.TrySetSerial(ref _serialCancelable, newInnerObserver);
+                _serialCancelable.Disposable = newInnerObserver;
             }
             }
 
 
             public override void OnError(Exception error)
             public override void OnError(Exception error)
             {
             {
-                Disposable.Dispose(ref _serialCancelable);
+                _serialCancelable.Dispose();
 
 
                 lock (_gate)
                 lock (_gate)
                 {
                 {
@@ -205,7 +205,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
             public override void OnCompleted()
             public override void OnCompleted()
             {
             {
-                Disposable.Dispose(ref _serialCancelable);
+                _serialCancelable.Dispose();
 
 
                 lock (_gate)
                 lock (_gate)
                 {
                 {

+ 5 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

@@ -150,7 +150,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             {
                 private readonly IObservable<TSource> _other;
                 private readonly IObservable<TSource> _other;
 
 
-                private IDisposable? _serialDisposable;
+                private SerialDisposableValue _serialDisposable;
                 private int _wip;
                 private int _wip;
 
 
                 public _(IObservable<TSource> other, IObserver<TSource> observer)
                 public _(IObservable<TSource> other, IObserver<TSource> observer)
@@ -163,14 +163,14 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 {
                     SetUpstream(parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Timeout()));
                     SetUpstream(parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Timeout()));
 
 
-                    Disposable.TrySetSingle(ref _serialDisposable, parent._source.SubscribeSafe(this));
+                    _serialDisposable.Disposable = parent._source.SubscribeSafe(this);
                 }
                 }
 
 
                 protected override void Dispose(bool disposing)
                 protected override void Dispose(bool disposing)
                 {
                 {
                     if (disposing)
                     if (disposing)
                     {
                     {
-                        Disposable.Dispose(ref _serialDisposable);
+                        _serialDisposable.Dispose();
                     }
                     }
 
 
                     base.Dispose(disposing);
                     base.Dispose(disposing);
@@ -180,7 +180,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 {
                     if (Interlocked.Increment(ref _wip) == 1)
                     if (Interlocked.Increment(ref _wip) == 1)
                     {
                     {
-                        Disposable.TrySetSerial(ref _serialDisposable, _other.SubscribeSafe(GetForwarder()));
+                        _serialDisposable.Disposable = _other.SubscribeSafe(GetForwarder());
                     }
                     }
                 }
                 }
 
 
@@ -191,7 +191,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         ForwardOnNext(value);
                         ForwardOnNext(value);
                         if (Interlocked.Decrement(ref _wip) != 0)
                         if (Interlocked.Decrement(ref _wip) != 0)
                         {
                         {
-                            Disposable.TrySetSerial(ref _serialDisposable, _other.SubscribeSafe(GetForwarder()));
+                            _serialDisposable.Disposable = _other.SubscribeSafe(GetForwarder());
                         }
                         }
                     }
                     }
                 }
                 }