// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Pairs a source sequence with a subject. Observers subscribe to the subject
    /// (see <see cref="SubscribeAsync"/>); the subject itself is only subscribed to
    /// the source while a connection obtained from <see cref="ConnectAsync"/> is live.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the type names used in this file (IAsyncSubject, IAsyncObservable,
    /// IAsyncObserver, IConnectableAsyncObservable) carry no generic type arguments.
    /// They look like they were lost when this file was extracted - confirm against
    /// the interface declarations before building.
    /// </remarks>
    internal sealed class ConnectableAsyncObservable : IConnectableAsyncObservable
    {
        private readonly IAsyncSubject subject;
        private readonly IAsyncObservable source;

        // Held around connect and disconnect so the check-then-update of `connection`
        // is not interleaved (assumes AsyncLock provides mutual exclusion - TODO confirm).
        private readonly AsyncLock gate = new AsyncLock();

        // The live connection, or null when the subject is not subscribed to the source.
        private Connection connection;

        /// <summary>
        /// Creates a connectable wrapper around <paramref name="source"/> that
        /// forwards through <paramref name="subject"/>.
        /// </summary>
        /// <param name="source">Sequence the subject is subscribed to on connect.</param>
        /// <param name="subject">Subject that observers subscribe to.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="subject"/> is null.
        /// </exception>
        public ConnectableAsyncObservable(IAsyncObservable source, IAsyncSubject subject)
        {
            // Fail at construction rather than on the first ConnectAsync/SubscribeAsync,
            // matching the argument check SubscribeAsync already performs.
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (subject == null)
            {
                throw new ArgumentNullException(nameof(subject));
            }

            this.subject = subject;
            this.source = source.AsAsyncObservable();
        }

        /// <summary>
        /// Subscribes the subject to the source if no connection is currently live,
        /// and returns the current connection.
        /// </summary>
        /// <returns>
        /// The live connection. While it has not been disposed, repeated calls
        /// return the same object and do not subscribe to the source again.
        /// Disposing it unsubscribes the subject from the source.
        /// </returns>
        public async Task<IAsyncDisposable> ConnectAsync()
        {
            using (await gate.LockAsync().ConfigureAwait(false))
            {
                if (connection == null)
                {
                    var subscription = await source.SubscribeAsync(subject).ConfigureAwait(false);
                    connection = new Connection(this, subscription);
                }

                return connection;
            }
        }

        /// <summary>
        /// Handle for one subscription of the parent's subject to the parent's source.
        /// </summary>
        private sealed class Connection : IAsyncDisposable
        {
            private readonly ConnectableAsyncObservable parent;

            // Set to null once disposed, which makes later DisposeAsync calls no-ops.
            private IAsyncDisposable subscription;

            public Connection(ConnectableAsyncObservable parent, IAsyncDisposable subscription)
            {
                this.parent = parent;
                this.subscription = subscription;
            }

            /// <summary>
            /// Disposes the underlying subscription (once) and clears the parent's
            /// connection so that a later <see cref="ConnectAsync"/> subscribes anew.
            /// </summary>
            public async Task DisposeAsync()
            {
                // Same gate as ConnectAsync: connect and disconnect do not overlap.
                using (await parent.gate.LockAsync().ConfigureAwait(false))
                {
                    if (subscription != null)
                    {
                        // State is cleared only after the disposal completes; if it
                        // throws, the connection stays registered and can be retried.
                        await subscription.DisposeAsync().ConfigureAwait(false);
                        subscription = null;
                        parent.connection = null;
                    }
                }
            }
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the subject. This does not
        /// connect the subject to the source; see <see cref="ConnectAsync"/>.
        /// </summary>
        /// <param name="observer">Observer to subscribe.</param>
        /// <returns>The task returned by the subject's own SubscribeAsync.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public Task SubscribeAsync(IAsyncObserver observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return subject.SubscribeAsync(observer);
        }
    }
}