|
@@ -38,10 +38,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
private readonly object _gate;
|
|
private readonly object _gate;
|
|
|
|
|
|
private readonly FirstObserver _firstObserver;
|
|
private readonly FirstObserver _firstObserver;
|
|
- private IDisposable? _firstDisposable;
|
|
|
|
|
|
+ private SingleAssignmentDisposableValue _firstDisposable;
|
|
|
|
|
|
private readonly SecondObserver _secondObserver;
|
|
private readonly SecondObserver _secondObserver;
|
|
- private IDisposable? _secondDisposable;
|
|
|
|
|
|
+ private SingleAssignmentDisposableValue _secondDisposable;
|
|
|
|
|
|
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
|
|
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
|
|
: base(observer)
|
|
: base(observer)
|
|
@@ -59,16 +59,16 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
|
|
public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
|
|
{
|
|
{
|
|
- Disposable.SetSingle(ref _firstDisposable, first.SubscribeSafe(_firstObserver));
|
|
|
|
- Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(_secondObserver));
|
|
|
|
|
|
+ _firstDisposable.Disposable = first.SubscribeSafe(_firstObserver);
|
|
|
|
+ _secondDisposable.Disposable = second.SubscribeSafe(_secondObserver);
|
|
}
|
|
}
|
|
|
|
|
|
protected override void Dispose(bool disposing)
|
|
protected override void Dispose(bool disposing)
|
|
{
|
|
{
|
|
if (disposing)
|
|
if (disposing)
|
|
{
|
|
{
|
|
- Disposable.Dispose(ref _firstDisposable);
|
|
|
|
- Disposable.Dispose(ref _secondDisposable);
|
|
|
|
|
|
+ _firstDisposable.Dispose();
|
|
|
|
+ _secondDisposable.Dispose();
|
|
|
|
|
|
// clearing the queue should happen under the lock
|
|
// clearing the queue should happen under the lock
|
|
// as they are plain Queue<T>s, not concurrent queues.
|
|
// as they are plain Queue<T>s, not concurrent queues.
|
|
@@ -154,7 +154,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
- Disposable.Dispose(ref _parent._firstDisposable);
|
|
|
|
|
|
+ _parent._firstDisposable.Dispose();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -237,7 +237,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
- Disposable.Dispose(ref _parent._secondDisposable);
|
|
|
|
|
|
+ _parent._secondDisposable.Dispose();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -605,7 +605,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
private Queue<TSource>[] _queues;
|
|
private Queue<TSource>[] _queues;
|
|
private bool[] _isDone;
|
|
private bool[] _isDone;
|
|
- private IDisposable[]? _subscriptions;
|
|
|
|
|
|
+ private SingleAssignmentDisposableValue[]? _subscriptions;
|
|
|
|
|
|
public void Run(IEnumerable<IObservable<TSource>> sources)
|
|
public void Run(IEnumerable<IObservable<TSource>> sources)
|
|
{
|
|
{
|
|
@@ -621,14 +621,14 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
_isDone = new bool[N];
|
|
_isDone = new bool[N];
|
|
|
|
|
|
- var subscriptions = new IDisposable[N];
|
|
|
|
|
|
+ var subscriptions = new SingleAssignmentDisposableValue[N];
|
|
|
|
|
|
if (Interlocked.CompareExchange(ref _subscriptions, subscriptions, null) == null)
|
|
if (Interlocked.CompareExchange(ref _subscriptions, subscriptions, null) == null)
|
|
{
|
|
{
|
|
for (var i = 0; i < N; i++)
|
|
for (var i = 0; i < N; i++)
|
|
{
|
|
{
|
|
var o = new SourceObserver(this, i);
|
|
var o = new SourceObserver(this, i);
|
|
- Disposable.SetSingle(ref subscriptions[i], srcs[i].SubscribeSafe(o));
|
|
|
|
|
|
+ subscriptions[i].Disposable = srcs[i].SubscribeSafe(o);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -637,12 +637,12 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
{
|
|
{
|
|
if (disposing)
|
|
if (disposing)
|
|
{
|
|
{
|
|
- var subscriptions = Interlocked.Exchange(ref _subscriptions, Array.Empty<IDisposable>());
|
|
|
|
- if (subscriptions != null && subscriptions != Array.Empty<IDisposable>())
|
|
|
|
|
|
+ var subscriptions = Interlocked.Exchange(ref _subscriptions, Array.Empty<SingleAssignmentDisposableValue>());
|
|
|
|
+ if (subscriptions != null && subscriptions != Array.Empty<SingleAssignmentDisposableValue>())
|
|
{
|
|
{
|
|
for (var i = 0; i < subscriptions.Length; i++)
|
|
for (var i = 0; i < subscriptions.Length; i++)
|
|
{
|
|
{
|
|
- Disposable.Dispose(ref subscriptions[i]);
|
|
|
|
|
|
+ subscriptions[i].Dispose();
|
|
}
|
|
}
|
|
|
|
|
|
lock (_gate)
|
|
lock (_gate)
|
|
@@ -703,9 +703,9 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
else
|
|
else
|
|
{
|
|
{
|
|
var subscriptions = Volatile.Read(ref _subscriptions);
|
|
var subscriptions = Volatile.Read(ref _subscriptions);
|
|
- if (subscriptions != null && subscriptions != Array.Empty<IDisposable>())
|
|
|
|
|
|
+ if (subscriptions != null && subscriptions != Array.Empty<SingleAssignmentDisposableValue>())
|
|
{
|
|
{
|
|
- Disposable.Dispose(ref subscriptions[index]);
|
|
|
|
|
|
+ subscriptions[index].Dispose();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|