123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Threading.Tasks;
- namespace System.Reactive
- {
- public abstract class AsyncObservableBase<T> : IAsyncObservable<T>
- {
- public async ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer)
- {
- if (observer == null)
- throw new ArgumentNullException(nameof(observer));
- var autoDetach = new AutoDetachAsyncObserver(observer);
- var subscription = await SubscribeAsyncCore(autoDetach).ConfigureAwait(false);
- await autoDetach.AssignAsync(subscription).ConfigureAwait(false);
- return autoDetach;
- }
- protected abstract ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<T> observer);
- private sealed class AutoDetachAsyncObserver : AsyncObserverBase<T>, IAsyncDisposable
- {
- private readonly IAsyncObserver<T> _observer;
- private readonly object _gate = new object();
- private IAsyncDisposable _subscription;
- private ValueTask _task;
- private bool _disposing;
- public AutoDetachAsyncObserver(IAsyncObserver<T> observer)
- {
- _observer = observer;
- }
- public async ValueTask AssignAsync(IAsyncDisposable subscription)
- {
- var shouldDispose = false;
- lock (_gate)
- {
- if (_disposing)
- {
- shouldDispose = true;
- }
- else
- {
- _subscription = subscription;
- }
- }
- if (shouldDispose)
- {
- await subscription.DisposeAsync().ConfigureAwait(false);
- }
- }
- public async ValueTask DisposeAsync()
- {
- var task = default(ValueTask);
- var subscription = default(IAsyncDisposable);
- lock (_gate)
- {
- //
- // NB: The postcondition of awaiting the first DisposeAsync call to complete is that all message
- // processing has ceased, i.e. no further On*AsyncCore calls will be made. This is achieved
- // here by setting _disposing to true, which is checked by the On*AsyncCore calls upon
- // entry, and by awaiting the task of any in-flight On*AsyncCore calls.
- //
- // Timing of the disposal of the subscription is less deterministic due to the intersection
- // with the AssignAsync code path. However, the auto-detach observer can only be returned
- // from the SubscribeAsync call *after* a call to AssignAsync has been made and awaited, so
- // either AssignAsync triggers the disposal and an already disposed instance is returned, or
- // the user calling DisposeAsync will either encounter a busy observer which will be stopped
- // in its tracks (as described above) or it will trigger a disposal of the subscription. In
- // both these cases the result of awaiting DisposeAsync guarantees no further message flow.
- //
- if (!_disposing)
- {
- _disposing = true;
- task = _task;
- subscription = _subscription;
- }
- }
- try
- {
- //
- // BUGBUG: This causes grief when an outgoing On*Async call reenters the DisposeAsync method and
- // results in the task returned from the On*Async call to be awaited to serialize the
- // call to subscription.DisposeAsync after it's done. We need to either detect reentrancy
- // and queue up the call to DisposeAsync or follow an when we trigger the disposal without
- // awaiting outstanding work (thus allowing for concurrency).
- //
- // if (task != null)
- // {
- // await task.ConfigureAwait(false);
- // }
- //
- }
- finally
- {
- if (subscription != null)
- {
- await subscription.DisposeAsync().ConfigureAwait(false);
- }
- }
- }
- protected override async ValueTask OnCompletedAsyncCore()
- {
- lock (_gate)
- {
- if (_disposing)
- {
- return;
- }
- _task = _observer.OnCompletedAsync();
- }
- try
- {
- await _task.ConfigureAwait(false);
- }
- finally
- {
- await FinishAsync().ConfigureAwait(false);
- }
- }
- protected override async ValueTask OnErrorAsyncCore(Exception error)
- {
- lock (_gate)
- {
- if (_disposing)
- {
- return;
- }
- _task = _observer.OnErrorAsync(error);
- }
- try
- {
- await _task.ConfigureAwait(false);
- }
- finally
- {
- await FinishAsync().ConfigureAwait(false);
- }
- }
- protected override async ValueTask OnNextAsyncCore(T value)
- {
- lock (_gate)
- {
- if (_disposing)
- {
- return;
- }
- _task = _observer.OnNextAsync(value);
- }
- try
- {
- await _task.ConfigureAwait(false);
- }
- finally
- {
- lock (_gate)
- {
- _task = default;
- }
- }
- }
- private async ValueTask FinishAsync()
- {
- var subscription = default(IAsyncDisposable);
- lock (_gate)
- {
- if (!_disposing)
- {
- _disposing = true;
- subscription = _subscription;
- }
- _task = default;
- }
- if (subscription != null)
- {
- await subscription.DisposeAsync().ConfigureAwait(false);
- }
- }
- }
- }
- }
|