123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
- namespace System.Linq
- {
- public static partial class AsyncEnumerableEx
- {
- // REVIEW: Should we convert Task-based overloads to ValueTask?
/// <summary>
/// Runs a side-effecting action on every element of an async-enumerable sequence and passes each element through unchanged.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Callback run for each element before that element is yielded.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    // No error or completion callbacks for this overload.
    return DoCore(source, onNext, onError: null, onCompleted: null);
}
/// <summary>
/// Runs a side-effecting action on every element of an async-enumerable sequence, plus a second action once the sequence has ended without error.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Callback run for each element before that element is yielded.</param>
/// <param name="onCompleted">Callback run after the source reports that it has no more elements.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    if (onCompleted is null)
    {
        throw Error.ArgumentNull(nameof(onCompleted));
    }

    // No error callback for this overload.
    return DoCore(source, onNext, onError: null, onCompleted);
}
/// <summary>
/// Runs a side-effecting action on every element of an async-enumerable sequence, plus a second action when enumeration fails with an exception.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Callback run for each element before that element is yielded.</param>
/// <param name="onError">Callback run with the exception when the sequence terminates exceptionally; the exception is rethrown afterwards. It is not run for <see cref="OperationCanceledException"/>.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    if (onError is null)
    {
        throw Error.ArgumentNull(nameof(onError));
    }

    // No completion callback for this overload.
    return DoCore(source, onNext, onError, onCompleted: null);
}
/// <summary>
/// Runs a side-effecting action on every element of an async-enumerable sequence, plus further actions when the sequence ends, either normally or with an exception.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Callback run for each element before that element is yielded.</param>
/// <param name="onError">Callback run with the exception when the sequence terminates exceptionally; the exception is rethrown afterwards. It is not run for <see cref="OperationCanceledException"/>.</param>
/// <param name="onCompleted">Callback run after the source reports that it has no more elements.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    if (onError is null)
    {
        throw Error.ArgumentNull(nameof(onError));
    }

    if (onCompleted is null)
    {
        throw Error.ArgumentNull(nameof(onCompleted));
    }

    return DoCore(source, onNext: onNext, onError: onError, onCompleted: onCompleted);
}
/// <summary>
/// Runs and awaits an asynchronous side-effecting action on every element of an async-enumerable sequence and passes each element through unchanged.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Asynchronous callback run and awaited for each element before that element is yielded.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    // No error or completion callbacks for this overload.
    return DoCore(source, onNext, onError: null, onCompleted: null);
}
/// <summary>
/// Runs and awaits an asynchronous side-effecting action on every element of an async-enumerable sequence, then runs and awaits a second asynchronous action once the sequence has ended without error.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Asynchronous callback run and awaited for each element before that element is yielded.</param>
/// <param name="onCompleted">Asynchronous callback run and awaited after the source reports that it has no more elements.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Task> onCompleted)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    if (onCompleted is null)
    {
        throw Error.ArgumentNull(nameof(onCompleted));
    }

    // No error callback for this overload.
    return DoCore(source, onNext, onError: null, onCompleted);
}
/// <summary>
/// Runs and awaits an asynchronous side-effecting action on every element of an async-enumerable sequence, then runs and awaits a second asynchronous action when enumeration fails with an exception.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Asynchronous callback run and awaited for each element before that element is yielded.</param>
/// <param name="onError">Asynchronous callback run and awaited with the exception when the sequence terminates exceptionally; the exception is rethrown afterwards. It is not run for <see cref="OperationCanceledException"/>.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task> onError)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    if (onError is null)
    {
        throw Error.ArgumentNull(nameof(onError));
    }

    // No completion callback for this overload.
    return DoCore(source, onNext, onError, onCompleted: null);
}
/// <summary>
/// Runs and awaits an asynchronous side-effecting action on every element of an async-enumerable sequence, then runs and awaits further asynchronous actions when the sequence ends, either normally or with an exception.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Asynchronous callback run and awaited for each element before that element is yielded.</param>
/// <param name="onError">Asynchronous callback run and awaited with the exception when the sequence terminates exceptionally; the exception is rethrown afterwards. It is not run for <see cref="OperationCanceledException"/>.</param>
/// <param name="onCompleted">Asynchronous callback run and awaited after the source reports that it has no more elements.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task> onError, Func<Task> onCompleted)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    if (onError is null)
    {
        throw Error.ArgumentNull(nameof(onError));
    }

    if (onCompleted is null)
    {
        throw Error.ArgumentNull(nameof(onCompleted));
    }

    return DoCore(source, onNext: onNext, onError: onError, onCompleted: onCompleted);
}
- #if !NO_DEEP_CANCELLATION
/// <summary>
/// Runs and awaits a cancellable asynchronous side-effecting action on every element of an async-enumerable sequence and passes each element through unchanged.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Asynchronous callback run and awaited for each element before that element is yielded; it receives the cancellation token of the enumeration.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    // No error or completion callbacks for this overload.
    return DoCore(source, onNext, onError: null, onCompleted: null);
}
/// <summary>
/// Runs and awaits a cancellable asynchronous side-effecting action on every element of an async-enumerable sequence, then runs and awaits a second cancellable asynchronous action once the sequence has ended without error.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Asynchronous callback run and awaited for each element before that element is yielded; it receives the cancellation token of the enumeration.</param>
/// <param name="onCompleted">Asynchronous callback run and awaited after the source reports that it has no more elements; it receives the cancellation token of the enumeration.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<CancellationToken, Task> onCompleted)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    if (onCompleted is null)
    {
        throw Error.ArgumentNull(nameof(onCompleted));
    }

    // No error callback for this overload.
    return DoCore(source, onNext, onError: null, onCompleted);
}
/// <summary>
/// Runs and awaits a cancellable asynchronous side-effecting action on every element of an async-enumerable sequence, then runs and awaits a second cancellable asynchronous action when enumeration fails with an exception.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Asynchronous callback run and awaited for each element before that element is yielded; it receives the cancellation token of the enumeration.</param>
/// <param name="onError">Asynchronous callback run and awaited with the exception and the cancellation token of the enumeration when the sequence terminates exceptionally; the exception is rethrown afterwards. It is not run for <see cref="OperationCanceledException"/>.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task> onError)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    if (onError is null)
    {
        throw Error.ArgumentNull(nameof(onError));
    }

    // No completion callback for this overload.
    return DoCore(source, onNext, onError, onCompleted: null);
}
/// <summary>
/// Runs and awaits a cancellable asynchronous side-effecting action on every element of an async-enumerable sequence, then runs and awaits further cancellable asynchronous actions when the sequence ends, either normally or with an exception.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="onNext">Asynchronous callback run and awaited for each element before that element is yielded; it receives the cancellation token of the enumeration.</param>
/// <param name="onError">Asynchronous callback run and awaited with the exception and the cancellation token of the enumeration when the sequence terminates exceptionally; the exception is rethrown afterwards. It is not run for <see cref="OperationCanceledException"/>.</param>
/// <param name="onCompleted">Asynchronous callback run and awaited after the source reports that it has no more elements; it receives the cancellation token of the enumeration.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task> onError, Func<CancellationToken, Task> onCompleted)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (onNext is null)
    {
        throw Error.ArgumentNull(nameof(onNext));
    }

    if (onError is null)
    {
        throw Error.ArgumentNull(nameof(onError));
    }

    if (onCompleted is null)
    {
        throw Error.ArgumentNull(nameof(onCompleted));
    }

    return DoCore(source, onNext: onNext, onError: onError, onCompleted: onCompleted);
}
- #endif
/// <summary>
/// Forwards every notification of an async-enumerable sequence (element, error, completion) to the matching method of an observer while passing the elements through unchanged.
/// Handy for tracing, logging or otherwise inspecting what flows through a query pipeline.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="observer">Observer whose <c>OnNext</c>, <c>OnError</c> and <c>OnCompleted</c> methods are used as the callbacks.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="observer"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, IObserver<TSource> observer)
{
    if (source is null)
    {
        throw Error.ArgumentNull(nameof(source));
    }

    if (observer is null)
    {
        throw Error.ArgumentNull(nameof(observer));
    }

    // Explicitly typed delegates pin overload resolution to the synchronous DoCore.
    Action<TSource> next = observer.OnNext;
    Action<Exception> error = observer.OnError;
    Action completed = observer.OnCompleted;

    return DoCore(source, next, error, completed);
}
/// <summary>
/// Shared implementation behind the <c>Do</c> overloads that take synchronous callbacks.
/// Wraps <paramref name="source"/> in an async iterator that runs the callbacks around each step of the enumeration.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">Sequence to enumerate.</param>
/// <param name="onNext">Callback run for every element before it is yielded.</param>
/// <param name="onError">Optional callback run when advancing the source or <paramref name="onNext"/> throws anything other than <see cref="OperationCanceledException"/>; the exception is rethrown afterwards.</param>
/// <param name="onCompleted">Optional callback run once the source reports that it has no more elements.</param>
/// <returns>The sequence created by <c>AsyncEnumerable.Create</c> from the iterator.</returns>
private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception>? onError, Action? onCompleted)
{
    return AsyncEnumerable.Create(Iterate);

    async IAsyncEnumerator<TSource> Iterate(CancellationToken cancellationToken)
    {
        await using var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

        for (; ; )
        {
            TSource element;

            // The yield sits after this try statement because C# does not allow
            // 'yield return' inside a try block that has catch clauses.
            try
            {
                var advanced = await enumerator.MoveNextAsync();
                if (!advanced)
                {
                    break;
                }

                element = enumerator.Current;
                onNext(element);
            }
            catch (OperationCanceledException)
            {
                // Cancellation is rethrown as-is and never reported to onError.
                throw;
            }
            catch (Exception error) when (onError != null)
            {
                onError(error);
                throw;
            }

            yield return element;
        }

        // Only reached when the loop ended because the source had no more elements.
        if (onCompleted != null)
        {
            onCompleted();
        }
    }
}
/// <summary>
/// Shared implementation behind the <c>Do</c> overloads that take <see cref="Task"/>-returning callbacks.
/// Wraps <paramref name="source"/> in an async iterator that runs and awaits the callbacks around each step of the enumeration.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">Sequence to enumerate.</param>
/// <param name="onNext">Callback run and awaited for every element before it is yielded.</param>
/// <param name="onError">Optional callback run and awaited when advancing the source or <paramref name="onNext"/> throws anything other than <see cref="OperationCanceledException"/>; the exception is rethrown afterwards.</param>
/// <param name="onCompleted">Optional callback run and awaited once the source reports that it has no more elements.</param>
/// <returns>The sequence created by <c>AsyncEnumerable.Create</c> from the iterator.</returns>
private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task>? onError, Func<Task>? onCompleted)
{
    return AsyncEnumerable.Create(Iterate);

    async IAsyncEnumerator<TSource> Iterate(CancellationToken cancellationToken)
    {
        await using var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

        for (; ; )
        {
            TSource element;

            // The yield sits after this try statement because C# does not allow
            // 'yield return' inside a try block that has catch clauses.
            try
            {
                var advanced = await enumerator.MoveNextAsync();
                if (!advanced)
                {
                    break;
                }

                element = enumerator.Current;
                await onNext(element).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Cancellation is rethrown as-is and never reported to onError.
                throw;
            }
            catch (Exception error) when (onError != null)
            {
                await onError(error).ConfigureAwait(false);
                throw;
            }

            yield return element;
        }

        // Only reached when the loop ended because the source had no more elements.
        if (onCompleted is null)
        {
            yield break;
        }

        await onCompleted().ConfigureAwait(false);
    }
}
- #if !NO_DEEP_CANCELLATION
/// <summary>
/// Shared implementation behind the <c>Do</c> overloads that take cancellable <see cref="Task"/>-returning callbacks.
/// Wraps <paramref name="source"/> in an async iterator that runs and awaits the callbacks around each step of the enumeration, handing each callback the iterator's cancellation token.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequence.</typeparam>
/// <param name="source">Sequence to enumerate.</param>
/// <param name="onNext">Callback run and awaited for every element before it is yielded.</param>
/// <param name="onError">Optional callback run and awaited when advancing the source or <paramref name="onNext"/> throws anything other than <see cref="OperationCanceledException"/>; the exception is rethrown afterwards.</param>
/// <param name="onCompleted">Optional callback run and awaited once the source reports that it has no more elements.</param>
/// <returns>The sequence created by <c>AsyncEnumerable.Create</c> from the iterator.</returns>
private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task>? onError, Func<CancellationToken, Task>? onCompleted)
{
    return AsyncEnumerable.Create(Iterate);

    async IAsyncEnumerator<TSource> Iterate(CancellationToken cancellationToken)
    {
        await using var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

        for (; ; )
        {
            TSource element;

            // The yield sits after this try statement because C# does not allow
            // 'yield return' inside a try block that has catch clauses.
            try
            {
                var advanced = await enumerator.MoveNextAsync();
                if (!advanced)
                {
                    break;
                }

                element = enumerator.Current;
                await onNext(element, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Cancellation is rethrown as-is and never reported to onError.
                throw;
            }
            catch (Exception error) when (onError != null)
            {
                await onError(error, cancellationToken).ConfigureAwait(false);
                throw;
            }

            yield return element;
        }

        // Only reached when the loop ended because the source had no more elements.
        if (onCompleted is null)
        {
            yield break;
        }

        await onCompleted(cancellationToken).ConfigureAwait(false);
    }
}
- #endif
- }
- }
|