|
|
@@ -51,7 +51,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
internal abstract class S : _
|
|
|
{
|
|
|
protected readonly object _gate = new object();
|
|
|
- protected IDisposable? _cancelable;
|
|
|
+ protected SerialDisposableValue _cancelable;
|
|
|
|
|
|
protected S(TParent parent, IObserver<TSource> observer)
|
|
|
: base(parent, observer)
|
|
|
@@ -75,7 +75,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
if (disposing)
|
|
|
{
|
|
|
- Disposable.Dispose(ref _cancelable);
|
|
|
+ _cancelable.Dispose();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -146,7 +146,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected void DrainQueue(TimeSpan next)
|
|
|
{
|
|
|
- Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(this, next, static (@this, a) => @this.DrainQueue(a)));
|
|
|
+ _cancelable.Disposable = _scheduler.Schedule(this, next, static (@this, a) => @this.DrainQueue(a));
|
|
|
}
|
|
|
|
|
|
private void DrainQueue(Action<S, TimeSpan> recurse)
|
|
|
@@ -270,7 +270,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
|
|
|
protected Queue<Reactive.TimeInterval<TSource>> _queue = new Queue<Reactive.TimeInterval<TSource>>();
|
|
|
- protected IDisposable? _cancelable;
|
|
|
+ protected SerialDisposableValue _cancelable;
|
|
|
protected TimeSpan _delay;
|
|
|
|
|
|
private bool _hasCompleted;
|
|
|
@@ -284,14 +284,14 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
if (disposing)
|
|
|
{
|
|
|
- Disposable.Dispose(ref _cancelable);
|
|
|
+ _cancelable.Dispose();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
protected void ScheduleDrain()
|
|
|
{
|
|
|
var cd = new CancellationDisposable();
|
|
|
- Disposable.TrySetSerial(ref _cancelable, cd);
|
|
|
+ _cancelable.Disposable = cd;
|
|
|
|
|
|
_scheduler.AsLongRunning()!.ScheduleLongRunning(cd.Token, DrainQueue); // NB: This class is only used with long-running schedulers.
|
|
|
}
|
|
|
@@ -461,7 +461,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
_ready = false;
|
|
|
|
|
|
- Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
|
|
|
+ _cancelable.TrySetFirst(parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
|
|
|
}
|
|
|
|
|
|
private void Start()
|
|
|
@@ -512,7 +512,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
// ScheduleDrain might have already set a newer disposable
|
|
|
// using TrySetSerial would cancel it, stopping the emission
|
|
|
// and hang the consumer
|
|
|
- Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
|
|
|
+ _cancelable.TrySetFirst(parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
|
|
|
}
|
|
|
|
|
|
private void Start()
|
|
|
@@ -782,7 +782,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
private readonly IObservable<TSource> _source;
|
|
|
- private IDisposable? _subscription;
|
|
|
+ private SerialDisposableValue _subscription;
|
|
|
|
|
|
public SubscriptionDelayObserver(_ parent, IObservable<TSource> source)
|
|
|
{
|
|
|
@@ -792,12 +792,12 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
internal void SetFirst(IDisposable d)
|
|
|
{
|
|
|
- Disposable.TrySetSingle(ref _subscription, d);
|
|
|
+ _subscription.TrySetFirst(d);
|
|
|
}
|
|
|
|
|
|
public void OnNext(TDelay value)
|
|
|
{
|
|
|
- Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
|
|
|
+ _subscription.Disposable = _source.SubscribeSafe(_parent);
|
|
|
}
|
|
|
|
|
|
public void OnError(Exception error)
|
|
|
@@ -807,12 +807,12 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
public void OnCompleted()
|
|
|
{
|
|
|
- Disposable.TrySetSerial(ref _subscription, _source.SubscribeSafe(_parent));
|
|
|
+ _subscription.Disposable = _source.SubscribeSafe(_parent);
|
|
|
}
|
|
|
|
|
|
public void Dispose()
|
|
|
{
|
|
|
- Disposable.Dispose(ref _subscription);
|
|
|
+ _subscription.Dispose();
|
|
|
}
|
|
|
}
|
|
|
}
|