// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace System.Linq
{
public static partial class AsyncEnumerableEx
{
/// <summary>
/// Merges elements from all of the specified async-enumerable sequences into a single async-enumerable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
/// <param name="sources">Async-enumerable sequences to merge.</param>
/// <returns>The async-enumerable sequence that merges the elements of the async-enumerable sequences.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
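/// <example>
/// A minimal usage sketch; <c>xs</c> and <c>ys</c> are assumed to be caller-defined async-enumerable sequences
/// of the same element type. The sources are enumerated concurrently, so elements are yielded in whatever order
/// they become available:
/// <code>
/// await foreach (var item in AsyncEnumerableEx.Merge(xs, ys))
/// {
///     Console.WriteLine(item);
/// }
/// </code>
/// </example>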
public static IAsyncEnumerable<TSource> Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)
{
if (sources == null)
throw Error.ArgumentNull(nameof(sources));
return Core(sources);
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource>[] sources, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
#if USE_FAIR_AND_CHEAPER_MERGE
//
// This new implementation of Merge differs from the original one in a few ways:
//
// - It's cheaper because:
// - no conversion from ValueTask to Task takes place using AsTask,
// - we don't instantiate Task.WhenAny tasks for each iteration.
// - It's fairer because:
// - the MoveNextAsync tasks are awaited concurrently, but completions are queued,
// instead of awaiting a new WhenAny task where "left" sources have preferential
// treatment over "right" sources.
//
{
var count = sources.Length;
var enumerators = new IAsyncEnumerator<TSource>[count];
var moveNextTasks = new ValueTask<bool>[count];
try
{
for (var i = 0; i < count; i++)
{
IAsyncEnumerator<TSource> enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
enumerators[i] = enumerator;
// REVIEW: This follows the lead of the original implementation where we kick off MoveNextAsync
// operations immediately. An alternative would be to do this in a separate stage, thus
// preventing concurrency across MoveNextAsync and GetAsyncEnumerator calls and avoiding
// any MoveNextAsync calls before all enumerators are acquired (or an exception has
// occurred doing so).
moveNextTasks[i] = enumerator.MoveNextAsync();
}
var whenAny = TaskExt.WhenAny(moveNextTasks);
int active = count;
while (active > 0)
{
int index = await whenAny;
IAsyncEnumerator<TSource> enumerator = enumerators[index];
ValueTask<bool> moveNextTask = moveNextTasks[index];
if (!await moveNextTask.ConfigureAwait(false))
{
//
// Replace the task in our array by a completed task to make finally logic easier. Note that
// the WhenAnyValueTask object has a reference to our array (i.e. no copy is made), so this
// gets rid of any resources the original task may have held onto. However, we *don't* call
// whenAny.Replace to set this value, because it'd attach an awaiter to the already completed
// task, causing spurious wake-ups when awaiting whenAny.
//
moveNextTasks[index] = new ValueTask<bool>();
// REVIEW: The original implementation did not dispose eagerly, which could lead to resource
// leaks when merged with other long-running sequences.
enumerators[index] = null; // NB: Avoids attempt at double dispose in finally if disposing fails.
await enumerator.DisposeAsync().ConfigureAwait(false);
active--;
}
else
{
TSource item = enumerator.Current;
//
// Replace the task using whenAny.Replace, which will write it to the moveNextTasks array, and
// will start awaiting the task. Note we don't have to write to moveNextTasks ourselves because
// the whenAny object has a reference to it (i.e. no copy is made).
//
whenAny.Replace(index, enumerator.MoveNextAsync());
yield return item;
}
}
}
finally
{
// REVIEW: The original implementation performs a concurrent dispose, which seems undesirable given the
// additional uncontrollable source of concurrency and the sequential resource acquisition. In
// this modern implementation, we release resources in opposite order as we acquired them, thus
// guaranteeing determinism (and mimicking a series of nested `await using` statements).
// REVIEW: If we decide to phase GetAsyncEnumerator and the initial MoveNextAsync calls at the start of
// the operator implementation, we should make this symmetric and first await all in flight
// MoveNextAsync operations, prior to disposing the enumerators.
var errors = default(List<Exception>);
for (var i = count - 1; i >= 0; i--)
{
ValueTask<bool> moveNextTask = moveNextTasks[i];
IAsyncEnumerator<TSource> enumerator = enumerators[i];
try
{
try
{
//
// Await the task to ensure outstanding work is completed prior to performing a dispose
// operation. Note that we don't have to do anything special for tasks belonging to
// enumerators that have finished; we swapped in a placeholder completed task.
//
// REVIEW: This adds an additional continuation to all of the pending tasks (note that
// whenAny also has registered one). The whenAny object will be collectible
// after all of these complete. Alternatively, we could drain via whenAny, by
// awaiting it until the active count drops to 0. This saves on attaching the
// additional continuations, but we need to decide on order of dispose. Right
// now, we dispose in opposite order of acquiring the enumerators, with the
// exception of enumerators that were disposed eagerly upon early completion.
// Should we care about the dispose order at all?
_ = await moveNextTask.ConfigureAwait(false);
}
finally
{
if (enumerator != null)
{
await enumerator.DisposeAsync().ConfigureAwait(false);
}
}
}
catch (Exception ex)
{
if (errors == null)
{
errors = new List<Exception>();
}
errors.Add(ex);
}
}
// NB: If we had any errors during cleaning (and awaiting pending operations), we throw these exceptions
// instead of the original exception that may have led to running the finally block. This is similar
// to throwing from any finally block (except that we catch all exceptions to ensure cleanup of all
// concurrent sequences being merged).
if (errors != null)
{
throw new AggregateException(errors);
}
}
}
#else
{
var count = sources.Length;
var enumerators = new IAsyncEnumerator<TSource>?[count];
var moveNextTasks = new Task<bool>[count];
try
{
for (var i = 0; i < count; i++)
{
var enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
enumerators[i] = enumerator;
// REVIEW: This follows the lead of the original implementation where we kick off MoveNextAsync
// operations immediately. An alternative would be to do this in a separate stage, thus
// preventing concurrency across MoveNextAsync and GetAsyncEnumerator calls and avoiding
// any MoveNextAsync calls before all enumerators are acquired (or an exception has
// occurred doing so).
moveNextTasks[i] = enumerator.MoveNextAsync().AsTask();
}
var active = count;
while (active > 0)
{
// REVIEW: Performance of WhenAny may be an issue when called repeatedly like this. We should
// measure and could consider operating directly on the ValueTask objects, thus
// also preventing the Task allocations from AsTask.
var moveNextTask = await Task.WhenAny(moveNextTasks).ConfigureAwait(false);
// REVIEW: This seems wrong. AsTask can return the original Task<bool> (if the ValueTask<bool>
// is wrapping one) or return a singleton instance for true and false, at which point
// the use of IndexOf may pick an element closer to the start of the array because of
// reference equality checks and aliasing effects. See GetTaskForResult in the BCL.
var index = Array.IndexOf(moveNextTasks, moveNextTask);
var enumerator = enumerators[index]!; // NB: Only gets set to null after setting task to Never.
if (!await moveNextTask.ConfigureAwait(false))
{
moveNextTasks[index] = TaskExt.Never;
// REVIEW: The original implementation did not dispose eagerly, which could lead to resource
// leaks when merged with other long-running sequences.
enumerators[index] = null; // NB: Avoids attempt at double dispose in finally if disposing fails.
await enumerator.DisposeAsync().ConfigureAwait(false);
active--;
}
else
{
var item = enumerator.Current;
moveNextTasks[index] = enumerator.MoveNextAsync().AsTask();
yield return item;
}
}
}
finally
{
// REVIEW: The original implementation performs a concurrent dispose, which seems undesirable given the
// additional uncontrollable source of concurrency and the sequential resource acquisition. In
// this modern implementation, we release resources in opposite order as we acquired them, thus
// guaranteeing determinism (and mimicking a series of nested `await using` statements).
// REVIEW: If we decide to phase GetAsyncEnumerator and the initial MoveNextAsync calls at the start of
// the operator implementation, we should make this symmetric and first await all in flight
// MoveNextAsync operations, prior to disposing the enumerators.
var errors = default(List<Exception>);
for (var i = count - 1; i >= 0; i--)
{
var moveNextTask = moveNextTasks[i];
var enumerator = enumerators[i];
try
{
try
{
if (moveNextTask != null && moveNextTask != TaskExt.Never)
{
_ = await moveNextTask.ConfigureAwait(false);
}
}
finally
{
if (enumerator != null)
{
await enumerator.DisposeAsync().ConfigureAwait(false);
}
}
}
catch (Exception ex)
{
if (errors == null)
{
errors = new List<Exception>();
}
errors.Add(ex);
}
}
// NB: If we had any errors during cleaning (and awaiting pending operations), we throw these exceptions
// instead of the original exception that may have led to running the finally block. This is similar
// to throwing from any finally block (except that we catch all exceptions to ensure cleanup of all
// concurrent sequences being merged).
if (errors != null)
{
throw new AggregateException(errors);
}
}
}
#endif
}
/// <summary>
/// Merges elements from all async-enumerable sequences in the given enumerable sequence into a single async-enumerable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
/// <param name="sources">Enumerable sequence of async-enumerable sequences.</param>
/// <returns>The async-enumerable sequence that merges the elements of the async-enumerable sequences.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
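/// <example>
/// A minimal usage sketch; <c>sequences</c> is assumed to be a caller-defined collection (e.g. a list) of
/// async-enumerable sequences. Per the SelectMany-based implementation below, each inner sequence is drained
/// in turn rather than concurrently:
/// <code>
/// await foreach (var item in sequences.Merge())
/// {
///     Console.WriteLine(item);
/// }
/// </code>
/// </example>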
public static IAsyncEnumerable<TSource> Merge<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
{
if (sources == null)
throw Error.ArgumentNull(nameof(sources));
//
// REVIEW: This implementation does not exploit concurrency. We should not introduce such behavior in order to
// avoid breaking changes, but we could introduce a parallel ConcurrentMerge implementation. It is
// unfortunate though that the Merge overload accepting an array has always been concurrent, so we can't
// change that either (in order to have consistency where Merge is non-concurrent, and ConcurrentMerge
// is). We could consider a breaking change to Ix Async to streamline this, but we should do so when
// shipping with the BCL interfaces (which is already a breaking change to existing Ix Async users). If
// we go that route, we can either have:
//
// - All overloads of Merge are concurrent
// - and continue to be named Merge, or,
// - are renamed to ConcurrentMerge for clarity (likely alongside a ConcurrentZip).
// - All overloads of Merge are non-concurrent
// - and are simply SelectMany operator macros (maybe more optimized)
// - Have ConcurrentMerge next to Merge overloads
// - where ConcurrentMerge may need a degree of concurrency parameter (and maybe other options), and,
// - where the overload set of both families may be asymmetric
//
return sources.ToAsyncEnumerable().SelectMany(source => source);
}
/// <summary>
/// Merges elements from all inner async-enumerable sequences into a single async-enumerable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
/// <param name="sources">Async-enumerable sequence of inner async-enumerable sequences.</param>
/// <returns>The async-enumerable sequence that merges the elements of the inner sequences.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
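/// <example>
/// A minimal usage sketch; <c>nested</c> is assumed to be a caller-defined async-enumerable sequence whose
/// elements are themselves async-enumerable sequences. As with the overload above, the inner sequences are
/// enumerated one after another (no concurrency):
/// <code>
/// await foreach (var item in nested.Merge())
/// {
///     Console.WriteLine(item);
/// }
/// </code>
/// </example>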
public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
{
if (sources == null)
throw Error.ArgumentNull(nameof(sources));
//
// REVIEW: This implementation does not exploit concurrency. We should not introduce such behavior in order to
// avoid breaking changes, but we could introduce a parallel ConcurrentMerge implementation.
//
return sources.SelectMany(source => source);
}
}
}