// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Connectable observable that, on connect, subscribes a subject to a source and
    /// forwards observer subscriptions to that subject.
    /// </summary>
    /// <remarks>
    /// Creating and tearing down the connection both happen while holding <c>_gate</c>,
    /// so the <c>_connection</c> field is only read or written under that lock.
    /// </remarks>
    internal sealed class ConnectableAsyncObservable : IConnectableAsyncObservable
    {
        private readonly IAsyncSubject _subject;
        private readonly IAsyncObservable _source;
        private readonly AsyncGate _gate = new();

        // Current connection, or null when the subject is not subscribed to the source.
        private Connection _connection;

        /// <summary>
        /// Creates the connectable observable over the given source and subject.
        /// </summary>
        /// <param name="source">Sequence the subject is subscribed to on connect; stored after passing through <c>AsAsyncObservable()</c>.</param>
        /// <param name="subject">Subject that is subscribed to the source and that observers subscribe to.</param>
        public ConnectableAsyncObservable(IAsyncObservable source, IAsyncSubject subject)
        {
            _subject = subject;
            _source = source.AsAsyncObservable();
        }

        /// <summary>
        /// Subscribes the subject to the source unless a connection already exists.
        /// </summary>
        /// <returns>
        /// The current connection object; disposing it disposes the source subscription
        /// and clears the connection so a later call can connect again.
        /// </returns>
        // NOTE(review): this method returns a value although its declared return type is the
        // non-generic ValueTask; generic type arguments may have been lost from this file.
        // Confirm the intended signature against the IConnectableAsyncObservable declaration.
        public async ValueTask ConnectAsync()
        {
            using (await _gate.LockAsync().ConfigureAwait(false))
            {
                if (_connection is not null)
                {
                    // Already connected: hand back the existing connection.
                    return _connection;
                }

                var upstream = await _source.SubscribeAsync(_subject).ConfigureAwait(false);
                _connection = new Connection(this, upstream);

                return _connection;
            }
        }

        /// <summary>
        /// Subscribes the given observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe.</param>
        /// <returns>Whatever the subject's <c>SubscribeAsync</c> returns for this observer.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public ValueTask SubscribeAsync(IAsyncObserver observer)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return _subject.SubscribeAsync(observer);
        }

        /// <summary>
        /// Handle for an active connection; disposing it tears down the source subscription.
        /// </summary>
        private sealed class Connection : IAsyncDisposable
        {
            private readonly ConnectableAsyncObservable _parent;

            // Set to null once disposed, which turns further DisposeAsync calls into no-ops.
            private IAsyncDisposable _subscription;

            public Connection(ConnectableAsyncObservable parent, IAsyncDisposable subscription)
            {
                _parent = parent;
                _subscription = subscription;
            }

            /// <summary>
            /// Disposes the source subscription under the parent's gate and clears the
            /// parent's connection. Does nothing if this connection was already disposed.
            /// </summary>
            public async ValueTask DisposeAsync()
            {
                using (await _parent._gate.LockAsync().ConfigureAwait(false))
                {
                    if (_subscription is null)
                    {
                        return;
                    }

                    // State is cleared only after the subscription's disposal has completed.
                    await _subscription.DisposeAsync().ConfigureAwait(false);
                    _subscription = null;
                    _parent._connection = null;
                }
            }
        }
    }
}