// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

namespace System.Reactive.Linq
{
    partial class AsyncObserver
    {
        /// <summary>
        /// Creates an observer that accumulates <see cref="int"/> values and, on completion, sends their
        /// arithmetic mean to <paramref name="observer"/> followed by a completion notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// Completion without any received value is reported as an <see cref="InvalidOperationException"/>.
        /// If accumulating a value throws (e.g. <see cref="OverflowException"/> from the checked sum), the
        /// exception is forwarded once and every later notification is ignored.
        /// </remarks>
        public static IAsyncObserver<int> AverageInt32(IAsyncObserver<double> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0L;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<int>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        checked
                        {
                            sum += x;
                            count++;
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(double);

                        try
                        {
                            checked
                            {
                                res = (double)sum / count;
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
                    }
                }
            );
        }

        /// <summary>
        /// Creates an observer that accumulates <see cref="long"/> values and, on completion, sends their
        /// arithmetic mean to <paramref name="observer"/> followed by a completion notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// Completion without any received value is reported as an <see cref="InvalidOperationException"/>.
        /// If accumulating a value throws (e.g. <see cref="OverflowException"/> from the checked sum), the
        /// exception is forwarded once and every later notification is ignored.
        /// </remarks>
        public static IAsyncObserver<long> AverageInt64(IAsyncObserver<double> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0L;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<long>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        checked
                        {
                            sum += x;
                            count++;
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(double);

                        try
                        {
                            checked
                            {
                                res = (double)sum / count;
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
                    }
                }
            );
        }

        /// <summary>
        /// Creates an observer that accumulates <see cref="float"/> values (summed as <see cref="double"/>) and,
        /// on completion, sends their arithmetic mean to <paramref name="observer"/> followed by a completion
        /// notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// Completion without any received value is reported as an <see cref="InvalidOperationException"/>.
        /// If accumulating a value throws, the exception is forwarded once and every later notification is ignored.
        /// </remarks>
        public static IAsyncObserver<float> AverageSingle(IAsyncObserver<float> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0.0;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<float>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        checked
                        {
                            sum += x;
                            count++;
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(float);

                        try
                        {
                            checked
                            {
                                res = (float)(sum / count);
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
                    }
                }
            );
        }

        /// <summary>
        /// Creates an observer that accumulates <see cref="double"/> values and, on completion, sends their
        /// arithmetic mean to <paramref name="observer"/> followed by a completion notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// Completion without any received value is reported as an <see cref="InvalidOperationException"/>.
        /// If accumulating a value throws, the exception is forwarded once and every later notification is ignored.
        /// </remarks>
        public static IAsyncObserver<double> AverageDouble(IAsyncObserver<double> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0.0;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<double>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        checked
                        {
                            sum += x;
                            count++;
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(double);

                        try
                        {
                            checked
                            {
                                res = sum / count;
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
                    }
                }
            );
        }

        /// <summary>
        /// Creates an observer that accumulates <see cref="decimal"/> values and, on completion, sends their
        /// arithmetic mean to <paramref name="observer"/> followed by a completion notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// Completion without any received value is reported as an <see cref="InvalidOperationException"/>.
        /// If accumulating a value throws (e.g. <see cref="OverflowException"/> from the decimal sum), the
        /// exception is forwarded once and every later notification is ignored.
        /// </remarks>
        public static IAsyncObserver<decimal> AverageDecimal(IAsyncObserver<decimal> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0m;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<decimal>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        checked
                        {
                            sum += x;
                            count++;
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(decimal);

                        try
                        {
                            checked
                            {
                                res = sum / count;
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnErrorAsync(new InvalidOperationException("The sequence is empty.")).ConfigureAwait(false);
                    }
                }
            );
        }

