// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    // NOTE(review): the generic type parameter names and their order (TSource, TIntermediate, TResult)
    // follow the usual Rx Multicast convention - confirm against the declarations of
    // IAsyncSubject<,> and ConnectableAsyncObservable<,> elsewhere in the project.
    partial class AsyncObservable
    {
        /// <summary>
        /// Wraps <paramref name="source"/> and <paramref name="subject"/> in a new
        /// <c>ConnectableAsyncObservable</c> and returns it.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Element type exposed by the subject and by the returned connectable sequence.</typeparam>
        /// <param name="source">Source sequence handed to the connectable wrapper.</param>
        /// <param name="subject">Subject handed to the connectable wrapper.</param>
        /// <returns>The newly created connectable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="subject"/> is null.</exception>
        public static IConnectableAsyncObservable<TResult> Multicast<TSource, TResult>(this IAsyncObservable<TSource> source, IAsyncSubject<TSource, TResult> subject)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (subject == null)
            {
                throw new ArgumentNullException(nameof(subject));
            }

            return new ConnectableAsyncObservable<TSource, TResult>(source, subject);
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a subject obtained from a synchronous factory,
        /// exposing the connectable sequence itself as the result (identity selector).
        /// </summary>
        /// <typeparam name="TSource">Element type of the source and result sequences.</typeparam>
        /// <param name="source">Source sequence to multicast.</param>
        /// <param name="subjectFactory">Synchronous factory producing the subject.</param>
        /// <returns>The sequence produced by the asynchronous factory/selector overload this method delegates to.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="subjectFactory"/> is null.</exception>
        public static IAsyncObservable<TSource> Multicast<TSource>(this IAsyncObservable<TSource> source, Func<IAsyncSubject<TSource>> subjectFactory)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (subjectFactory == null)
            {
                throw new ArgumentNullException(nameof(subjectFactory));
            }

            // Lift the synchronous factory into a completed task and use an identity selector.
            return Multicast<TSource, TSource, TSource>(
                source,
                () => Task.FromResult<IAsyncSubject<TSource, TSource>>(subjectFactory()),
                x => Task.FromResult(x));
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a subject obtained from an asynchronous factory,
        /// exposing the connectable sequence itself as the result (identity selector).
        /// </summary>
        /// <typeparam name="TSource">Element type of the source and result sequences.</typeparam>
        /// <param name="source">Source sequence to multicast.</param>
        /// <param name="subjectFactory">Asynchronous factory producing the subject.</param>
        /// <returns>The sequence produced by the asynchronous factory/selector overload this method delegates to.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="subjectFactory"/> is null.</exception>
        public static IAsyncObservable<TSource> Multicast<TSource>(this IAsyncObservable<TSource> source, Func<Task<IAsyncSubject<TSource>>> subjectFactory)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (subjectFactory == null)
            {
                throw new ArgumentNullException(nameof(subjectFactory));
            }

            // The async lambda re-wraps the factory result so its task type matches the
            // two-parameter subject interface expected by the general overload.
            return Multicast<TSource, TSource, TSource>(
                source,
                async () => await subjectFactory().ConfigureAwait(false),
                x => Task.FromResult(x));
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a subject obtained from a synchronous factory and
        /// projects the connectable sequence through a synchronous <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TIntermediate">Element type produced by the subject.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence returned by the selector.</typeparam>
        /// <param name="source">Source sequence to multicast.</param>
        /// <param name="subjectFactory">Synchronous factory producing the subject.</param>
        /// <param name="selector">Synchronous function receiving the connectable sequence and returning the result sequence.</param>
        /// <returns>The sequence produced by the asynchronous factory/selector overload this method delegates to.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TResult> Multicast<TSource, TIntermediate, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncSubject<TSource, TIntermediate>> subjectFactory, Func<IAsyncObservable<TIntermediate>, IAsyncObservable<TResult>> selector)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (subjectFactory == null)
            {
                throw new ArgumentNullException(nameof(subjectFactory));
            }

            if (selector == null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            // Lift both synchronous delegates into completed tasks; the Task-returning lambdas
            // select the asynchronous overload rather than recursing into this one.
            return Multicast<TSource, TIntermediate, TResult>(
                source,
                () => Task.FromResult(subjectFactory()),
                x => Task.FromResult(selector(x)));
        }

        /// <summary>
        /// Builds a sequence whose subscribe logic awaits <paramref name="subjectFactory"/>, wraps the
        /// source and the resulting subject in a connectable sequence, awaits <paramref name="selector"/>
        /// on that connectable, subscribes the observer to the selector's result and finally connects.
        /// </summary>
        /// <remarks>
        /// An exception thrown while creating the subject or running the selector is forwarded to the
        /// observer through <c>OnErrorAsync</c> and <c>AsyncDisposable.Nop</c> is returned. An exception
        /// thrown while subscribing or connecting is rethrown after whatever has already been added to
        /// the composite disposable is disposed.
        /// </remarks>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TIntermediate">Element type produced by the subject.</typeparam>
        /// <typeparam name="TResult">Element type of the sequence returned by the selector.</typeparam>
        /// <param name="source">Source sequence to multicast.</param>
        /// <param name="subjectFactory">Asynchronous factory producing the subject.</param>
        /// <param name="selector">Asynchronous function receiving the connectable sequence and returning the result sequence.</param>
        /// <returns>A sequence created via <c>Create</c> with the subscribe logic described above.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncObservable<TResult> Multicast<TSource, TIntermediate, TResult>(this IAsyncObservable<TSource> source, Func<Task<IAsyncSubject<TSource, TIntermediate>>> subjectFactory, Func<IAsyncObservable<TIntermediate>, Task<IAsyncObservable<TResult>>> selector)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (subjectFactory == null)
            {
                throw new ArgumentNullException(nameof(subjectFactory));
            }

            if (selector == null)
            {
                throw new ArgumentNullException(nameof(selector));
            }

            return Create<TResult>(async observer =>
            {
                IAsyncObservable<TResult> observable;
                IConnectableAsyncObservable<TIntermediate> connectable;

                try
                {
                    var subject = await subjectFactory().ConfigureAwait(false);

                    connectable = new ConnectableAsyncObservable<TSource, TIntermediate>(source, subject);

                    observable = await selector(connectable).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // Failures in user-supplied delegates are reported to the observer instead of
                    // being thrown out of the subscribe call.
                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    return AsyncDisposable.Nop;
                }

                var d = new CompositeAsyncDisposable();

                try
                {
                    // Subscribe before connecting, so the observer is attached to the selector's
                    // result by the time the connectable is connected.
                    var subscription = await observable.SubscribeAsync(observer).ConfigureAwait(false);
                    await d.AddAsync(subscription).ConfigureAwait(false);

                    var connection = await connectable.ConnectAsync().ConfigureAwait(false);
                    await d.AddAsync(connection).ConfigureAwait(false);
                }
                catch
                {
                    // The caller never receives the composite when this throws, so release whatever
                    // was already acquired (e.g. the subscription when connecting fails).
                    await d.DisposeAsync().ConfigureAwait(false);
                    throw;
                }

                return d;
            });
        }
    }
}