// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Threading.Tasks; namespace System.Reactive.Linq { public partial class AsyncObservable { public static IAsyncObservable Do(this IAsyncObservable source, IObserver observer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (observer == null) throw new ArgumentNullException(nameof(observer)); return Create( source, observer, static (source, observer, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, observer))); } public static IAsyncObservable Do(this IAsyncObservable source, Action onNext) { if (source == null) throw new ArgumentNullException(nameof(source)); if (onNext == null) throw new ArgumentNullException(nameof(onNext)); return Create( source, onNext, static (source, onNext, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, onNext))); } public static IAsyncObservable Do(this IAsyncObservable source, Action onError) { if (source == null) throw new ArgumentNullException(nameof(source)); if (onError == null) throw new ArgumentNullException(nameof(onError)); return Create( source, onError, static (source, onError, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, onError))); } public static IAsyncObservable Do(this IAsyncObservable source, Action onCompleted) { if (source == null) throw new ArgumentNullException(nameof(source)); if (onCompleted == null) throw new ArgumentNullException(nameof(onCompleted)); return Create( source, onCompleted, static (source, onCompleted, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, onCompleted))); } public static IAsyncObservable Do(this IAsyncObservable source, Action onNext, Action onError, Action onCompleted) { if (source == null) throw new ArgumentNullException(nameof(source)); if (onNext == null) throw new ArgumentNullException(nameof(onNext)); if (onError == null) throw new ArgumentNullException(nameof(onError)); if (onCompleted == null) throw new ArgumentNullException(nameof(onCompleted)); return Create( source, (onNext, onError, onCompleted), static (source, state, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, state.onNext, state.onError, state.onCompleted))); } public static IAsyncObservable Do(this IAsyncObservable source, IAsyncObserver observer) { if (source == null) throw new ArgumentNullException(nameof(source)); if (observer == null) throw new ArgumentNullException(nameof(observer)); return Create( source, observer, static (source, observer, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, observer))); } public static IAsyncObservable Do(this IAsyncObservable source, Func onNext) { if (source == null) throw new ArgumentNullException(nameof(source)); if (onNext == null) throw new ArgumentNullException(nameof(onNext)); return Create( source, onNext, static (source, onNext, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, onNext))); } public static IAsyncObservable Do(this IAsyncObservable source, Func onError) { if (source == null) throw new ArgumentNullException(nameof(source)); if (onError == null) throw new ArgumentNullException(nameof(onError)); return Create( source, onError, static (source, onError, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, onError))); } public static IAsyncObservable Do(this IAsyncObservable source, Func onCompleted) { if (source == null) throw new ArgumentNullException(nameof(source)); if (onCompleted == null) throw new ArgumentNullException(nameof(onCompleted)); return Create( source, onCompleted, static (source, onCompleted, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, onCompleted))); } public static IAsyncObservable Do(this IAsyncObservable source, Func onNext, Func onError, Func onCompleted) { if (source == null) throw new ArgumentNullException(nameof(source)); if (onNext == null) throw new ArgumentNullException(nameof(onNext)); if (onError == null) throw new ArgumentNullException(nameof(onError)); if (onCompleted == null) throw new ArgumentNullException(nameof(onCompleted)); return Create( source, (onNext, onError, onCompleted), static (source, state, target) => source.SubscribeSafeAsync(AsyncObserver.Do(target, state.onNext, state.onError, state.onCompleted))); } } public partial class AsyncObserver { public static IAsyncObserver Do(IAsyncObserver observer, IAsyncObserver witness) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (witness == null) throw new ArgumentNullException(nameof(witness)); return Create( async x => { try { await witness.OnNextAsync(x).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(x).ConfigureAwait(false); }, async error => { try { await witness.OnErrorAsync(error).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnErrorAsync(error).ConfigureAwait(false); }, async () => { try { await witness.OnCompletedAsync().ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver Do(IAsyncObserver observer, Func onNext) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (onNext == null) throw new ArgumentNullException(nameof(onNext)); return Do(observer, Create(onNext)); } public static IAsyncObserver Do(IAsyncObserver observer, Func onError) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (onError == null) throw new ArgumentNullException(nameof(onError)); return Do(observer, Create(_ => default, onError, () => default)); } public static IAsyncObserver Do(IAsyncObserver observer, Func onCompleted) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (onCompleted == null) throw new ArgumentNullException(nameof(onCompleted)); return Do(observer, Create(_ => default, _ => default, onCompleted)); } public static IAsyncObserver Do(IAsyncObserver observer, Func onNext, Func onError, Func onCompleted) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (onNext == null) throw new ArgumentNullException(nameof(onNext)); if (onError == null) throw new ArgumentNullException(nameof(onError)); if (onCompleted == null) throw new ArgumentNullException(nameof(onCompleted)); return Do(observer, Create(onNext, onError, onCompleted)); } public static IAsyncObserver Do(IAsyncObserver observer, IObserver witness) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (witness == null) throw new ArgumentNullException(nameof(witness)); return Create( async x => { try { witness.OnNext(x); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(x).ConfigureAwait(false); }, async error => { try { witness.OnError(error); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnErrorAsync(error).ConfigureAwait(false); }, async () => { try { witness.OnCompleted(); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver Do(IAsyncObserver observer, Action onNext) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (onNext == null) throw new ArgumentNullException(nameof(onNext)); return Do(observer, x => { onNext(x); return default; }); } public static IAsyncObserver Do(IAsyncObserver observer, Action onError) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (onError == null) throw new ArgumentNullException(nameof(onError)); return Do(observer, Create(_ => default, ex => { onError(ex); return default; }, () => default)); } public static IAsyncObserver Do(IAsyncObserver observer, Action onCompleted) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (onCompleted == null) throw new ArgumentNullException(nameof(onCompleted)); return Do(observer, Create(_ => default, _ => default, () => { onCompleted(); return default; })); } public static IAsyncObserver Do(IAsyncObserver observer, Action onNext, Action onError, Action onCompleted) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (onNext == null) throw new ArgumentNullException(nameof(onNext)); if (onError == null) throw new ArgumentNullException(nameof(onError)); if (onCompleted == null) throw new ArgumentNullException(nameof(onCompleted)); return Do(observer, x => { onNext(x); return default; }, ex => { onError(ex); return default; }, () => { onCompleted(); return default; }); } } }