|
@@ -216,62 +216,73 @@ namespace System.Reactive.Linq
|
|
|
|
|
|
private sealed class CreateWithTaskActionObservable<TResult> : ObservableBase<TResult>
|
|
private sealed class CreateWithTaskActionObservable<TResult> : ObservableBase<TResult>
|
|
{
|
|
{
|
|
- private readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> _subscribeAsync;
|
|
|
|
-
|
|
|
|
- public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
|
|
|
|
|
|
+ private sealed class Subscription : IDisposable
|
|
{
|
|
{
|
|
- _subscribeAsync = subscribeAsync;
|
|
|
|
- }
|
|
|
|
|
|
+ private sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
|
|
|
|
+ {
|
|
|
|
+ private readonly IObserver<TResult> _observer;
|
|
|
|
+ private Action _disposable;
|
|
|
|
|
|
- protected override IDisposable SubscribeCore(IObserver<TResult> observer)
|
|
|
|
- {
|
|
|
|
- var cancellable = new CancellationDisposable();
|
|
|
|
|
|
+ public TaskDisposeCompletionObserver(IObserver<TResult> observer)
|
|
|
|
+ {
|
|
|
|
+ _observer = observer;
|
|
|
|
+ }
|
|
|
|
|
|
- var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
|
|
|
|
|
|
+ public void Dispose()
|
|
|
|
+ {
|
|
|
|
+ Interlocked.Exchange(ref _disposable, Stubs.Nop)?.Invoke();
|
|
|
|
+ }
|
|
|
|
|
|
- var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);
|
|
|
|
|
|
+ public void OnCompleted()
|
|
|
|
+ {
|
|
|
|
+ _observer.OnCompleted();
|
|
|
|
+ }
|
|
|
|
|
|
- //
|
|
|
|
- // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
|
|
|
|
- // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
|
|
|
|
- //
|
|
|
|
- taskObservable.Subscribe(taskCompletionObserver);
|
|
|
|
|
|
+ public void OnError(Exception error)
|
|
|
|
+ {
|
|
|
|
+ _observer.OnError(error);
|
|
|
|
+ }
|
|
|
|
|
|
- return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
|
|
|
|
- }
|
|
|
|
|
|
+ public void OnNext(Action value)
|
|
|
|
+ {
|
|
|
|
+ if (Interlocked.CompareExchange(ref _disposable, value, null) != null)
|
|
|
|
+ {
|
|
|
|
+ value?.Invoke();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- private sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
|
|
|
|
- {
|
|
|
|
- private readonly IObserver<TResult> _observer;
|
|
|
|
- private Action _disposable;
|
|
|
|
|
|
+ private readonly TaskDisposeCompletionObserver _observer;
|
|
|
|
+ private readonly CancellationTokenSource _cts = new CancellationTokenSource();
|
|
|
|
|
|
- public TaskDisposeCompletionObserver(IObserver<TResult> observer)
|
|
|
|
|
|
+ public Subscription(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync, IObserver<TResult> observer)
|
|
{
|
|
{
|
|
- _observer = observer;
|
|
|
|
|
|
+ //
|
|
|
|
+ // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
|
|
|
|
+ // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
|
|
|
|
+ //
|
|
|
|
+ subscribeAsync(observer, _cts.Token)
|
|
|
|
+ .ToObservable()
|
|
|
|
+ .Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
|
|
}
|
|
}
|
|
|
|
|
|
public void Dispose()
|
|
public void Dispose()
|
|
{
|
|
{
|
|
- Interlocked.Exchange(ref _disposable, Stubs.Nop)?.Invoke();
|
|
|
|
|
|
+ _cts.Cancel();
|
|
|
|
+ _observer.Dispose();
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
|
- {
|
|
|
|
- _observer.OnCompleted();
|
|
|
|
- }
|
|
|
|
|
|
+ private readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> _subscribeAsync;
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
|
- {
|
|
|
|
- _observer.OnError(error);
|
|
|
|
- }
|
|
|
|
|
|
+ public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
|
|
|
|
+ {
|
|
|
|
+ _subscribeAsync = subscribeAsync;
|
|
|
|
+ }
|
|
|
|
|
|
- public void OnNext(Action value)
|
|
|
|
- {
|
|
|
|
- if (Interlocked.CompareExchange(ref _disposable, value, null) != null)
|
|
|
|
- {
|
|
|
|
- value?.Invoke();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ protected override IDisposable SubscribeCore(IObserver<TResult> observer)
|
|
|
|
+ {
|
|
|
|
+ return new Subscription(_subscribeAsync, observer);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|