// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Threading.Tasks; namespace System.Reactive.Linq { partial class AsyncObserver { public static IAsyncObserver SumInt32(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = 0; return Create( async x => { try { checked { sum += x; } } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, observer.OnErrorAsync, async () => { await observer.OnNextAsync(sum).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver SumInt64(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = 0L; return Create( async x => { try { checked { sum += x; } } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, observer.OnErrorAsync, async () => { await observer.OnNextAsync(sum).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver SumSingle(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = 0.0; return Create( x => { sum += x; return Task.CompletedTask; }, observer.OnErrorAsync, async () => { var res = default(float); try { checked { res = (float)sum; } } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver SumDouble(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = 0.0; return Create( x => { sum += x; return Task.CompletedTask; }, observer.OnErrorAsync, async () => { await observer.OnNextAsync(sum).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver SumDecimal(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = 0m; return Create( x => { sum += x; return Task.CompletedTask; }, observer.OnErrorAsync, async () => { await observer.OnNextAsync(sum).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver SumNullableInt32(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = 0; return Create( async x => { try { checked { if (x != null) { sum += x.GetValueOrDefault(); } } } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, observer.OnErrorAsync, async () => { await observer.OnNextAsync(sum).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver SumNullableInt64(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = (long?)0L; return Create( async x => { try { checked { if (x != null) { sum += x.GetValueOrDefault(); } } } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); } }, observer.OnErrorAsync, async () => { await observer.OnNextAsync(sum).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver SumNullableSingle(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = 0.0; return Create( x => { if (x != null) { sum += x.GetValueOrDefault(); } return Task.CompletedTask; }, observer.OnErrorAsync, async () => { var res = default(float); try { checked { res = (float)sum; } } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver SumNullableDouble(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = 0.0; return Create( x => { if (x != null) { sum += x.GetValueOrDefault(); } return Task.CompletedTask; }, observer.OnErrorAsync, async () => { await observer.OnNextAsync(sum).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } public static IAsyncObserver SumNullableDecimal(IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var sum = 0m; return Create( x => { if (x != null) { sum += x.GetValueOrDefault(); } return Task.CompletedTask; }, observer.OnErrorAsync, async () => { await observer.OnNextAsync(sum).ConfigureAwait(false); await observer.OnCompletedAsync().ConfigureAwait(false); } ); } } }