// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace System.Linq
{
    public static partial class AsyncEnumerableEx
    {
        /// <summary>
        /// Propagates the async-enumerable sequence that reacts first.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
        /// <param name="first">First async-enumerable sequence.</param>
        /// <param name="second">Second async-enumerable sequence.</param>
        /// <returns>An async-enumerable sequence that surfaces either of the given sequences, whichever reacted first.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
        /// <remarks>
        /// Both sources are started when the result is enumerated. Once the first <c>MoveNextAsync</c> completes,
        /// the other source is cancelled through its own linked token, awaited and disposed; that cleanup is
        /// awaited before enumeration of the result finishes.
        /// </remarks>
        public static IAsyncEnumerable<TSource> Amb<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
        {
            if (first == null)
                throw Error.ArgumentNull(nameof(first));
            if (second == null)
                throw Error.ArgumentNull(nameof(second));

#if HAS_ASYNC_ENUMERABLE_CANCELLATION
            return Core(first, second);

            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
#else
            return AsyncEnumerable.Create(Core);

            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
#endif
            {
                IAsyncEnumerator<TSource>? firstEnumerator = null;
                IAsyncEnumerator<TSource>? secondEnumerator = null;

                Task<bool>? firstMoveNext = null;
                Task<bool>? secondMoveNext = null;

                //
                // We need separate tokens for each source so that the non-winner can get disposed and unblocked
                // i.e., see Never()
                //
                // The linked sources are disposed when this iterator completes or is disposed, i.e. only after the
                // winner has been disposed and the loser's cleanup has been awaited, so that their registrations
                // on the caller's token do not outlive the enumeration.
                //

                using var firstCancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                using var secondCancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

                try
                {
                    //
                    // REVIEW: We start both sequences unconditionally. An alternative implementation could be to just stick
                    //         to the first sequence if we notice it already has a value (or exception) available. This would
                    //         be similar to Task.WhenAny behavior (see CommonCWAnyLogic in dotnet/coreclr). We could consider
                    //         adding a WhenAny combinator that does exactly that. We can even avoid calling AsTask.
                    //

                    firstEnumerator = first.GetAsyncEnumerator(firstCancelToken.Token);
                    firstMoveNext = firstEnumerator.MoveNextAsync().AsTask();

                    //
                    // REVIEW: Order of operations has changed here compared to the original, but is now in sync with the N-ary
                    //         overload which performs GetAsyncEnumerator/MoveNextAsync in pairs, rather than phased.
                    //

                    secondEnumerator = second.GetAsyncEnumerator(secondCancelToken.Token);
                    secondMoveNext = secondEnumerator.MoveNextAsync().AsTask();
                }
                catch
                {
                    secondCancelToken.Cancel();
                    firstCancelToken.Cancel();

                    // NB: AwaitMoveNextAsyncAndDispose checks for null for both arguments, reducing the need for many null
                    //     checks over here.

                    var cleanup = new[]
                    {
                        AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator),
                        AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator)
                    };

                    await Task.WhenAll(cleanup).ConfigureAwait(false);

                    throw;
                }

                //
                // REVIEW: Consider using the WhenAny combinator defined for Merge in TaskExt, which would avoid the need
                //         to convert to Task<bool> prior to calling Task.WhenAny.
                //

                var moveNextWinner = await Task.WhenAny(firstMoveNext, secondMoveNext).ConfigureAwait(false);

                //
                // REVIEW: An alternative option is to call DisposeAsync on the other and await it, but this has two drawbacks:
                //
                //         1. Concurrent DisposeAsync while a MoveNextAsync is in flight.
                //         2. The winner elected by Amb is blocked to yield results until the loser unblocks.
                //

                IAsyncEnumerator<TSource> winner;
                Task disposeLoser;

                if (moveNextWinner == firstMoveNext)
                {
                    winner = firstEnumerator;
                    secondCancelToken.Cancel();
                    disposeLoser = AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator);
                }
                else
                {
                    winner = secondEnumerator;
                    firstCancelToken.Cancel();
                    disposeLoser = AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator);
                }

                try
                {
                    await using (winner.ConfigureAwait(false))
                    {
                        // The winning MoveNextAsync has already completed; observe its result (or exception) here.
                        if (!await moveNextWinner.ConfigureAwait(false))
                        {
                            yield break;
                        }

                        yield return winner.Current;

                        while (await winner.MoveNextAsync().ConfigureAwait(false))
                        {
                            yield return winner.Current;
                        }
                    }
                }
                finally
                {
                    //
                    // REVIEW: This behavior differs from the original implementation in that we never discard any in flight
                    //         asynchronous operations. If an exception occurs while enumerating the winner, it can be
                    //         subsumed by an exception thrown due to cleanup of the loser. Also, if Amb is used to deal with
                    //         a potentially long-blocking sequence, this implementation would transfer this blocking behavior
                    //         to the resulting sequence. However, it seems never discarding a non-completed task should be a
                    //         general design tenet, and fire-and-forget dispose behavior could be added as another "unsafe"
                    //         operator, so all such sins are made explicit by the user. Nonetheless, this change is a breaking
                    //         change for Ix Async.
                    //

                    await disposeLoser.ConfigureAwait(false);
                }
            }
        }

        /// <summary>
        /// Propagates the async-enumerable sequence that reacts first.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
        /// <param name="sources">Async-enumerable sources competing to react first.</param>
        /// <returns>An async-enumerable sequence that surfaces any of the given sequences, whichever reacted first.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
        /// <remarks>
        /// All sources are started when the result is enumerated. Every source other than the winner is cancelled
        /// through its own linked token, awaited and disposed; that cleanup is awaited before enumeration of the
        /// result finishes.
        /// </remarks>
        public static IAsyncEnumerable<TSource> Amb<TSource>(params IAsyncEnumerable<TSource>[] sources)
        {
            if (sources == null)
                throw Error.ArgumentNull(nameof(sources));

            return AsyncEnumerable.Create(Core);

            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
            {
                //
                // REVIEW: See remarks on binary overload for changes compared to the original.
                //
                // NOTE(review): an empty sources array reaches Task.WhenAny with no tasks, which is expected to throw
                //               ArgumentException during enumeration - confirm whether that is the intended contract.
                //

                var n = sources.Length;

                var enumerators = new IAsyncEnumerator<TSource>[n];
                var moveNexts = new Task<bool>[n];
                var individualTokenSources = new CancellationTokenSource[n];

                try
                {
                    for (var i = 0; i < n; i++)
                    {
                        individualTokenSources[i] = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                    }

                    try
                    {
                        for (var i = 0; i < n; i++)
                        {
                            var enumerator = sources[i].GetAsyncEnumerator(individualTokenSources[i].Token);

                            enumerators[i] = enumerator;
                            moveNexts[i] = enumerator.MoveNextAsync().AsTask();
                        }
                    }
                    catch
                    {
                        // Entries that were never started are still null; AwaitMoveNextAsyncAndDispose tolerates that.
                        var cleanup = new Task[n];

                        for (var i = n - 1; i >= 0; i--)
                        {
                            individualTokenSources[i].Cancel();

                            cleanup[i] = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]);
                        }

                        await Task.WhenAll(cleanup).ConfigureAwait(false);

                        throw;
                    }

                    var moveNextWinner = await Task.WhenAny(moveNexts).ConfigureAwait(false);

                    //
                    // NB: The use of IndexOf is fine. If task N completed by returning a ValueTask<bool>
                    //     which is equivalent to the task returned by task M (where M < N), AsTask may
                    //     return the same reference (e.g. due to caching of completed Boolean tasks). In
                    //     such a case, IndexOf will find task M rather than N in the array, but both have
                    //     an equivalent completion state (because they're reference equal). This only leads
                    //     to a left-bias in selection of sources, but given Amb's "ambiguous" nature, this
                    //     is acceptable.
                    //

                    var winnerIndex = Array.IndexOf(moveNexts, moveNextWinner);

                    var winner = enumerators[winnerIndex];

                    var loserCleanupTasks = new List<Task>(n - 1);

                    for (var i = n - 1; i >= 0; i--)
                    {
                        if (i != winnerIndex)
                        {
                            individualTokenSources[i].Cancel();
                            var loserCleanupTask = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]);
                            loserCleanupTasks.Add(loserCleanupTask);
                        }
                    }

                    var cleanupLosers = Task.WhenAll(loserCleanupTasks);

                    try
                    {
                        await using (winner.ConfigureAwait(false))
                        {
                            // The winning MoveNextAsync has already completed; observe its result (or exception) here.
                            if (!await moveNextWinner.ConfigureAwait(false))
                            {
                                yield break;
                            }

                            yield return winner.Current;

                            while (await winner.MoveNextAsync().ConfigureAwait(false))
                            {
                                yield return winner.Current;
                            }
                        }
                    }
                    finally
                    {
                        await cleanupLosers.ConfigureAwait(false);
                    }
                }
                finally
                {
                    // Release the linked token sources (and their registrations on the caller's token) only once the
                    // winner has been disposed and all loser cleanup has been awaited. Slots can still be null if
                    // creating one of the linked sources failed part-way through.
                    foreach (var tokenSource in individualTokenSources)
                    {
                        tokenSource?.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Propagates the async-enumerable sequence that reacts first.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
        /// <param name="sources">Async-enumerable sources competing to react first.</param>
        /// <returns>An async-enumerable sequence that surfaces any of the given sequences, whichever reacted first.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
        /// <remarks>The sequence of sources is copied to an array eagerly, when this method is called.</remarks>
        public static IAsyncEnumerable<TSource> Amb<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
        {
            if (sources == null)
                throw Error.ArgumentNull(nameof(sources));

            return Amb(sources.ToArray());
        }

        /// <summary>
        /// Awaits an in-flight <c>MoveNextAsync</c> operation (if any) and then disposes the enumerator (if any).
        /// </summary>
        /// <param name="moveNextAsync">The pending move-next task, or null if none was started.</param>
        /// <param name="enumerator">The enumerator to dispose, or null if none was obtained.</param>
        /// <returns>A task that completes once the enumerator has been disposed.</returns>
        /// <remarks>
        /// Cancellation surfaced by the awaited task is swallowed; any other exception propagates after the
        /// enumerator has been disposed.
        /// </remarks>
        private static async Task AwaitMoveNextAsyncAndDispose<T>(Task<bool>? moveNextAsync, IAsyncEnumerator<T>? enumerator)
        {
            if (enumerator != null)
            {
                await using (enumerator.ConfigureAwait(false))
                {
                    if (moveNextAsync != null)
                    {
                        try
                        {
                            await moveNextAsync.ConfigureAwait(false);
                        }
                        catch (OperationCanceledException)
                        {
                            // Ignored because of cancelling the non-winners. Catching the base type (rather than only
                            // TaskCanceledException) also covers sources that observe the token by throwing a plain
                            // OperationCanceledException, e.g. via CancellationToken.ThrowIfCancellationRequested.
                        }
                    }
                }
            }
        }
    }
}