| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
- namespace System.Linq
- {
- public static partial class AsyncEnumerable
- {
- #if INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
- // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.where?view=net-9.0-pp#system-linq-asyncenumerable-where-1(system-collections-generic-iasyncenumerable((-0))-system-func((-0-system-boolean)))
- /// <summary>
- /// Filters the elements of an async-enumerable sequence based on a predicate.
- /// </summary>
- /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">An async-enumerable sequence whose elements to filter.</param>
- /// <param name="predicate">A function to test each source element for a condition.</param>
- /// <returns>An async-enumerable sequence that contains elements from the input sequence that satisfy the condition.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
- public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)
- {
- if (source == null)
- throw Error.ArgumentNull(nameof(source));
- if (predicate == null)
- throw Error.ArgumentNull(nameof(predicate));
- if (source is AsyncIteratorBase<TSource> iterator)
- {
- return iterator.Where(predicate);
- }
- // TODO: Can we add array/list optimizations here, does it make sense?
- return new WhereEnumerableAsyncIterator<TSource>(source, predicate);
- }
- // https://learn.microsoft.com/en-us/dotnet/api/system.linq.asyncenumerable.where?view=net-9.0-pp#system-linq-asyncenumerable-where-1(system-collections-generic-iasyncenumerable((-0))-system-func((-0-system-int32-system-boolean)))
- /// <summary>
- /// Filters the elements of an async-enumerable sequence based on a predicate by incorporating the element's index.
- /// </summary>
- /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">An async-enumerable sequence whose elements to filter.</param>
- /// <param name="predicate">A function to test each source element for a condition; the second parameter of the function represents the index of the source element.</param>
- /// <returns>An async-enumerable sequence that contains elements from the input sequence that satisfy the condition.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
- public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate)
- {
- if (source == null)
- throw Error.ArgumentNull(nameof(source));
- if (predicate == null)
- throw Error.ArgumentNull(nameof(predicate));
- return Core(source, predicate);
- static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- var index = -1;
- await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
- {
- checked
- {
- index++;
- }
- if (predicate(element, index))
- {
- yield return element;
- }
- }
- }
- }
- #endif // INCLUDE_SYSTEM_LINQ_ASYNCENUMERABLE_DUPLICATES
- /// <summary>
- /// Filters the elements of an async-enumerable sequence based on an asynchronous predicate.
- /// </summary>
- /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">An async-enumerable sequence whose elements to filter.</param>
- /// <param name="predicate">An asynchronous predicate to test each source element for a condition.</param>
- /// <returns>An async-enumerable sequence that contains elements from the input sequence that satisfy the condition.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
- [GenerateAsyncOverload]
- [Obsolete("Use Where. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the WhereAwait functionality now exists as overloads of Where. You will need to modify your callback to take an additional CancellationToken argument.")]
- private static IAsyncEnumerable<TSource> WhereAwaitCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate)
- {
- if (source == null)
- throw Error.ArgumentNull(nameof(source));
- if (predicate == null)
- throw Error.ArgumentNull(nameof(predicate));
- if (source is AsyncIteratorBase<TSource> iterator)
- {
- return iterator.Where(predicate);
- }
- // TODO: Can we add array/list optimizations here, does it make sense?
- return new WhereEnumerableAsyncIteratorWithTask<TSource>(source, predicate);
- }
- #if !NO_DEEP_CANCELLATION
- [GenerateAsyncOverload]
- [Obsolete("Use Where. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the WhereAwaitWithCancellation functionality now exists as overloads of Where.")]
- private static IAsyncEnumerable<TSource> WhereAwaitWithCancellationCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate)
- {
- if (source == null)
- throw Error.ArgumentNull(nameof(source));
- if (predicate == null)
- throw Error.ArgumentNull(nameof(predicate));
- if (source is AsyncIteratorBase<TSource> iterator)
- {
- return iterator.Where(predicate);
- }
- // TODO: Can we add array/list optimizations here, does it make sense?
- return new WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource>(source, predicate);
- }
- #endif
- /// <summary>
- /// Filters the elements of an async-enumerable sequence based on an asynchronous predicate that incorporates the element's index.
- /// </summary>
- /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">An async-enumerable sequence whose elements to filter.</param>
- /// <param name="predicate">An asynchronous predicate to test each source element for a condition; the second parameter of the function represents the index of the source element.</param>
- /// <returns>An async-enumerable sequence that contains elements from the input sequence that satisfy the condition.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
- [GenerateAsyncOverload]
- [Obsolete("Use Where. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the WhereAwait functionality now exists as overloads of Where. You will need to modify your callback to take an additional CancellationToken argument.")]
- private static IAsyncEnumerable<TSource> WhereAwaitCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>> predicate)
- {
- if (source == null)
- throw Error.ArgumentNull(nameof(source));
- if (predicate == null)
- throw Error.ArgumentNull(nameof(predicate));
- return Core(source, predicate);
- static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>> predicate, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- var index = -1;
- await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
- {
- checked
- {
- index++;
- }
- if (await predicate(element, index).ConfigureAwait(false))
- {
- yield return element;
- }
- }
- }
- }
- #if !NO_DEEP_CANCELLATION
- [GenerateAsyncOverload]
- [Obsolete("Use Where. IAsyncEnumerable LINQ is now in System.Linq.AsyncEnumerable, and the WhereAwaitWithCancellation functionality now exists as overloads of Where.")]
- private static IAsyncEnumerable<TSource> WhereAwaitWithCancellationCore<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<bool>> predicate)
- {
- if (source == null)
- throw Error.ArgumentNull(nameof(source));
- if (predicate == null)
- throw Error.ArgumentNull(nameof(predicate));
- return Core(source, predicate);
- static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, ValueTask<bool>> predicate, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- var index = -1;
- await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
- {
- checked
- {
- index++;
- }
- if (await predicate(element, index, cancellationToken).ConfigureAwait(false))
- {
- yield return element;
- }
- }
- }
- }
- #endif
- internal sealed class WhereEnumerableAsyncIterator<TSource> : AsyncIterator<TSource>
- {
- private readonly Func<TSource, bool> _predicate;
- private readonly IAsyncEnumerable<TSource> _source;
- private IAsyncEnumerator<TSource>? _enumerator;
- public WhereEnumerableAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)
- {
- _source = source;
- _predicate = predicate;
- }
- public override AsyncIteratorBase<TSource> Clone()
- {
- return new WhereEnumerableAsyncIterator<TSource>(_source, _predicate);
- }
- public override async ValueTask DisposeAsync()
- {
- if (_enumerator != null)
- {
- await _enumerator.DisposeAsync().ConfigureAwait(false);
- _enumerator = null;
- }
- await base.DisposeAsync().ConfigureAwait(false);
- }
- public override IAsyncEnumerable<TResult> Select<TResult>(Func<TSource, TResult> selector)
- {
- return new WhereSelectEnumerableAsyncIterator<TSource, TResult>(_source, _predicate, selector);
- }
- public override IAsyncEnumerable<TSource> Where(Func<TSource, bool> predicate)
- {
- return new WhereEnumerableAsyncIterator<TSource>(_source, CombinePredicates(_predicate, predicate));
- }
- protected override async ValueTask<bool> MoveNextCore()
- {
- switch (_state)
- {
- case AsyncIteratorState.Allocated:
- _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
- _state = AsyncIteratorState.Iterating;
- goto case AsyncIteratorState.Iterating;
- case AsyncIteratorState.Iterating:
- while (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
- {
- var item = _enumerator.Current;
- if (_predicate(item))
- {
- _current = item;
- return true;
- }
- }
- await DisposeAsync().ConfigureAwait(false);
- break;
- }
- return false;
- }
- }
- internal sealed class WhereEnumerableAsyncIteratorWithTask<TSource> : AsyncIterator<TSource>
- {
- private readonly Func<TSource, ValueTask<bool>> _predicate;
- private readonly IAsyncEnumerable<TSource> _source;
- private IAsyncEnumerator<TSource>? _enumerator;
- public WhereEnumerableAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate)
- {
- _source = source;
- _predicate = predicate;
- }
- public override AsyncIteratorBase<TSource> Clone()
- {
- return new WhereEnumerableAsyncIteratorWithTask<TSource>(_source, _predicate);
- }
- public override async ValueTask DisposeAsync()
- {
- if (_enumerator != null)
- {
- await _enumerator.DisposeAsync().ConfigureAwait(false);
- _enumerator = null;
- }
- await base.DisposeAsync().ConfigureAwait(false);
- }
- public override IAsyncEnumerable<TSource> Where(Func<TSource, ValueTask<bool>> predicate)
- {
- return new WhereEnumerableAsyncIteratorWithTask<TSource>(_source, CombinePredicates(_predicate, predicate));
- }
- protected override async ValueTask<bool> MoveNextCore()
- {
- switch (_state)
- {
- case AsyncIteratorState.Allocated:
- _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
- _state = AsyncIteratorState.Iterating;
- goto case AsyncIteratorState.Iterating;
- case AsyncIteratorState.Iterating:
- while (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
- {
- var item = _enumerator.Current;
- if (await _predicate(item).ConfigureAwait(false))
- {
- _current = item;
- return true;
- }
- }
- await DisposeAsync().ConfigureAwait(false);
- break;
- }
- return false;
- }
- }
- #if !NO_DEEP_CANCELLATION
- internal sealed class WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource> : AsyncIterator<TSource>
- {
- private readonly Func<TSource, CancellationToken, ValueTask<bool>> _predicate;
- private readonly IAsyncEnumerable<TSource> _source;
- private IAsyncEnumerator<TSource>? _enumerator;
- public WhereEnumerableAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<bool>> predicate)
- {
- _source = source;
- _predicate = predicate;
- }
- public override AsyncIteratorBase<TSource> Clone()
- {
- return new WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource>(_source, _predicate);
- }
- public override async ValueTask DisposeAsync()
- {
- if (_enumerator != null)
- {
- await _enumerator.DisposeAsync().ConfigureAwait(false);
- _enumerator = null;
- }
- await base.DisposeAsync().ConfigureAwait(false);
- }
- public override IAsyncEnumerable<TSource> Where(Func<TSource, CancellationToken, ValueTask<bool>> predicate)
- {
- return new WhereEnumerableAsyncIteratorWithTaskAndCancellation<TSource>(_source, CombinePredicates(_predicate, predicate));
- }
- protected override async ValueTask<bool> MoveNextCore()
- {
- switch (_state)
- {
- case AsyncIteratorState.Allocated:
- _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
- _state = AsyncIteratorState.Iterating;
- goto case AsyncIteratorState.Iterating;
- case AsyncIteratorState.Iterating:
- while (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
- {
- var item = _enumerator.Current;
- if (await _predicate(item, _cancellationToken).ConfigureAwait(false))
- {
- _current = item;
- return true;
- }
- }
- await DisposeAsync().ConfigureAwait(false);
- break;
- }
- return false;
- }
- }
- #endif
- private sealed class WhereSelectEnumerableAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
- {
- private readonly Func<TSource, bool> _predicate;
- private readonly Func<TSource, TResult> _selector;
- private readonly IAsyncEnumerable<TSource> _source;
- private IAsyncEnumerator<TSource>? _enumerator;
- public WhereSelectEnumerableAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, Func<TSource, TResult> selector)
- {
- _source = source;
- _predicate = predicate;
- _selector = selector;
- }
- public override AsyncIteratorBase<TResult> Clone()
- {
- return new WhereSelectEnumerableAsyncIterator<TSource, TResult>(_source, _predicate, _selector);
- }
- public override async ValueTask DisposeAsync()
- {
- if (_enumerator != null)
- {
- await _enumerator.DisposeAsync().ConfigureAwait(false);
- _enumerator = null;
- }
- await base.DisposeAsync().ConfigureAwait(false);
- }
- public override IAsyncEnumerable<TResult1> Select<TResult1>(Func<TResult, TResult1> selector)
- {
- return new WhereSelectEnumerableAsyncIterator<TSource, TResult1>(_source, _predicate, CombineSelectors(_selector, selector));
- }
- protected override async ValueTask<bool> MoveNextCore()
- {
- switch (_state)
- {
- case AsyncIteratorState.Allocated:
- _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
- _state = AsyncIteratorState.Iterating;
- goto case AsyncIteratorState.Iterating;
- case AsyncIteratorState.Iterating:
- while (await _enumerator!.MoveNextAsync().ConfigureAwait(false))
- {
- var item = _enumerator.Current;
- if (_predicate(item))
- {
- _current = _selector(item);
- return true;
- }
- }
- await DisposeAsync().ConfigureAwait(false);
- break;
- }
- return false;
- }
- }
- }
- }
|