// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

namespace System.Linq
{
    public static partial class AsyncEnumerableEx
    {
        /// <summary>
        /// Propagates whichever of the two async-enumerable sequences is first to complete its
        /// initial <c>MoveNextAsync</c> call; the other sequence is abandoned.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
        /// <param name="first">First async-enumerable sequence.</param>
        /// <param name="second">Second async-enumerable sequence.</param>
        /// <returns>An async-enumerable sequence that surfaces the elements of the winning sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
        public static IAsyncEnumerable<TSource> Amb<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
        {
            if (first == null)
            {
                throw new ArgumentNullException(nameof(first));
            }

            if (second == null)
            {
                throw new ArgumentNullException(nameof(second));
            }

            return new AmbAsyncIterator<TSource>(first, second);
        }

        /// <summary>
        /// Propagates whichever of the given async-enumerable sequences is first to complete its
        /// initial <c>MoveNextAsync</c> call; all other sequences are abandoned.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
        /// <param name="sources">The async-enumerable sequences competing to be propagated.</param>
        /// <returns>An async-enumerable sequence that surfaces the elements of the winning sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
        public static IAsyncEnumerable<TSource> Amb<TSource>(params IAsyncEnumerable<TSource>[] sources)
        {
            if (sources == null)
            {
                throw new ArgumentNullException(nameof(sources));
            }

            return new AmbAsyncIteratorN<TSource>(sources);
        }

        /// <summary>
        /// Propagates whichever of the given async-enumerable sequences is first to complete its
        /// initial <c>MoveNextAsync</c> call; all other sequences are abandoned.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
        /// <param name="sources">The async-enumerable sequences competing to be propagated. The collection is copied to an array eagerly.</param>
        /// <returns>An async-enumerable sequence that surfaces the elements of the winning sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
        public static IAsyncEnumerable<TSource> Amb<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
        {
            if (sources == null)
            {
                throw new ArgumentNullException(nameof(sources));
            }

            return new AmbAsyncIteratorN<TSource>(sources.ToArray());
        }

        /// <summary>
        /// Iterator for the two-source <c>Amb</c> overload. On the first move it starts both sources,
        /// elects the one whose first <c>MoveNextAsync</c> completes first, and from then on forwards
        /// only that source.
        /// </summary>
        private sealed class AmbAsyncIterator<TSource> : AsyncIterator<TSource>
        {
            private readonly IAsyncEnumerable<TSource> first;
            private readonly IAsyncEnumerable<TSource> second;

            // Enumerator of the elected winner; null until a winner is chosen and after disposal.
            private IAsyncEnumerator<TSource> enumerator;

