|
@@ -286,43 +286,41 @@ namespace System.Reactive.Threading.Tasks
|
|
|
return observable.ToTask(cancellationToken, state: null);
|
|
|
}
|
|
|
|
|
|
- private sealed class ToTaskObserver<TResult> : IObserver<TResult>
|
|
|
+ private sealed class ToTaskObserver<TResult> : SafeObserver<TResult>
|
|
|
{
|
|
|
private readonly CancellationToken _ct;
|
|
|
- private readonly IDisposable _disposable;
|
|
|
private readonly TaskCompletionSource<TResult> _tcs;
|
|
|
private readonly CancellationTokenRegistration _ctr = default(CancellationTokenRegistration);
|
|
|
|
|
|
private bool _hasValue;
|
|
|
private TResult _lastValue;
|
|
|
|
|
|
- public ToTaskObserver(TaskCompletionSource<TResult> tcs, IDisposable disposable, CancellationToken ct)
|
|
|
+ public ToTaskObserver(TaskCompletionSource<TResult> tcs, CancellationToken ct)
|
|
|
{
|
|
|
_ct = ct;
|
|
|
_tcs = tcs;
|
|
|
- _disposable = disposable;
|
|
|
|
|
|
if (ct.CanBeCanceled)
|
|
|
{
|
|
|
- _ctr = ct.Register(Cancel);
|
|
|
+ _ctr = ct.Register(@this => ((ToTaskObserver<TResult>)@this).Cancel(), this);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnNext(TResult value)
|
|
|
+ public override void OnNext(TResult value)
|
|
|
{
|
|
|
_hasValue = true;
|
|
|
_lastValue = value;
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
_tcs.TrySetException(error);
|
|
|
|
|
|
_ctr.Dispose(); // no null-check needed (struct)
|
|
|
- _disposable.Dispose();
|
|
|
+ Dispose();
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
if (_hasValue)
|
|
|
{
|
|
@@ -334,12 +332,12 @@ namespace System.Reactive.Threading.Tasks
|
|
|
}
|
|
|
|
|
|
_ctr.Dispose(); // no null-check needed (struct)
|
|
|
- _disposable.Dispose();
|
|
|
+ Dispose();
|
|
|
}
|
|
|
|
|
|
private void Cancel()
|
|
|
{
|
|
|
- _disposable.Dispose();
|
|
|
+ Dispose();
|
|
|
#if HAS_TPL46
|
|
|
_tcs.TrySetCanceled(_ct);
|
|
|
#else
|
|
@@ -364,9 +362,7 @@ namespace System.Reactive.Threading.Tasks
|
|
|
|
|
|
var tcs = new TaskCompletionSource<TResult>(state);
|
|
|
|
|
|
- var disposable = new SingleAssignmentDisposable();
|
|
|
-
|
|
|
- var taskCompletionObserver = new ToTaskObserver<TResult>(tcs, disposable, cancellationToken);
|
|
|
+ var taskCompletionObserver = new ToTaskObserver<TResult>(tcs, cancellationToken);
|
|
|
|
|
|
//
|
|
|
// Subtle race condition: if the source completes before we reach the line below, the SingleAssigmentDisposable
|
|
@@ -382,7 +378,7 @@ namespace System.Reactive.Threading.Tasks
|
|
|
// exception handling logic here for the reason explained above. We cannot afford to throw here
|
|
|
// and as a result never set the TaskCompletionSource, so we tunnel everything through here.
|
|
|
//
|
|
|
- disposable.Disposable = observable.Subscribe/*Unsafe*/(taskCompletionObserver);
|
|
|
+ taskCompletionObserver.SetResource(observable.Subscribe/*Unsafe*/(taskCompletionObserver));
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|