// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace System.Linq
{
public static partial class AsyncEnumerableEx
{
// REVIEW: Should we convert Task-based overloads to ValueTask?
/// <summary>
/// Invokes an action for each element in the async-enumerable sequence, and propagates all observer messages through the result sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke for each element in the async-enumerable sequence.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));

    // This overload has no error or completion callback.
    return DoCore(source, onNext: onNext, onError: null, onCompleted: null);
}
/// <summary>
/// Invokes an action for each element in the async-enumerable sequence and invokes an action upon graceful termination of the async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke for each element in the async-enumerable sequence.</param>
/// <param name="onCompleted">Action to invoke upon graceful termination of the async-enumerable sequence.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));
    if (onCompleted == null)
        throw Error.ArgumentNull(nameof(onCompleted));

    // This overload has no error callback.
    return DoCore(source, onNext: onNext, onError: null, onCompleted: onCompleted);
}
/// <summary>
/// Invokes an action for each element in the async-enumerable sequence and invokes an action upon exceptional termination of the async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke for each element in the async-enumerable sequence.</param>
/// <param name="onError">Action to invoke upon exceptional termination of the async-enumerable sequence.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));
    if (onError == null)
        throw Error.ArgumentNull(nameof(onError));

    // This overload has no completion callback.
    return DoCore(source, onNext: onNext, onError: onError, onCompleted: null);
}
/// <summary>
/// Invokes an action for each element in the async-enumerable sequence and invokes an action upon graceful or exceptional termination of the async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke for each element in the async-enumerable sequence.</param>
/// <param name="onError">Action to invoke upon exceptional termination of the async-enumerable sequence.</param>
/// <param name="onCompleted">Action to invoke upon graceful termination of the async-enumerable sequence.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));
    if (onError == null)
        throw Error.ArgumentNull(nameof(onError));
    if (onCompleted == null)
        throw Error.ArgumentNull(nameof(onCompleted));

    return DoCore(source, onNext, onError, onCompleted);
}
/// <summary>
/// Invokes and awaits an asynchronous action for each element in the async-enumerable sequence, and propagates all observer messages through the result sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke and await for each element in the async-enumerable sequence.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));

    // This overload has no error or completion callback.
    return DoCore(source, onNext: onNext, onError: null, onCompleted: null);
}
/// <summary>
/// Invokes and awaits an asynchronous action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous action upon graceful termination of the async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke and await for each element in the async-enumerable sequence.</param>
/// <param name="onCompleted">Action to invoke and await upon graceful termination of the async-enumerable sequence.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Task> onCompleted)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));
    if (onCompleted == null)
        throw Error.ArgumentNull(nameof(onCompleted));

    // This overload has no error callback.
    return DoCore(source, onNext: onNext, onError: null, onCompleted: onCompleted);
}
/// <summary>
/// Invokes and awaits an asynchronous action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous action upon exceptional termination of the async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke and await for each element in the async-enumerable sequence.</param>
/// <param name="onError">Action to invoke and await upon exceptional termination of the async-enumerable sequence.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task> onError)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));
    if (onError == null)
        throw Error.ArgumentNull(nameof(onError));

    // This overload has no completion callback.
    return DoCore(source, onNext: onNext, onError: onError, onCompleted: null);
}
/// <summary>
/// Invokes and awaits an asynchronous action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous action upon graceful or exceptional termination of the async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke and await for each element in the async-enumerable sequence.</param>
/// <param name="onError">Action to invoke and await upon exceptional termination of the async-enumerable sequence.</param>
/// <param name="onCompleted">Action to invoke and await upon graceful termination of the async-enumerable sequence.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task> onError, Func<Task> onCompleted)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));
    if (onError == null)
        throw Error.ArgumentNull(nameof(onError));
    if (onCompleted == null)
        throw Error.ArgumentNull(nameof(onCompleted));

    return DoCore(source, onNext, onError, onCompleted);
}
#if !NO_DEEP_CANCELLATION
/// <summary>
/// Invokes and awaits an asynchronous (cancellable) action for each element in the async-enumerable sequence, and propagates all observer messages through the result sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke and await for each element in the async-enumerable sequence while supporting cancellation.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));

    // This overload has no error or completion callback.
    return DoCore(source, onNext: onNext, onError: null, onCompleted: null);
}
/// <summary>
/// Invokes and awaits an asynchronous (cancellable) action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous (cancellable) action upon graceful termination of the async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke and await for each element in the async-enumerable sequence while supporting cancellation.</param>
/// <param name="onCompleted">Action to invoke and await upon graceful termination of the async-enumerable sequence while supporting cancellation.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<CancellationToken, Task> onCompleted)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));
    if (onCompleted == null)
        throw Error.ArgumentNull(nameof(onCompleted));

    // This overload has no error callback.
    return DoCore(source, onNext: onNext, onError: null, onCompleted: onCompleted);
}
/// <summary>
/// Invokes and awaits an asynchronous (cancellable) action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous (cancellable) action upon exceptional termination of the async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke and await for each element in the async-enumerable sequence while supporting cancellation.</param>
/// <param name="onError">Action to invoke and await upon exceptional termination of the async-enumerable sequence while supporting cancellation.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task> onError)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));
    if (onError == null)
        throw Error.ArgumentNull(nameof(onError));

    // This overload has no completion callback.
    return DoCore(source, onNext: onNext, onError: onError, onCompleted: null);
}
/// <summary>
/// Invokes and awaits an asynchronous (cancellable) action for each element in the async-enumerable sequence, then invokes and awaits an asynchronous (cancellable) action upon graceful or exceptional termination of the async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="onNext">Action to invoke and await for each element in the async-enumerable sequence while supporting cancellation.</param>
/// <param name="onError">Action to invoke and await upon exceptional termination of the async-enumerable sequence while supporting cancellation.</param>
/// <param name="onCompleted">Action to invoke and await upon graceful termination of the async-enumerable sequence while supporting cancellation.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task> onError, Func<CancellationToken, Task> onCompleted)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (onNext == null)
        throw Error.ArgumentNull(nameof(onNext));
    if (onError == null)
        throw Error.ArgumentNull(nameof(onError));
    if (onCompleted == null)
        throw Error.ArgumentNull(nameof(onCompleted));

    return DoCore(source, onNext, onError, onCompleted);
}
#endif
/// <summary>
/// Invokes the observer's methods for each message in the source async-enumerable sequence.
/// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="observer">Observer whose methods to invoke as part of the source sequence's observation.</param>
/// <returns>The source sequence with the side-effecting behavior applied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="observer"/> is null.</exception>
public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, IObserver<TSource> observer)
{
    // Arguments are validated here, at call time, rather than inside the lazily executed iterator.
    if (source == null)
        throw Error.ArgumentNull(nameof(source));
    if (observer == null)
        throw Error.ArgumentNull(nameof(observer));

    // Wrap the observer's three methods in delegates so the synchronous DoCore overload is selected.
    return DoCore(source, new Action<TSource>(observer.OnNext), new Action<Exception>(observer.OnError), new Action(observer.OnCompleted));
}
/// <summary>
/// Iterator behind the synchronous-callback <c>Do</c> overloads. It enumerates <paramref name="source"/>,
/// calls <paramref name="onNext"/> with each element before yielding it, and calls
/// <paramref name="onCompleted"/> (when supplied) once the source reports no more elements.
/// </summary>
/// <remarks>
/// An exception raised while advancing the source or while running <paramref name="onNext"/> is passed to
/// <paramref name="onError"/> (when supplied) and then rethrown. <see cref="OperationCanceledException"/> is
/// rethrown without invoking <paramref name="onError"/>. Exceptions thrown by <paramref name="onCompleted"/>
/// are not routed to <paramref name="onError"/>.
/// </remarks>
private static async IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception>? onError, Action? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // NOTE(review): GetConfiguredAsyncEnumerator is declared outside this file; the `false` argument is
    // presumably continueOnCapturedContext, which would make the awaits on `e` context-free. Confirm.
    await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

    while (true)
    {
        TSource item;

        // The yield is kept outside the try block: C# does not allow `yield return` inside a try that has a catch clause.
        try
        {
            if (!await e.MoveNextAsync())
            {
                break;
            }

            item = e.Current;

            onNext(item);
        }
        catch (OperationCanceledException)
        {
            // Cancellation bypasses onError.
            throw;
        }
        catch (Exception ex) when (onError != null)
        {
            onError(ex);
            throw;
        }

        yield return item;
    }

    onCompleted?.Invoke();
}
/// <summary>
/// Iterator behind the <see cref="Task"/>-returning-callback <c>Do</c> overloads. It enumerates
/// <paramref name="source"/>, awaits <paramref name="onNext"/> for each element before yielding it, and awaits
/// <paramref name="onCompleted"/> (when supplied) once the source reports no more elements.
/// </summary>
/// <remarks>
/// An exception raised while advancing the source or while running <paramref name="onNext"/> is passed to
/// <paramref name="onError"/> (when supplied), which is awaited, and then rethrown.
/// <see cref="OperationCanceledException"/> is rethrown without invoking <paramref name="onError"/>.
/// Exceptions thrown by <paramref name="onCompleted"/> are not routed to <paramref name="onError"/>.
/// </remarks>
private static async IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task>? onError, Func<Task>? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // NOTE(review): GetConfiguredAsyncEnumerator is declared outside this file; the `false` argument is
    // presumably continueOnCapturedContext, which would make the awaits on `e` context-free. Confirm.
    await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

    while (true)
    {
        TSource item;

        // The yield is kept outside the try block: C# does not allow `yield return` inside a try that has a catch clause.
        try
        {
            if (!await e.MoveNextAsync())
            {
                break;
            }

            item = e.Current;

            await onNext(item).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation bypasses onError.
            throw;
        }
        catch (Exception ex) when (onError != null)
        {
            await onError(ex).ConfigureAwait(false);
            throw;
        }

        yield return item;
    }

    if (onCompleted != null)
    {
        await onCompleted().ConfigureAwait(false);
    }
}
#if !NO_DEEP_CANCELLATION
/// <summary>
/// Iterator behind the cancellable-callback <c>Do</c> overloads. It enumerates <paramref name="source"/>,
/// awaits <paramref name="onNext"/> for each element before yielding it, and awaits
/// <paramref name="onCompleted"/> (when supplied) once the source reports no more elements. Every callback
/// receives the enumeration's <paramref name="cancellationToken"/>.
/// </summary>
/// <remarks>
/// An exception raised while advancing the source or while running <paramref name="onNext"/> is passed to
/// <paramref name="onError"/> (when supplied), which is awaited, and then rethrown.
/// <see cref="OperationCanceledException"/> is rethrown without invoking <paramref name="onError"/>.
/// Exceptions thrown by <paramref name="onCompleted"/> are not routed to <paramref name="onError"/>.
/// </remarks>
private static async IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task>? onError, Func<CancellationToken, Task>? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // NOTE(review): GetConfiguredAsyncEnumerator is declared outside this file; the `false` argument is
    // presumably continueOnCapturedContext, which would make the awaits on `e` context-free. Confirm.
    await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

    while (true)
    {
        TSource item;

        // The yield is kept outside the try block: C# does not allow `yield return` inside a try that has a catch clause.
        try
        {
            if (!await e.MoveNextAsync())
            {
                break;
            }

            item = e.Current;

            await onNext(item, cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation bypasses onError.
            throw;
        }
        catch (Exception ex) when (onError != null)
        {
            await onError(ex, cancellationToken).ConfigureAwait(false);
            throw;
        }

        yield return item;
    }

    if (onCompleted != null)
    {
        await onCompleted(cancellationToken).ConfigureAwait(false);
    }
}
#endif
}
}