// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace System.Linq
{
    public static partial class AsyncEnumerableEx
    {
        /// <summary>
        /// Races two async-enumerable sequences and propagates the one whose first
        /// <c>MoveNextAsync</c> call completes first. The other sequence's enumerator
        /// is disposed once its own pending <c>MoveNextAsync</c> has completed.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
        /// <param name="first">First candidate sequence.</param>
        /// <param name="second">Second candidate sequence.</param>
        /// <returns>A sequence that yields the elements of the winning source.</returns>
        /// <remarks>
        /// A null argument is rejected with the exception produced by <c>Error.ArgumentNull</c>.
        /// </remarks>
        public static IAsyncEnumerable<TSource> Amb<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
        {
            if (first == null)
            {
                throw Error.ArgumentNull(nameof(first));
            }

            if (second == null)
            {
                throw Error.ArgumentNull(nameof(second));
            }

            return new AmbAsyncIterator<TSource>(first, second);
        }

        /// <summary>
        /// Races the given async-enumerable sequences and propagates the one whose first
        /// <c>MoveNextAsync</c> call completes first.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
        /// <param name="sources">Candidate sequences competing to be propagated.</param>
        /// <returns>A sequence that yields the elements of the winning source.</returns>
        /// <remarks>
        /// A null <paramref name="sources"/> is rejected with the exception produced by <c>Error.ArgumentNull</c>.
        /// </remarks>
        public static IAsyncEnumerable<TSource> Amb<TSource>(params IAsyncEnumerable<TSource>[] sources)
        {
            if (sources == null)
            {
                throw Error.ArgumentNull(nameof(sources));
            }

            return new AmbAsyncIteratorN<TSource>(sources);
        }

        /// <summary>
        /// Races the given async-enumerable sequences and propagates the one whose first
        /// <c>MoveNextAsync</c> call completes first.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
        /// <param name="sources">Candidate sequences; the collection is copied to an array when this method is called.</param>
        /// <returns>A sequence that yields the elements of the winning source.</returns>
        /// <remarks>
        /// A null <paramref name="sources"/> is rejected with the exception produced by <c>Error.ArgumentNull</c>.
        /// </remarks>
        public static IAsyncEnumerable<TSource> Amb<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
        {
            if (sources == null)
            {
                throw Error.ArgumentNull(nameof(sources));
            }

            return new AmbAsyncIteratorN<TSource>(sources.ToArray());
        }

        /// <summary>
        /// Iterator for the two-source overload of Amb.
        /// </summary>
        private sealed class AmbAsyncIterator<TSource> : AsyncIterator<TSource>
        {
            private readonly IAsyncEnumerable<TSource> _first;
            private readonly IAsyncEnumerable<TSource> _second;

            // Enumerator of the winning source; null until a winner has been elected
            // and again after disposal.
            private IAsyncEnumerator<TSource> _enumerator;

            public AmbAsyncIterator(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
            {
                Debug.Assert(first != null);
                Debug.Assert(second != null);

                _first = first;
                _second = second;
            }

            public override AsyncIterator<TSource> Clone()
            {
                return new AmbAsyncIterator<TSource>(_first, _second);
            }

            /// <summary>
            /// Disposes the winning enumerator (if any) and then the base iterator.
            /// </summary>
            public override async ValueTask DisposeAsync()
            {
                if (_enumerator != null)
                {
                    await _enumerator.DisposeAsync().ConfigureAwait(false);
                    _enumerator = null;
                }

                await base.DisposeAsync().ConfigureAwait(false);
            }

            protected override async ValueTask<bool> MoveNextCore()
            {
                switch (_state)
                {
                    case AsyncIteratorState.Allocated:
                        var firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);
                        var secondEnumerator = _second.GetAsyncEnumerator(_cancellationToken);

                        // Each ValueTask is converted to a Task exactly once so it can be
                        // handed to WhenAny and have a continuation attached.
                        var firstMoveNext = firstEnumerator.MoveNextAsync().AsTask();
                        var secondMoveNext = secondEnumerator.MoveNextAsync().AsTask();

                        var winner = await Task.WhenAny(firstMoveNext, secondMoveNext).ConfigureAwait(false);

                        //
                        // REVIEW: An alternative option is to call DisposeAsync on the other and await it, but this has two drawbacks:
                        //
                        // 1. Concurrent DisposeAsync while a MoveNextAsync is in flight.
                        // 2. The winner elected by Amb is blocked to yield results until the loser unblocks.
                        //
                        // The approach below has one drawback, namely that exceptions raised by the loser are dropped on the floor.
                        //

                        if (winner == firstMoveNext)
                        {
                            _enumerator = firstEnumerator;
                            _ = secondMoveNext.ContinueWith(_ => { secondEnumerator.DisposeAsync(); });
                        }
                        else
                        {
                            _enumerator = secondEnumerator;
                            _ = firstMoveNext.ContinueWith(_ => { firstEnumerator.DisposeAsync(); });
                        }

                        _state = AsyncIteratorState.Iterating;

                        // Awaiting the winner surfaces its result (or its exception).
                        if (await winner.ConfigureAwait(false))
                        {
                            _current = _enumerator.Current;
                            return true;
                        }

                        break;

                    case AsyncIteratorState.Iterating:
                        if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                        {
                            _current = _enumerator.Current;
                            return true;
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Iterator for the N-source overloads of Amb.
        /// </summary>
        private sealed class AmbAsyncIteratorN<TSource> : AsyncIterator<TSource>
        {
            private readonly IAsyncEnumerable<TSource>[] _sources;

            // Enumerator of the winning source; null until a winner has been elected
            // and again after disposal.
            private IAsyncEnumerator<TSource> _enumerator;

            public AmbAsyncIteratorN(IAsyncEnumerable<TSource>[] sources)
            {
                Debug.Assert(sources != null);

                _sources = sources;
            }

            public override AsyncIterator<TSource> Clone()
            {
                return new AmbAsyncIteratorN<TSource>(_sources);
            }

            /// <summary>
            /// Disposes the winning enumerator (if any) and then the base iterator.
            /// </summary>
            public override async ValueTask DisposeAsync()
            {
                if (_enumerator != null)
                {
                    await _enumerator.DisposeAsync().ConfigureAwait(false);
                    _enumerator = null;
                }

                await base.DisposeAsync().ConfigureAwait(false);
            }

            protected override async ValueTask<bool> MoveNextCore()
            {
                switch (_state)
                {
                    case AsyncIteratorState.Allocated:
                        var n = _sources.Length;

                        var enumerators = new IAsyncEnumerator<TSource>[n];

                        // Store Task<bool> rather than ValueTask<bool>: each ValueTask is
                        // converted exactly once, and the very same Task instances are used
                        // for WhenAny, for locating the winner, and for the loser continuations.
                        var moveNexts = new Task<bool>[n];

                        for (var i = 0; i < n; i++)
                        {
                            var enumerator = _sources[i].GetAsyncEnumerator(_cancellationToken);

                            enumerators[i] = enumerator;
                            moveNexts[i] = enumerator.MoveNextAsync().AsTask();
                        }

                        var winner = await Task.WhenAny(moveNexts).ConfigureAwait(false);

                        //
                        // REVIEW: An alternative option is to call DisposeAsync on the other and await it, but this has two drawbacks:
                        //
                        // 1. Concurrent DisposeAsync while a MoveNextAsync is in flight.
                        // 2. The winner elected by Amb is blocked to yield results until all losers unblock.
                        //
                        // The approach below has one drawback, namely that exceptions raised by any loser are dropped on the floor.
                        //

                        // WhenAny returns one of the tasks it was given, so the reference
                        // lookup in the same array identifies the winning source.
                        var winnerIndex = Array.IndexOf(moveNexts, winner);

                        _enumerator = enumerators[winnerIndex];

                        for (var i = 0; i < n; i++)
                        {
                            if (i != winnerIndex)
                            {
                                // Copy to a per-iteration local: the continuation runs later,
                                // when the loop variable no longer holds this index.
                                var loser = enumerators[i];

                                _ = moveNexts[i].ContinueWith(_ => { loser.DisposeAsync(); });
                            }
                        }

                        _state = AsyncIteratorState.Iterating;

                        // Awaiting the winner surfaces its result (or its exception).
                        if (await winner.ConfigureAwait(false))
                        {
                            _current = _enumerator.Current;
                            return true;
                        }

                        break;

                    case AsyncIteratorState.Iterating:
                        if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                        {
                            _current = _enumerator.Current;
                            return true;
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }
    }
}