// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq { partial class AsyncObservable { public static IAsyncObservable Amb(this IAsyncObservable first, IAsyncObservable second) { if (first == null) throw new ArgumentNullException(nameof(first)); if (second == null) throw new ArgumentNullException(nameof(second)); return Create(async observer => { var firstSubscription = new SingleAssignmentAsyncDisposable(); var secondSubscription = new SingleAssignmentAsyncDisposable(); var (firstObserver, secondObserver) = AsyncObserver.Amb(observer, firstSubscription, secondSubscription); var firstTask = first.SubscribeAsync(firstObserver).ContinueWith(d => firstSubscription.AssignAsync(d.Result)); var secondTask = second.SubscribeAsync(secondObserver).ContinueWith(d => secondSubscription.AssignAsync(d.Result)); await Task.WhenAll(firstTask, secondTask).ConfigureAwait(false); return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription); }); } } partial class AsyncObserver { public static (IAsyncObserver, IAsyncObserver) Amb(IAsyncObserver observer, IAsyncDisposable first, IAsyncDisposable second) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (first == null) throw new ArgumentNullException(nameof(first)); if (second == null) throw new ArgumentNullException(nameof(second)); var gate = new AsyncLock(); var state = AmbState.None; return ( Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { if (state == AmbState.None) { state = AmbState.First; await second.DisposeAsync().ConfigureAwait(false); } if (state == AmbState.First) { await observer.OnNextAsync(x).ConfigureAwait(false); } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { if (state == AmbState.None) { state = AmbState.First; await second.DisposeAsync().ConfigureAwait(false); } if (state == AmbState.First) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { if (state == AmbState.None) { state = AmbState.First; await second.DisposeAsync().ConfigureAwait(false); } if (state == AmbState.First) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ), Create( async x => { using (await gate.LockAsync().ConfigureAwait(false)) { if (state == AmbState.None) { state = AmbState.Second; await first.DisposeAsync().ConfigureAwait(false); } if (state == AmbState.Second) { await observer.OnNextAsync(x).ConfigureAwait(false); } } }, async ex => { using (await gate.LockAsync().ConfigureAwait(false)) { if (state == AmbState.None) { state = AmbState.Second; await first.DisposeAsync().ConfigureAwait(false); } if (state == AmbState.Second) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } } }, async () => { using (await gate.LockAsync().ConfigureAwait(false)) { if (state == AmbState.None) { state = AmbState.Second; await first.DisposeAsync().ConfigureAwait(false); } if (state == AmbState.Second) { await observer.OnCompletedAsync().ConfigureAwait(false); } } } ) ); } private enum AmbState { None, First, Second, } } }