Răsfoiți Sursa

Add TrySetFirst to SerialDisposableValue. More usages of SerialDisposableValue are now possible.

Daniel Weber 5 ani în urmă
părinte
comite
570223fb52

+ 4 - 5
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

@@ -47,24 +47,23 @@ namespace System.Reactive.Concurrency
         {
             private sealed class Subscription : IDisposable
             {
-                private IDisposable _cancel;
+                private SerialDisposableValue _cancel;
 
                 public Subscription(IObservable<TSource> source, IScheduler scheduler, IObserver<TSource> observer)
                 {
-                    Disposable.TrySetSingle(
-                        ref _cancel,
+                    _cancel.TrySetFirst(
                         scheduler.Schedule(
                             (@this: this, source, observer),
                             (closureScheduler, state) =>
                             {
-                                Disposable.TrySetSerial(ref state.@this._cancel, new ScheduledDisposable(closureScheduler, state.source.SubscribeSafe(state.observer)));
+                                state.@this._cancel.Disposable = new ScheduledDisposable(closureScheduler, state.source.SubscribeSafe(state.observer));
                                 return Disposable.Empty;
                             }));
                 }
 
                 public void Dispose()
                 {
-                    Disposable.Dispose(ref _cancel);
+                    _cancel.Dispose();
                 }
             }
 

+ 2 - 0
Rx.NET/Source/src/System.Reactive/Disposables/SerialDisposableValue.cs

@@ -32,6 +32,8 @@ namespace System.Reactive.Disposables
             set => Disposables.Disposable.TrySetSerial(ref _current, value);
         }
 
+        public bool TrySetFirst(IDisposable disposable) => Disposables.Disposable.TrySetSingle(ref _current, disposable) == TrySetSingleResult.Success;
+
         /// <summary>
         /// Disposes the underlying disposable as well as all future replacements.
         /// </summary>

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs

@@ -367,7 +367,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly Node<TSource>? _appends;
                 private readonly ISchedulerLongRunning _scheduler;
 
-                private IDisposable? _schedulerDisposable;
+                private SerialDisposableValue _schedulerDisposable;
 
                 public _(LongRunning parent, IObserver<TSource> observer)
                     : base(observer)
@@ -387,7 +387,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     else
                     {
                         var disposable = _scheduler.ScheduleLongRunning(this, static (@this, cancel) => @this.PrependValues(cancel));
-                        Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
+                        _schedulerDisposable.TrySetFirst(disposable);
                     }
                 }
 
@@ -400,7 +400,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     else
                     {
                         var disposable = _scheduler.ScheduleLongRunning(this, static (@this, cancel) => @this.AppendValues(cancel));
-                        Disposable.TrySetSerial(ref _schedulerDisposable, disposable);
+                        _schedulerDisposable.Disposable = disposable;
                     }
                 }
 
@@ -408,7 +408,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     if (disposing)
                     {
-                        Disposable.Dispose(ref _schedulerDisposable);
+                        _schedulerDisposable.Dispose();
                     }
 
                     base.Dispose(disposing);

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs

@@ -97,18 +97,18 @@ namespace System.Reactive.Linq.ObservableImpl
             }
 
             private bool _once;
-            private IDisposable? _subscription;
+            private SerialDisposableValue _subscription;
 
             public override void Run(IObservable<TSource> source)
             {
-                Disposable.TrySetSingle(ref _subscription, source.SubscribeSafe(this));
+                _subscription.TrySetFirst(source.SubscribeSafe(this));
             }
 
             protected override void Dispose(bool disposing)
             {
                 if (disposing)
                 {
-                    Disposable.Dispose(ref _subscription);
+                    _subscription.Dispose();
                 }
 
                 base.Dispose(disposing);
@@ -130,7 +130,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
 
                     Volatile.Write(ref _once, true);
-                    Disposable.TrySetSerial(ref _subscription, result.SubscribeSafe(this));
+                    _subscription.Disposable = result.SubscribeSafe(this);
                 }
                 else
                 {

+ 13 - 13
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -51,7 +51,7 @@ namespace System.Reactive.Linq.ObservableImpl
             internal abstract class S : _
             {
                 protected readonly object _gate = new object();
-                protected IDisposable? _cancelable;
+                protected SerialDisposableValue _cancelable;
 
                 protected S(TParent parent, IObserver<TSource> observer)
                     : base(parent, observer)
@@ -75,7 +75,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (disposing)
                     {
-                        Disposable.Dispose(ref _cancelable);
+                        _cancelable.Dispose();
                     }
                 }
 
@@ -146,7 +146,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 protected void DrainQueue(TimeSpan next)
                 {
-                    Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(this, next, static (@this, a) => @this.DrainQueue(a)));
+                    _cancelable.Disposable = _scheduler.Schedule(this, next, static (@this, a) => @this.DrainQueue(a));
                 }
 
                 private void DrainQueue(Action<S, TimeSpan> recurse)
