|
@@ -161,7 +161,6 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
private readonly IObserver<X> _downstream;
|
|
private readonly IObserver<X> _downstream;
|
|
|
private int _wip;
|
|
private int _wip;
|
|
|
private Exception _terminalException;
|
|
private Exception _terminalException;
|
|
|
- private static readonly Exception DoneIndicator = new Exception();
|
|
|
|
|
private static readonly Exception SignaledIndicator = new Exception();
|
|
private static readonly Exception SignaledIndicator = new Exception();
|
|
|
private readonly ConcurrentQueue<X> _queue;
|
|
private readonly ConcurrentQueue<X> _queue;
|
|
|
|
|
|
|
@@ -173,7 +172,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
|
|
public void OnCompleted()
|
|
public void OnCompleted()
|
|
|
{
|
|
{
|
|
|
- if (Interlocked.CompareExchange(ref _terminalException, DoneIndicator, null) == null)
|
|
|
|
|
|
|
+ if (Interlocked.CompareExchange(ref _terminalException, ExceptionHelper.Terminated, null) == null)
|
|
|
{
|
|
{
|
|
|
Drain();
|
|
Drain();
|
|
|
}
|
|
}
|
|
@@ -218,7 +217,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
if (ex != SignaledIndicator)
|
|
if (ex != SignaledIndicator)
|
|
|
{
|
|
{
|
|
|
Interlocked.Exchange(ref _terminalException, SignaledIndicator);
|
|
Interlocked.Exchange(ref _terminalException, SignaledIndicator);
|
|
|
- if (ex != DoneIndicator)
|
|
|
|
|
|
|
+ if (ex != ExceptionHelper.Terminated)
|
|
|
{
|
|
{
|
|
|
_downstream.OnError(ex);
|
|
_downstream.OnError(ex);
|
|
|
}
|
|
}
|