// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace System.Linq { public static partial class AsyncEnumerableEx { // REVIEW: Should we convert Task-based overloads to ValueTask? /// /// Invokes an action for each element in the async-enumerable sequence, and propagates all observer messages through the result sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke for each element in the async-enumerable sequence. /// The source sequence with the side-effecting behavior applied. /// or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Action onNext) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); return DoCore(source, onNext: onNext, onError: null, onCompleted: null); } /// /// Invokes an action for each element in the async-enumerable sequence and invokes an action upon graceful termination of the async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke for each element in the async-enumerable sequence. /// Action to invoke upon graceful termination of the async-enumerable sequence. /// The source sequence with the side-effecting behavior applied. /// or or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Action onNext, Action onCompleted) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); if (onCompleted == null) throw Error.ArgumentNull(nameof(onCompleted)); return DoCore(source, onNext: onNext, onError: null, onCompleted: onCompleted); } /// /// Invokes an action for each element in the async-enumerable sequence and invokes an action upon exceptional termination of the async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke for each element in the async-enumerable sequence. /// Action to invoke upon exceptional termination of the async-enumerable sequence. /// The source sequence with the side-effecting behavior applied. /// or or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Action onNext, Action onError) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); if (onError == null) throw Error.ArgumentNull(nameof(onError)); return DoCore(source, onNext: onNext, onError: onError, onCompleted: null); } /// /// Invokes an action for each element in the async-enumerable sequence and invokes an action upon graceful or exceptional termination of the async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke for each element in the async-enumerable sequence. /// Action to invoke upon exceptional termination of the async-enumerable sequence. /// Action to invoke upon graceful termination of the async-enumerable sequence. /// The source sequence with the side-effecting behavior applied. /// or or or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Action onNext, Action onError, Action onCompleted) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); if (onError == null) throw Error.ArgumentNull(nameof(onError)); if (onCompleted == null) throw Error.ArgumentNull(nameof(onCompleted)); return DoCore(source, onNext, onError, onCompleted); } /// /// Invokes and awaits an asynchronous action for each element in the async-enumerable sequence, and propagates all observer messages through the result sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke and await for each element in the async-enumerable sequence. /// The source sequence with the side-effecting behavior applied. /// or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Func onNext) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); return DoCore(source, onNext: onNext, onError: null, onCompleted: null); } /// /// Invokes and awaits an asynchronous action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous an action upon graceful termination of the async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke and await for each element in the async-enumerable sequence. /// Action to invoke and await upon graceful termination of the async-enumerable sequence. /// The source sequence with the side-effecting behavior applied. /// or or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Func onNext, Func onCompleted) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); if (onCompleted == null) throw Error.ArgumentNull(nameof(onCompleted)); return DoCore(source, onNext: onNext, onError: null, onCompleted: onCompleted); } /// /// Invokes and awaits an asynchronous action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous action upon exceptional termination of the async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke and await for each element in the async-enumerable sequence. /// Action to invoke and await upon exceptional termination of the async-enumerable sequence. /// The source sequence with the side-effecting behavior applied. /// or or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Func onNext, Func onError) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); if (onError == null) throw Error.ArgumentNull(nameof(onError)); return DoCore(source, onNext: onNext, onError: onError, onCompleted: null); } /// /// Invokes and awaits an asynchronous action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous action upon graceful or exceptional termination of the async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke and await for each element in the async-enumerable sequence. /// Action to invoke and await upon exceptional termination of the async-enumerable sequence. /// Action to invoke and await upon graceful termination of the async-enumerable sequence. /// The source sequence with the side-effecting behavior applied. /// or or or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Func onNext, Func onError, Func onCompleted) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); if (onError == null) throw Error.ArgumentNull(nameof(onError)); if (onCompleted == null) throw Error.ArgumentNull(nameof(onCompleted)); return DoCore(source, onNext, onError, onCompleted); } #if !NO_DEEP_CANCELLATION /// /// Invokes and awaits an asynchronous (cancellable) action for each element in the async-enumerable sequence, and propagates all observer messages through the result sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke and await for each element in the async-enumerable sequence while supporting cancellation. /// The source sequence with the side-effecting behavior applied. /// or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Func onNext) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); return DoCore(source, onNext: onNext, onError: null, onCompleted: null); } /// /// Invokes and awaits an asynchronous (cancellable) action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous (cancellable) an action upon graceful termination of the async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke and await for each element in the async-enumerable sequence while supporting cancellation. /// Action to invoke and await upon graceful termination of the async-enumerable sequence while supporting cancellation. /// The source sequence with the side-effecting behavior applied. /// or or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Func onNext, Func onCompleted) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); if (onCompleted == null) throw Error.ArgumentNull(nameof(onCompleted)); return DoCore(source, onNext: onNext, onError: null, onCompleted: onCompleted); } /// /// Invokes and awaits an asynchronous (cancellable) action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous (cancellable) action upon exceptional termination of the async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke and await for each element in the async-enumerable sequence while supporting cancellation. /// Action to invoke and await upon exceptional termination of the async-enumerable sequence while supporting cancellation. /// The source sequence with the side-effecting behavior applied. /// or or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Func onNext, Func onError) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); if (onError == null) throw Error.ArgumentNull(nameof(onError)); return DoCore(source, onNext: onNext, onError: onError, onCompleted: null); } /// /// Invokes and awaits an asynchronous (cancellable) action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous (cancellable) action upon graceful or exceptional termination of the async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Action to invoke and await for each element in the async-enumerable sequence while supporting cancellation. /// Action to invoke and await upon exceptional termination of the async-enumerable sequence while supporting cancellation. /// Action to invoke and await upon graceful termination of the async-enumerable sequence while supporting cancellation. /// The source sequence with the side-effecting behavior applied. /// or or or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, Func onNext, Func onError, Func onCompleted) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (onNext == null) throw Error.ArgumentNull(nameof(onNext)); if (onError == null) throw Error.ArgumentNull(nameof(onError)); if (onCompleted == null) throw Error.ArgumentNull(nameof(onCompleted)); return DoCore(source, onNext, onError, onCompleted); } #endif /// /// Invokes the observer's methods for each message in the source async-enumerable sequence. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// /// The type of the elements in the source sequence. /// Source sequence. /// Observer whose methods to invoke as part of the source sequence's observation. /// The source sequence with the side-effecting behavior applied. /// or is null. public static IAsyncEnumerable Do(this IAsyncEnumerable source, IObserver observer) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (observer == null) throw Error.ArgumentNull(nameof(observer)); return DoCore(source, new Action(observer.OnNext), new Action(observer.OnError), new Action(observer.OnCompleted)); } private static IAsyncEnumerable DoCore(IAsyncEnumerable source, Action onNext, Action? onError, Action? onCompleted) { #if HAS_ASYNC_ENUMERABLE_CANCELLATION return Core(source, onNext, onError, onCompleted); // TODO: Can remove local function. static async IAsyncEnumerable Core(IAsyncEnumerable source, Action onNext, Action? onError, Action? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default) #else return AsyncEnumerable.Create(Core); async IAsyncEnumerator Core(CancellationToken cancellationToken) #endif { await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false); while (true) { TSource item; try { if (!await e.MoveNextAsync()) { break; } item = e.Current; onNext(item); } catch (OperationCanceledException) { throw; } catch (Exception ex) when (onError != null) { onError(ex); throw; } yield return item; } onCompleted?.Invoke(); } } private static IAsyncEnumerable DoCore(IAsyncEnumerable source, Func onNext, Func? onError, Func? onCompleted) { #if HAS_ASYNC_ENUMERABLE_CANCELLATION return Core(source, onNext, onError, onCompleted); // TODO: Can remove local function. static async IAsyncEnumerable Core(IAsyncEnumerable source, Func onNext, Func? onError, Func? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default) #else return AsyncEnumerable.Create(Core); async IAsyncEnumerator Core(CancellationToken cancellationToken) #endif { await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false); while (true) { TSource item; try { if (!await e.MoveNextAsync()) { break; } item = e.Current; await onNext(item).ConfigureAwait(false); } catch (OperationCanceledException) { throw; } catch (Exception ex) when (onError != null) { await onError(ex).ConfigureAwait(false); throw; } yield return item; } if (onCompleted != null) { await onCompleted().ConfigureAwait(false); } } } #if !NO_DEEP_CANCELLATION private static IAsyncEnumerable DoCore(IAsyncEnumerable source, Func onNext, Func? onError, Func? onCompleted) { #if HAS_ASYNC_ENUMERABLE_CANCELLATION return Core(source, onNext, onError, onCompleted); // TODO: Can remove local function. static async IAsyncEnumerable Core(IAsyncEnumerable source, Func onNext, Func? onError, Func? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default) #else return AsyncEnumerable.Create(Core); async IAsyncEnumerator Core(CancellationToken cancellationToken) #endif { await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false); while (true) { TSource item; try { if (!await e.MoveNextAsync()) { break; } item = e.Current; await onNext(item, cancellationToken).ConfigureAwait(false); } catch (OperationCanceledException) { throw; } catch (Exception ex) when (onError != null) { await onError(ex, cancellationToken).ConfigureAwait(false); throw; } yield return item; } if (onCompleted != null) { await onCompleted(cancellationToken).ConfigureAwait(false); } } } #endif } }