@@ -270,7 +270,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
 
                 protected Queue<Reactive.TimeInterval<TSource>> _queue = new Queue<Reactive.TimeInterval<TSource>>();
-                protected IDisposable? _cancelable;
+                protected SerialDisposableValue _cancelable;
                 protected TimeSpan _delay;
 
                 private bool _hasCompleted;
@@ -284,14 +284,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (disposing)
                     {
-                        Disposable.Dispose(ref _cancelable);
+                        _cancelable.Dispose();
                     }
                 }
 
                 protected void ScheduleDrain()
                 {
                     var cd = new CancellationDisposable();
-                    Disposable.TrySetSerial(ref _cancelable, cd);
+                    _cancelable.Disposable = cd;
 
                     _scheduler.AsLongRunning()!.ScheduleLongRunning(cd.Token, DrainQueue); // NB: This class is only used with long-running schedulers.
                 }
@@ -461,7 +461,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _ready = false;
 
-                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
+                    _cancelable.TrySetFirst(parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
                 }
 
                 private void Start()
@@ -512,7 +512,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     // ScheduleDrain might have already set a newer disposable
                     // using TrySetSerial would cancel it, stopping the emission
                     // and hang the consumer
-                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
+                    _cancelable.TrySetFirst(parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
                 }
 
                 private void Start()
@@ -782,7 +782,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     private readonly _ _parent;
                     private readonly IObservable<TSource> _source;
-                    private IDisposable? _subscription;
+                    private SerialDisposableValue _subscription;
 
                     public SubscriptionDelayObserver(_ parent, IObservable<TSource> source)
                     {
@@ -792,12 +792,12 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     internal void SetFirst(IDisposable d)
                     {
-                        Disposable.TrySetSingle(ref _subscription, d);
+                        _subscription.TrySetFirst(d);
                     }
 
                     public void OnNext(TDelay value)
                     {
-                        Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
+                        _subscription.Disposable = _source.SubscribeSafe(_parent);
                     }
 
                     public void OnError(Exception error)
@@ -807,12 +807,12 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     public void OnCompleted()
                     {
-                        Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
+                        _subscription.Disposable = _source.SubscribeSafe(_parent);
                     }
 
                     public void Dispose()
                     {
-                        Disposable.Dispose(ref _subscription);
+                        _subscription.Dispose();
                     }
                 }
             }

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

@@ -239,7 +239,7 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;
             private readonly IObservable<TSource> _other;
 
-            private IDisposable? _sourceDisposable;
+            private SerialDisposableValue _sourceDisposable;
             private IDisposable? _timerDisposable;
             private long _index;
 
@@ -254,14 +254,14 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 SetTimer(parent._firstTimeout, 0L);
 
-                Disposable.TrySetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));
+                _sourceDisposable.TrySetFirst(parent._source.SubscribeSafe(this));
             }
 
             protected override void Dispose(bool disposing)
             {
                 if (disposing)
                 {
-                    Disposable.Dispose(ref _sourceDisposable);
+                    _sourceDisposable.Dispose();
                     Disposable.Dispose(ref _timerDisposable);
                 }
 
@@ -318,7 +318,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 if (Volatile.Read(ref _index) == idx
                     && Interlocked.CompareExchange(ref _index, long.MaxValue, idx) == idx)
                 {
-                    Disposable.TrySetSerial(ref _sourceDisposable, _other.SubscribeSafe(GetForwarder()));
+                    _sourceDisposable.Disposable = _other.SubscribeSafe(GetForwarder());
                 }
             }