|
@@ -17,15 +17,14 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
private Exception _error;
|
|
private Exception _error;
|
|
|
private bool _done;
|
|
private bool _done;
|
|
|
private bool _disposed;
|
|
private bool _disposed;
|
|
|
|
|
+ private IDisposable _subscription;
|
|
|
|
|
|
|
|
private readonly SemaphoreSlim _gate;
|
|
private readonly SemaphoreSlim _gate;
|
|
|
- private readonly SingleAssignmentDisposable _subscription;
|
|
|
|
|
|
|
|
|
|
public GetEnumerator()
|
|
public GetEnumerator()
|
|
|
{
|
|
{
|
|
|
_queue = new ConcurrentQueue<TSource>();
|
|
_queue = new ConcurrentQueue<TSource>();
|
|
|
_gate = new SemaphoreSlim(0);
|
|
_gate = new SemaphoreSlim(0);
|
|
|
- _subscription = new SingleAssignmentDisposable();
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public IEnumerator<TSource> Run(IObservable<TSource> source)
|
|
public IEnumerator<TSource> Run(IObservable<TSource> source)
|
|
@@ -33,7 +32,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
//
|
|
//
|
|
|
// [OK] Use of unsafe Subscribe: non-pretentious exact mirror with the dual GetEnumerator method.
|
|
// [OK] Use of unsafe Subscribe: non-pretentious exact mirror with the dual GetEnumerator method.
|
|
|
//
|
|
//
|
|
|
- _subscription.Disposable = source.Subscribe/*Unsafe*/(this);
|
|
|
|
|
|
|
+ Disposable.TrySetSingle(ref _subscription, source.Subscribe/*Unsafe*/(this));
|
|
|
return this;
|
|
return this;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -46,14 +45,14 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
public void OnError(Exception error)
|
|
public void OnError(Exception error)
|
|
|
{
|
|
{
|
|
|
_error = error;
|
|
_error = error;
|
|
|
- _subscription.Dispose();
|
|
|
|
|
|
|
+ Disposable.TryDispose(ref _subscription);
|
|
|
_gate.Release();
|
|
_gate.Release();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public void OnCompleted()
|
|
public void OnCompleted()
|
|
|
{
|
|
{
|
|
|
_done = true;
|
|
_done = true;
|
|
|
- _subscription.Dispose();
|
|
|
|
|
|
|
+ Disposable.TryDispose(ref _subscription);
|
|
|
_gate.Release();
|
|
_gate.Release();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -81,7 +80,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
|
|
public void Dispose()
|
|
public void Dispose()
|
|
|
{
|
|
{
|
|
|
- _subscription.Dispose();
|
|
|
|
|
|
|
+ Disposable.TryDispose(ref _subscription);
|
|
|
|
|
|
|
|
_disposed = true;
|
|
_disposed = true;
|
|
|
_gate.Release();
|
|
_gate.Release();
|