        /// <summary>
        /// Creates an observer that accumulates the non-null <see cref="int"/> values it receives and, on
        /// completion, sends their arithmetic mean to <paramref name="observer"/> followed by a completion
        /// notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average, <c>null</c>, or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// <c>null</c> elements are skipped. If no non-null value was received, <c>null</c> is emitted before
        /// completion. If accumulating a value throws (e.g. <see cref="OverflowException"/> from the checked sum),
        /// the exception is forwarded once and every later notification is ignored.
        /// </remarks>
        public static IAsyncObserver<int?> AverageNullableInt32(IAsyncObserver<double?> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0L;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<int?>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        if (x.HasValue)
                        {
                            checked
                            {
                                sum += x.GetValueOrDefault();
                                count++;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(double);

                        try
                        {
                            checked
                            {
                                res = (double)sum / count;
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnNextAsync(null).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                }
            );
        }

        /// <summary>
        /// Creates an observer that accumulates the non-null <see cref="long"/> values it receives and, on
        /// completion, sends their arithmetic mean to <paramref name="observer"/> followed by a completion
        /// notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average, <c>null</c>, or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// <c>null</c> elements are skipped. If no non-null value was received, <c>null</c> is emitted before
        /// completion. If accumulating a value throws (e.g. <see cref="OverflowException"/> from the checked sum),
        /// the exception is forwarded once and every later notification is ignored.
        /// </remarks>
        public static IAsyncObserver<long?> AverageNullableInt64(IAsyncObserver<double?> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0L;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<long?>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        if (x.HasValue)
                        {
                            checked
                            {
                                sum += x.GetValueOrDefault();
                                count++;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(double);

                        try
                        {
                            checked
                            {
                                res = (double)sum / count;
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnNextAsync(null).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                }
            );
        }

        /// <summary>
        /// Creates an observer that accumulates the non-null <see cref="float"/> values it receives (summed as
        /// <see cref="double"/>) and, on completion, sends their arithmetic mean to <paramref name="observer"/>
        /// followed by a completion notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average, <c>null</c>, or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// <c>null</c> elements are skipped. If no non-null value was received, <c>null</c> is emitted before
        /// completion. If accumulating a value throws, the exception is forwarded once and every later
        /// notification is ignored.
        /// </remarks>
        public static IAsyncObserver<float?> AverageNullableSingle(IAsyncObserver<float?> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0.0;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<float?>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        if (x.HasValue)
                        {
                            checked
                            {
                                sum += x.GetValueOrDefault();
                                count++;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(float);

                        try
                        {
                            checked
                            {
                                res = (float)(sum / count);
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnNextAsync(null).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                }
            );
        }

        /// <summary>
        /// Creates an observer that accumulates the non-null <see cref="double"/> values it receives and, on
        /// completion, sends their arithmetic mean to <paramref name="observer"/> followed by a completion
        /// notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average, <c>null</c>, or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// <c>null</c> elements are skipped. If no non-null value was received, <c>null</c> is emitted before
        /// completion. If accumulating a value throws, the exception is forwarded once and every later
        /// notification is ignored.
        /// </remarks>
        public static IAsyncObserver<double?> AverageNullableDouble(IAsyncObserver<double?> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0.0;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<double?>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        if (x.HasValue)
                        {
                            checked
                            {
                                sum += x.GetValueOrDefault();
                                count++;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(double);

                        try
                        {
                            checked
                            {
                                res = sum / count;
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnNextAsync(null).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                }
            );
        }

        /// <summary>
        /// Creates an observer that accumulates the non-null <see cref="decimal"/> values it receives and, on
        /// completion, sends their arithmetic mean to <paramref name="observer"/> followed by a completion
        /// notification.
        /// </summary>
        /// <param name="observer">The observer that receives the computed average, <c>null</c>, or an error.</param>
        /// <returns>An observer to be fed with the values to average.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <remarks>
        /// <c>null</c> elements are skipped. If no non-null value was received, <c>null</c> is emitted before
        /// completion. If accumulating a value throws (e.g. <see cref="OverflowException"/> from the decimal sum),
        /// the exception is forwarded once and every later notification is ignored.
        /// </remarks>
        public static IAsyncObserver<decimal?> AverageNullableDecimal(IAsyncObserver<decimal?> observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var sum = 0m;
            var count = 0L;

            // Set once an accumulation error has been forwarded; guarantees a single terminal notification.
            var faulted = false;

            return Create<decimal?>(
                async x =>
                {
                    if (faulted)
                        return;

                    try
                    {
                        if (x.HasValue)
                        {
                            checked
                            {
                                sum += x.GetValueOrDefault();
                                count++;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        faulted = true;
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                    }
                },
                async ex =>
                {
                    if (!faulted)
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                },
                async () =>
                {
                    if (faulted)
                        return;

                    if (count > 0)
                    {
                        var res = default(decimal);

                        try
                        {
                            checked
                            {
                                res = sum / count;
                            }
                        }
                        catch (Exception ex)
                        {
                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
                            return;
                        }

                        await observer.OnNextAsync(res).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnNextAsync(null).ConfigureAwait(false);
                        await observer.OnCompletedAsync().ConfigureAwait(false);
                    }
                }
            );
        }
    }
}