            public AmbAsyncIterator(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
            {
                Debug.Assert(first != null);
                Debug.Assert(second != null);

                this.first = first;
                this.second = second;
            }

            public override AsyncIterator<TSource> Clone()
            {
                return new AmbAsyncIterator<TSource>(first, second);
            }

            public override async Task DisposeAsync()
            {
                if (enumerator != null)
                {
                    await enumerator.DisposeAsync().ConfigureAwait(false);
                    enumerator = null;
                }

                await base.DisposeAsync().ConfigureAwait(false);
            }

            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        var firstEnumerator = first.GetAsyncEnumerator();
                        var secondEnumerator = second.GetAsyncEnumerator();

                        // Start both sources before awaiting either, so they race each other.
                        var firstMoveNext = firstEnumerator.MoveNextAsync();
                        var secondMoveNext = secondEnumerator.MoveNextAsync();

                        var winner = await Task.WhenAny(firstMoveNext, secondMoveNext).ConfigureAwait(false);

                        //
                        // REVIEW: An alternative option is to call DisposeAsync on the other and await it, but this has two drawbacks:
                        //
                        // 1. Concurrent DisposeAsync while a MoveNextAsync is in flight.
                        // 2. The winner elected by Amb is blocked to yield results until the loser unblocks.
                        //
                        // The approach below has one drawback, namely that exceptions raised by the loser are dropped on the floor.
                        //

                        if (winner == firstMoveNext)
                        {
                            enumerator = firstEnumerator;

                            // Dispose the loser only once its in-flight MoveNextAsync has finished.
                            var ignored = secondMoveNext.ContinueWith(_ =>
                            {
                                secondEnumerator.DisposeAsync();
                            });
                        }
                        else
                        {
                            enumerator = secondEnumerator;

                            // Dispose the loser only once its in-flight MoveNextAsync has finished.
                            var ignored = firstMoveNext.ContinueWith(_ =>
                            {
                                firstEnumerator.DisposeAsync();
                            });
                        }

                        state = AsyncIteratorState.Iterating;

                        // The winning task is already complete; awaiting it yields its result (or rethrows its failure).
                        if (await winner.ConfigureAwait(false))
                        {
                            current = enumerator.Current;
                            return true;
                        }

                        break;

                    case AsyncIteratorState.Iterating:
                        if (await enumerator.MoveNextAsync().ConfigureAwait(false))
                        {
                            current = enumerator.Current;
                            return true;
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Iterator for the N-source <c>Amb</c> overloads. On the first move it starts every source,
        /// elects the one whose first <c>MoveNextAsync</c> completes first, and from then on forwards
        /// only that source.
        /// </summary>
        /// <remarks>
        /// NOTE(review): an empty <c>sources</c> array is passed straight to <c>Task.WhenAny</c>, which
        /// rejects an empty task list. Confirm whether empty input should be handled explicitly.
        /// </remarks>
        private sealed class AmbAsyncIteratorN<TSource> : AsyncIterator<TSource>
        {
            private readonly IAsyncEnumerable<TSource>[] sources;

            // Enumerator of the elected winner; null until a winner is chosen and after disposal.
            private IAsyncEnumerator<TSource> enumerator;

            public AmbAsyncIteratorN(IAsyncEnumerable<TSource>[] sources)
            {
                Debug.Assert(sources != null);

                this.sources = sources;
            }

            public override AsyncIterator<TSource> Clone()
            {
                return new AmbAsyncIteratorN<TSource>(sources);
            }

            public override async Task DisposeAsync()
            {
                if (enumerator != null)
                {
                    await enumerator.DisposeAsync().ConfigureAwait(false);
                    enumerator = null;
                }

                await base.DisposeAsync().ConfigureAwait(false);
            }

            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        var n = sources.Length;

                        var enumerators = new IAsyncEnumerator<TSource>[n];
                        var moveNexts = new Task<bool>[n];

                        // Start every source before awaiting any of them, so they all race each other.
                        for (var i = 0; i < n; i++)
                        {
                            // Named differently from the 'enumerator' field to avoid shadowing it.
                            var sourceEnumerator = sources[i].GetAsyncEnumerator();

                            enumerators[i] = sourceEnumerator;
                            moveNexts[i] = sourceEnumerator.MoveNextAsync();
                        }

                        var winner = await Task.WhenAny(moveNexts).ConfigureAwait(false);

                        //
                        // REVIEW: An alternative option is to call DisposeAsync on the others and await it, but this has two drawbacks:
                        //
                        // 1. Concurrent DisposeAsync while a MoveNextAsync is in flight.
                        // 2. The winner elected by Amb is blocked to yield results until all losers unblock.
                        //
                        // The approach below has one drawback, namely that exceptions raised by any loser are dropped on the floor.
                        //

                        var winnerIndex = Array.IndexOf(moveNexts, winner);

                        enumerator = enumerators[winnerIndex];

                        for (var i = 0; i < n; i++)
                        {
                            if (i != winnerIndex)
                            {
                                // Capture the loser in a per-iteration local. The 'for' variable 'i' is a single
                                // variable shared by all iterations, so a lambda reading 'enumerators[i]' would
                                // observe its final value (n) when the continuation runs and index out of range,
                                // leaving the loser undisposed.
                                var loser = enumerators[i];

                                // Dispose the loser only once its in-flight MoveNextAsync has finished.
                                var ignored = moveNexts[i].ContinueWith(_ =>
                                {
                                    loser.DisposeAsync();
                                });
                            }
                        }

                        state = AsyncIteratorState.Iterating;

                        // The winning task is already complete; awaiting it yields its result (or rethrows its failure).
                        if (await winner.ConfigureAwait(false))
                        {
                            current = enumerator.Current;
                            return true;
                        }

                        break;

                    case AsyncIteratorState.Iterating:
                        if (await enumerator.MoveNextAsync().ConfigureAwait(false))
                        {
                            current = enumerator.Current;
                            return true;
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }
    }
}