// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

namespace System.Linq
{
    public static partial class AsyncEnumerable
    {
        /// <summary>
        /// Projects every element of <paramref name="source"/> to the same sequence <paramref name="other"/>
        /// and flattens the result.
        /// </summary>
        /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
        /// <typeparam name="TOther">Element type of the sequence produced for every outer element.</typeparam>
        /// <param name="source">Outer sequence.</param>
        /// <param name="other">Sequence enumerated once per element of <paramref name="source"/>.</param>
        /// <returns>A sequence yielding the elements of <paramref name="other"/> for each outer element.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is null.</exception>
        public static IAsyncEnumerable<TOther> SelectMany<TSource, TOther>(this IAsyncEnumerable<TSource> source, IAsyncEnumerable<TOther> other)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (other == null)
                throw new ArgumentNullException(nameof(other));

            return source.SelectMany(_ => other);
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/> to an inner sequence and flattens the inner sequences.
        /// </summary>
        /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
        /// <param name="source">Outer sequence.</param>
        /// <param name="selector">Maps an outer element to its inner sequence.</param>
        /// <returns>The concatenation of the inner sequences, in outer-element order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return new SelectManyAsyncIterator<TSource, TResult>(source, selector);
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/> to an inner sequence using an asynchronous selector
        /// and flattens the inner sequences.
        /// </summary>
        /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
        /// <param name="source">Outer sequence.</param>
        /// <param name="selector">Asynchronously maps an outer element to its inner sequence.</param>
        /// <returns>The concatenation of the inner sequences, in outer-element order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<IAsyncEnumerable<TResult>>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return new SelectManyAsyncIteratorWithTask<TSource, TResult>(source, selector);
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/>, together with its zero-based index, to an inner
        /// sequence and flattens the inner sequences.
        /// </summary>
        /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
        /// <param name="source">Outer sequence.</param>
        /// <param name="selector">Maps an outer element and its index to its inner sequence.</param>
        /// <returns>The concatenation of the inner sequences, in outer-element order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return new SelectManyWithIndexAsyncIterator<TSource, TResult>(source, selector);
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/>, together with its zero-based index, to an inner
        /// sequence using an asynchronous selector and flattens the inner sequences.
        /// </summary>
        /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
        /// <param name="source">Outer sequence.</param>
        /// <param name="selector">Asynchronously maps an outer element and its index to its inner sequence.</param>
        /// <returns>The concatenation of the inner sequences, in outer-element order.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
        public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, Task<IAsyncEnumerable<TResult>>> selector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));

            return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult>(source, selector);
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/> to an inner sequence, flattens the inner sequences,
        /// and maps every (outer element, inner element) pair through <paramref name="resultSelector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of the result.</typeparam>
        /// <param name="source">Outer sequence.</param>
        /// <param name="selector">Maps an outer element to its inner sequence.</param>
        /// <param name="resultSelector">Combines an outer element with one of its inner elements.</param>
        /// <returns>The sequence of combined values.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return new SelectManyAsyncIterator<TSource, TCollection, TResult>(source, selector, resultSelector);
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/> to an inner sequence using an asynchronous selector,
        /// flattens the inner sequences, and maps every (outer element, inner element) pair through the
        /// asynchronous <paramref name="resultSelector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of the result.</typeparam>
        /// <param name="source">Outer sequence.</param>
        /// <param name="selector">Asynchronously maps an outer element to its inner sequence.</param>
        /// <param name="resultSelector">Asynchronously combines an outer element with one of its inner elements.</param>
        /// <returns>The sequence of combined values.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, Task<IAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, Task<TResult>> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return new SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult>(source, selector, resultSelector);
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/>, together with its zero-based index, to an inner
        /// sequence, flattens the inner sequences, and maps every (outer element, inner element) pair through
        /// <paramref name="resultSelector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of the result.</typeparam>
        /// <param name="source">Outer sequence.</param>
        /// <param name="selector">Maps an outer element and its index to its inner sequence.</param>
        /// <param name="resultSelector">Combines an outer element with one of its inner elements.</param>
        /// <returns>The sequence of combined values.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(source, selector, resultSelector);
        }

        /// <summary>
        /// Projects each element of <paramref name="source"/>, together with its zero-based index, to an inner
        /// sequence using an asynchronous selector, flattens the inner sequences, and maps every
        /// (outer element, inner element) pair through the asynchronous <paramref name="resultSelector"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the outer sequence.</typeparam>
        /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
        /// <typeparam name="TResult">Element type of the result.</typeparam>
        /// <param name="source">Outer sequence.</param>
        /// <param name="selector">Asynchronously maps an outer element and its index to its inner sequence.</param>
        /// <param name="resultSelector">Asynchronously combines an outer element with one of its inner elements.</param>
        /// <returns>The sequence of combined values.</returns>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, Task<IAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, Task<TResult>> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (selector == null)
                throw new ArgumentNullException(nameof(selector));
            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult>(source, selector, resultSelector);
        }

        /// <summary>
        /// Iterator that flattens the inner sequences produced by a synchronous selector.
        /// </summary>
        private sealed class SelectManyAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
        {
            // Values of 'mode': advance the outer enumerator vs. drain the current inner enumerator.
            private const int State_Source = 1;
            private const int State_Result = 2;

            private readonly Func<TSource, IAsyncEnumerable<TResult>> selector;
            private readonly IAsyncEnumerable<TSource> source;

            private int mode;
            private IAsyncEnumerator<TResult> resultEnumerator;
            private IAsyncEnumerator<TSource> sourceEnumerator;

            public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
            {
                Debug.Assert(source != null);
                Debug.Assert(selector != null);

                this.source = source;
                this.selector = selector;
            }

            /// <summary>Creates a fresh iterator over the same source and selector.</summary>
            public override AsyncIterator<TResult> Clone()
            {
                return new SelectManyAsyncIterator<TSource, TResult>(source, selector);
            }

            /// <summary>Disposes the outer and the current inner enumerator (if any), then the base iterator.</summary>
            public override async Task DisposeAsync()
            {
                if (sourceEnumerator != null)
                {
                    await sourceEnumerator.DisposeAsync().ConfigureAwait(false);
                    sourceEnumerator = null;
                }

                if (resultEnumerator != null)
                {
                    await resultEnumerator.DisposeAsync().ConfigureAwait(false);
                    resultEnumerator = null;
                }

                await base.DisposeAsync().ConfigureAwait(false);
            }

            /// <summary>
            /// Advances to the next flattened element; returns false (after disposing) once the outer
            /// sequence is exhausted.
            /// </summary>
            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        sourceEnumerator = source.GetAsyncEnumerator();
                        mode = State_Source;
                        state = AsyncIteratorState.Iterating;
                        goto case AsyncIteratorState.Iterating;

                    case AsyncIteratorState.Iterating:
                        switch (mode)
                        {
                            case State_Source:
                                if (await sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    if (resultEnumerator != null)
                                    {
                                        await resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                        // Clear the field so a later DisposeAsync cannot dispose the same
                                        // enumerator twice if the selector or GetAsyncEnumerator throws.
                                        resultEnumerator = null;
                                    }

                                    var inner = selector(sourceEnumerator.Current);
                                    resultEnumerator = inner.GetAsyncEnumerator();

                                    mode = State_Result;
                                    goto case State_Result;
                                }

                                break;

                            case State_Result:
                                if (await resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    current = resultEnumerator.Current;
                                    return true;
                                }

                                // Inner sequence exhausted: go back and pull the next outer element.
                                mode = State_Source;
                                goto case State_Source; // loop
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Iterator that flattens the inner sequences produced by an asynchronous selector.
        /// </summary>
        private sealed class SelectManyAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>
        {
            // Values of 'mode': advance the outer enumerator vs. drain the current inner enumerator.
            private const int State_Source = 1;
            private const int State_Result = 2;

            private readonly Func<TSource, Task<IAsyncEnumerable<TResult>>> selector;
            private readonly IAsyncEnumerable<TSource> source;

            private int mode;
            private IAsyncEnumerator<TResult> resultEnumerator;
            private IAsyncEnumerator<TSource> sourceEnumerator;

            public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, Task<IAsyncEnumerable<TResult>>> selector)
            {
                Debug.Assert(source != null);
                Debug.Assert(selector != null);

                this.source = source;
                this.selector = selector;
            }

            /// <summary>Creates a fresh iterator over the same source and selector.</summary>
            public override AsyncIterator<TResult> Clone()
            {
                return new SelectManyAsyncIteratorWithTask<TSource, TResult>(source, selector);
            }

            /// <summary>Disposes the outer and the current inner enumerator (if any), then the base iterator.</summary>
            public override async Task DisposeAsync()
            {
                if (sourceEnumerator != null)
                {
                    await sourceEnumerator.DisposeAsync().ConfigureAwait(false);
                    sourceEnumerator = null;
                }

                if (resultEnumerator != null)
                {
                    await resultEnumerator.DisposeAsync().ConfigureAwait(false);
                    resultEnumerator = null;
                }

                await base.DisposeAsync().ConfigureAwait(false);
            }

            /// <summary>
            /// Advances to the next flattened element; returns false (after disposing) once the outer
            /// sequence is exhausted.
            /// </summary>
            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        sourceEnumerator = source.GetAsyncEnumerator();
                        mode = State_Source;
                        state = AsyncIteratorState.Iterating;
                        goto case AsyncIteratorState.Iterating;

                    case AsyncIteratorState.Iterating:
                        switch (mode)
                        {
                            case State_Source:
                                if (await sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    if (resultEnumerator != null)
                                    {
                                        await resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                        // Clear the field so a later DisposeAsync cannot dispose the same
                                        // enumerator twice if the selector or GetAsyncEnumerator throws.
                                        resultEnumerator = null;
                                    }

                                    var inner = await selector(sourceEnumerator.Current).ConfigureAwait(false);
                                    resultEnumerator = inner.GetAsyncEnumerator();

                                    mode = State_Result;
                                    goto case State_Result;
                                }

                                break;

                            case State_Result:
                                if (await resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    current = resultEnumerator.Current;
                                    return true;
                                }

                                // Inner sequence exhausted: go back and pull the next outer element.
                                mode = State_Source;
                                goto case State_Source; // loop
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Iterator that flattens inner sequences and combines each inner element with its outer element
        /// through a synchronous result selector.
        /// </summary>
        private sealed class SelectManyAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
        {
            // Values of 'mode': advance the outer enumerator vs. drain the current inner enumerator.
            private const int State_Source = 1;
            private const int State_Result = 2;

            private readonly Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector;
            private readonly Func<TSource, TCollection, TResult> resultSelector;
            private readonly IAsyncEnumerable<TSource> source;

            // Outer element whose inner sequence is currently being drained.
            private TSource currentSource;
            private int mode;
            private IAsyncEnumerator<TCollection> resultEnumerator;
            private IAsyncEnumerator<TSource> sourceEnumerator;

            public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
            {
                Debug.Assert(source != null);
                Debug.Assert(collectionSelector != null);
                Debug.Assert(resultSelector != null);

                this.source = source;
                this.collectionSelector = collectionSelector;
                this.resultSelector = resultSelector;
            }

            /// <summary>Creates a fresh iterator over the same source and selectors.</summary>
            public override AsyncIterator<TResult> Clone()
            {
                return new SelectManyAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
            }

            /// <summary>
            /// Disposes the outer and the current inner enumerator (if any), clears the cached outer element,
            /// then disposes the base iterator.
            /// </summary>
            public override async Task DisposeAsync()
            {
                if (sourceEnumerator != null)
                {
                    await sourceEnumerator.DisposeAsync().ConfigureAwait(false);
                    sourceEnumerator = null;
                }

                if (resultEnumerator != null)
                {
                    await resultEnumerator.DisposeAsync().ConfigureAwait(false);
                    resultEnumerator = null;
                }

                currentSource = default(TSource);

                await base.DisposeAsync().ConfigureAwait(false);
            }

            /// <summary>
            /// Advances to the next combined element; returns false (after disposing) once the outer
            /// sequence is exhausted.
            /// </summary>
            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        sourceEnumerator = source.GetAsyncEnumerator();
                        mode = State_Source;
                        state = AsyncIteratorState.Iterating;
                        goto case AsyncIteratorState.Iterating;

                    case AsyncIteratorState.Iterating:
                        switch (mode)
                        {
                            case State_Source:
                                if (await sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    if (resultEnumerator != null)
                                    {
                                        await resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                        // Clear the field so a later DisposeAsync cannot dispose the same
                                        // enumerator twice if the selector or GetAsyncEnumerator throws.
                                        resultEnumerator = null;
                                    }

                                    currentSource = sourceEnumerator.Current;
                                    var inner = collectionSelector(currentSource);
                                    resultEnumerator = inner.GetAsyncEnumerator();

                                    mode = State_Result;
                                    goto case State_Result;
                                }

                                break;

                            case State_Result:
                                if (await resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    current = resultSelector(currentSource, resultEnumerator.Current);
                                    return true;
                                }

                                // Inner sequence exhausted: go back and pull the next outer element.
                                mode = State_Source;
                                goto case State_Source; // loop
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Iterator that flattens inner sequences produced by an asynchronous collection selector and combines
        /// each inner element with its outer element through an asynchronous result selector.
        /// </summary>
        private sealed class SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult> : AsyncIterator<TResult>
        {
            // Values of 'mode': advance the outer enumerator vs. drain the current inner enumerator.
            private const int State_Source = 1;
            private const int State_Result = 2;

            private readonly Func<TSource, Task<IAsyncEnumerable<TCollection>>> collectionSelector;
            private readonly Func<TSource, TCollection, Task<TResult>> resultSelector;
            private readonly IAsyncEnumerable<TSource> source;

            // Outer element whose inner sequence is currently being drained.
            private TSource currentSource;
            private int mode;
            private IAsyncEnumerator<TCollection> resultEnumerator;
            private IAsyncEnumerator<TSource> sourceEnumerator;

            public SelectManyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, Task<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, Task<TResult>> resultSelector)
            {
                Debug.Assert(source != null);
                Debug.Assert(collectionSelector != null);
                Debug.Assert(resultSelector != null);

                this.source = source;
                this.collectionSelector = collectionSelector;
                this.resultSelector = resultSelector;
            }

            /// <summary>Creates a fresh iterator over the same source and selectors.</summary>
            public override AsyncIterator<TResult> Clone()
            {
                return new SelectManyAsyncIteratorWithTask<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
            }

            /// <summary>
            /// Disposes the outer and the current inner enumerator (if any), clears the cached outer element,
            /// then disposes the base iterator.
            /// </summary>
            public override async Task DisposeAsync()
            {
                if (sourceEnumerator != null)
                {
                    await sourceEnumerator.DisposeAsync().ConfigureAwait(false);
                    sourceEnumerator = null;
                }

                if (resultEnumerator != null)
                {
                    await resultEnumerator.DisposeAsync().ConfigureAwait(false);
                    resultEnumerator = null;
                }

                currentSource = default(TSource);

                await base.DisposeAsync().ConfigureAwait(false);
            }

            /// <summary>
            /// Advances to the next combined element; returns false (after disposing) once the outer
            /// sequence is exhausted.
            /// </summary>
            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        sourceEnumerator = source.GetAsyncEnumerator();
                        mode = State_Source;
                        state = AsyncIteratorState.Iterating;
                        goto case AsyncIteratorState.Iterating;

                    case AsyncIteratorState.Iterating:
                        switch (mode)
                        {
                            case State_Source:
                                if (await sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    if (resultEnumerator != null)
                                    {
                                        await resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                        // Clear the field so a later DisposeAsync cannot dispose the same
                                        // enumerator twice if the selector or GetAsyncEnumerator throws.
                                        resultEnumerator = null;
                                    }

                                    currentSource = sourceEnumerator.Current;
                                    var inner = await collectionSelector(currentSource).ConfigureAwait(false);
                                    resultEnumerator = inner.GetAsyncEnumerator();

                                    mode = State_Result;
                                    goto case State_Result;
                                }

                                break;

                            case State_Result:
                                if (await resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    current = await resultSelector(currentSource, resultEnumerator.Current).ConfigureAwait(false);
                                    return true;
                                }

                                // Inner sequence exhausted: go back and pull the next outer element.
                                mode = State_Source;
                                goto case State_Source; // loop
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Iterator that flattens inner sequences selected from (element, index) pairs and combines each inner
        /// element with its outer element through a synchronous result selector.
        /// </summary>
        private sealed class SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
        {
            // Values of 'mode': advance the outer enumerator vs. drain the current inner enumerator.
            private const int State_Source = 1;
            private const int State_Result = 2;

            private readonly Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector;
            private readonly Func<TSource, TCollection, TResult> resultSelector;
            private readonly IAsyncEnumerable<TSource> source;

            // Outer element whose inner sequence is currently being drained.
            private TSource currentSource;
            // Zero-based position of the current outer element; starts at -1 and is incremented before use.
            private int index;
            private int mode;
            private IAsyncEnumerator<TCollection> resultEnumerator;
            private IAsyncEnumerator<TSource> sourceEnumerator;

            public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
            {
                Debug.Assert(source != null);
                Debug.Assert(collectionSelector != null);
                Debug.Assert(resultSelector != null);

                this.source = source;
                this.collectionSelector = collectionSelector;
                this.resultSelector = resultSelector;
            }

            /// <summary>Creates a fresh iterator over the same source and selectors.</summary>
            public override AsyncIterator<TResult> Clone()
            {
                return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
            }

            /// <summary>
            /// Disposes the outer and the current inner enumerator (if any), clears the cached outer element,
            /// then disposes the base iterator.
            /// </summary>
            public override async Task DisposeAsync()
            {
                if (sourceEnumerator != null)
                {
                    await sourceEnumerator.DisposeAsync().ConfigureAwait(false);
                    sourceEnumerator = null;
                }

                if (resultEnumerator != null)
                {
                    await resultEnumerator.DisposeAsync().ConfigureAwait(false);
                    resultEnumerator = null;
                }

                currentSource = default(TSource);

                await base.DisposeAsync().ConfigureAwait(false);
            }

            /// <summary>
            /// Advances to the next combined element; returns false (after disposing) once the outer
            /// sequence is exhausted.
            /// </summary>
            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        sourceEnumerator = source.GetAsyncEnumerator();
                        index = -1;
                        mode = State_Source;
                        state = AsyncIteratorState.Iterating;
                        goto case AsyncIteratorState.Iterating;

                    case AsyncIteratorState.Iterating:
                        switch (mode)
                        {
                            case State_Source:
                                if (await sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    if (resultEnumerator != null)
                                    {
                                        await resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                        // Clear the field so a later DisposeAsync cannot dispose the same
                                        // enumerator twice if the selector or GetAsyncEnumerator throws.
                                        resultEnumerator = null;
                                    }

                                    currentSource = sourceEnumerator.Current;

                                    // Overflow of the element index throws instead of wrapping around.
                                    checked
                                    {
                                        index++;
                                    }

                                    var inner = collectionSelector(currentSource, index);
                                    resultEnumerator = inner.GetAsyncEnumerator();

                                    mode = State_Result;
                                    goto case State_Result;
                                }

                                break;

                            case State_Result:
                                if (await resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    current = resultSelector(currentSource, resultEnumerator.Current);
                                    return true;
                                }

                                // Inner sequence exhausted: go back and pull the next outer element.
                                mode = State_Source;
                                goto case State_Source; // loop
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Iterator that flattens inner sequences asynchronously selected from (element, index) pairs and
        /// combines each inner element with its outer element through an asynchronous result selector.
        /// </summary>
        private sealed class SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult> : AsyncIterator<TResult>
        {
            // Values of 'mode': advance the outer enumerator vs. drain the current inner enumerator.
            private const int State_Source = 1;
            private const int State_Result = 2;

            private readonly Func<TSource, int, Task<IAsyncEnumerable<TCollection>>> collectionSelector;
            private readonly Func<TSource, TCollection, Task<TResult>> resultSelector;
            private readonly IAsyncEnumerable<TSource> source;

            // Outer element whose inner sequence is currently being drained.
            private TSource currentSource;
            // Zero-based position of the current outer element; starts at -1 and is incremented before use.
            private int index;
            private int mode;
            private IAsyncEnumerator<TCollection> resultEnumerator;
            private IAsyncEnumerator<TSource> sourceEnumerator;

            public SelectManyWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, Task<IAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, Task<TResult>> resultSelector)
            {
                Debug.Assert(source != null);
                Debug.Assert(collectionSelector != null);
                Debug.Assert(resultSelector != null);

                this.source = source;
                this.collectionSelector = collectionSelector;
                this.resultSelector = resultSelector;
            }

            /// <summary>Creates a fresh iterator over the same source and selectors.</summary>
            public override AsyncIterator<TResult> Clone()
            {
                return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
            }

            /// <summary>
            /// Disposes the outer and the current inner enumerator (if any), clears the cached outer element,
            /// then disposes the base iterator.
            /// </summary>
            public override async Task DisposeAsync()
            {
                if (sourceEnumerator != null)
                {
                    await sourceEnumerator.DisposeAsync().ConfigureAwait(false);
                    sourceEnumerator = null;
                }

                if (resultEnumerator != null)
                {
                    await resultEnumerator.DisposeAsync().ConfigureAwait(false);
                    resultEnumerator = null;
                }

                currentSource = default(TSource);

                await base.DisposeAsync().ConfigureAwait(false);
            }

            /// <summary>
            /// Advances to the next combined element; returns false (after disposing) once the outer
            /// sequence is exhausted.
            /// </summary>
            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        sourceEnumerator = source.GetAsyncEnumerator();
                        index = -1;
                        mode = State_Source;
                        state = AsyncIteratorState.Iterating;
                        goto case AsyncIteratorState.Iterating;

                    case AsyncIteratorState.Iterating:
                        switch (mode)
                        {
                            case State_Source:
                                if (await sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    if (resultEnumerator != null)
                                    {
                                        await resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                        // Clear the field so a later DisposeAsync cannot dispose the same
                                        // enumerator twice if the selector or GetAsyncEnumerator throws.
                                        resultEnumerator = null;
                                    }

                                    currentSource = sourceEnumerator.Current;

                                    // Overflow of the element index throws instead of wrapping around.
                                    checked
                                    {
                                        index++;
                                    }

                                    var inner = await collectionSelector(currentSource, index).ConfigureAwait(false);
                                    resultEnumerator = inner.GetAsyncEnumerator();

                                    mode = State_Result;
                                    goto case State_Result;
                                }

                                break;

                            case State_Result:
                                if (await resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    current = await resultSelector(currentSource, resultEnumerator.Current).ConfigureAwait(false);
                                    return true;
                                }

                                // Inner sequence exhausted: go back and pull the next outer element.
                                mode = State_Source;
                                goto case State_Source; // loop
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Iterator that flattens the inner sequences selected from (element, index) pairs by a
        /// synchronous selector.
        /// </summary>
        private sealed class SelectManyWithIndexAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
        {
            // Values of 'mode': advance the outer enumerator vs. drain the current inner enumerator.
            private const int State_Source = 1;
            private const int State_Result = 2;

            private readonly Func<TSource, int, IAsyncEnumerable<TResult>> selector;
            private readonly IAsyncEnumerable<TSource> source;

            // Zero-based position of the current outer element; starts at -1 and is incremented before use.
            private int index;
            private int mode;
            private IAsyncEnumerator<TResult> resultEnumerator;
            private IAsyncEnumerator<TSource> sourceEnumerator;

            public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
            {
                Debug.Assert(source != null);
                Debug.Assert(selector != null);

                this.source = source;
                this.selector = selector;
            }

            /// <summary>Creates a fresh iterator over the same source and selector.</summary>
            public override AsyncIterator<TResult> Clone()
            {
                return new SelectManyWithIndexAsyncIterator<TSource, TResult>(source, selector);
            }

            /// <summary>Disposes the outer and the current inner enumerator (if any), then the base iterator.</summary>
            public override async Task DisposeAsync()
            {
                if (sourceEnumerator != null)
                {
                    await sourceEnumerator.DisposeAsync().ConfigureAwait(false);
                    sourceEnumerator = null;
                }

                if (resultEnumerator != null)
                {
                    await resultEnumerator.DisposeAsync().ConfigureAwait(false);
                    resultEnumerator = null;
                }

                await base.DisposeAsync().ConfigureAwait(false);
            }

            /// <summary>
            /// Advances to the next flattened element; returns false (after disposing) once the outer
            /// sequence is exhausted.
            /// </summary>
            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        sourceEnumerator = source.GetAsyncEnumerator();
                        index = -1;
                        mode = State_Source;
                        state = AsyncIteratorState.Iterating;
                        goto case AsyncIteratorState.Iterating;

                    case AsyncIteratorState.Iterating:
                        switch (mode)
                        {
                            case State_Source:
                                if (await sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    if (resultEnumerator != null)
                                    {
                                        await resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                        // Clear the field so a later DisposeAsync cannot dispose the same
                                        // enumerator twice if the selector or GetAsyncEnumerator throws.
                                        resultEnumerator = null;
                                    }

                                    // Overflow of the element index throws instead of wrapping around.
                                    checked
                                    {
                                        index++;
                                    }

                                    var inner = selector(sourceEnumerator.Current, index);
                                    resultEnumerator = inner.GetAsyncEnumerator();

                                    mode = State_Result;
                                    goto case State_Result;
                                }

                                break;

                            case State_Result:
                                if (await resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    current = resultEnumerator.Current;
                                    return true;
                                }

                                // Inner sequence exhausted: go back and pull the next outer element.
                                mode = State_Source;
                                goto case State_Source; // loop
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }

        /// <summary>
        /// Iterator that flattens the inner sequences selected from (element, index) pairs by an
        /// asynchronous selector.
        /// </summary>
        private sealed class SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult> : AsyncIterator<TResult>
        {
            // Values of 'mode': advance the outer enumerator vs. drain the current inner enumerator.
            private const int State_Source = 1;
            private const int State_Result = 2;

            private readonly Func<TSource, int, Task<IAsyncEnumerable<TResult>>> selector;
            private readonly IAsyncEnumerable<TSource> source;

            // Zero-based position of the current outer element; starts at -1 and is incremented before use.
            private int index;
            private int mode;
            private IAsyncEnumerator<TResult> resultEnumerator;
            private IAsyncEnumerator<TSource> sourceEnumerator;

            public SelectManyWithIndexAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, int, Task<IAsyncEnumerable<TResult>>> selector)
            {
                Debug.Assert(source != null);
                Debug.Assert(selector != null);

                this.source = source;
                this.selector = selector;
            }

            /// <summary>Creates a fresh iterator over the same source and selector.</summary>
            public override AsyncIterator<TResult> Clone()
            {
                return new SelectManyWithIndexAsyncIteratorWithTask<TSource, TResult>(source, selector);
            }

            /// <summary>Disposes the outer and the current inner enumerator (if any), then the base iterator.</summary>
            public override async Task DisposeAsync()
            {
                if (sourceEnumerator != null)
                {
                    await sourceEnumerator.DisposeAsync().ConfigureAwait(false);
                    sourceEnumerator = null;
                }

                if (resultEnumerator != null)
                {
                    await resultEnumerator.DisposeAsync().ConfigureAwait(false);
                    resultEnumerator = null;
                }

                await base.DisposeAsync().ConfigureAwait(false);
            }

            /// <summary>
            /// Advances to the next flattened element; returns false (after disposing) once the outer
            /// sequence is exhausted.
            /// </summary>
            protected override async Task<bool> MoveNextCore()
            {
                switch (state)
                {
                    case AsyncIteratorState.Allocated:
                        sourceEnumerator = source.GetAsyncEnumerator();
                        index = -1;
                        mode = State_Source;
                        state = AsyncIteratorState.Iterating;
                        goto case AsyncIteratorState.Iterating;

                    case AsyncIteratorState.Iterating:
                        switch (mode)
                        {
                            case State_Source:
                                if (await sourceEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    if (resultEnumerator != null)
                                    {
                                        await resultEnumerator.DisposeAsync().ConfigureAwait(false);

                                        // Clear the field so a later DisposeAsync cannot dispose the same
                                        // enumerator twice if the selector or GetAsyncEnumerator throws.
                                        resultEnumerator = null;
                                    }

                                    // Overflow of the element index throws instead of wrapping around.
                                    checked
                                    {
                                        index++;
                                    }

                                    var inner = await selector(sourceEnumerator.Current, index).ConfigureAwait(false);
                                    resultEnumerator = inner.GetAsyncEnumerator();

                                    mode = State_Result;
                                    goto case State_Result;
                                }

                                break;

                            case State_Result:
                                if (await resultEnumerator.MoveNextAsync().ConfigureAwait(false))
                                {
                                    current = resultEnumerator.Current;
                                    return true;
                                }

                                // Inner sequence exhausted: go back and pull the next outer element.
                                mode = State_Source;
                                goto case State_Source; // loop
                        }

                        break;
                }

                await DisposeAsync().ConfigureAwait(false);
                return false;
            }
        }
    }
}