// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

namespace System.Reactive.Linq.ObservableImpl
{
    // NOTE(review): Producer, IdentitySink, Sink, IObservable and IObserver are written in this
    // file without generic type arguments. Presumably those arguments were lost when the text
    // was extracted - confirm against the project's base types before compiling.

    /// <summary>
    /// Producer that averages the double values delivered to its sink.
    /// </summary>
    internal sealed class AverageDouble : Producer
    {
        private readonly IObservable _source;

        public AverageDouble(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running sum and element count; on completion forwards sum / count,
        /// or an <see cref="InvalidOperationException"/> when no element was counted.
        /// </summary>
        internal sealed class _ : IdentitySink
        {
            private double _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0.0;
            }

            /// <summary>Adds <paramref name="value"/> to the sum and bumps the count.</summary>
            public override void OnNext(double value)
            {
                try
                {
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average followed by completion, or an error if nothing was counted.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                ForwardOnNext(_sum / _count);
                ForwardOnCompleted();
            }
        }
    }

    /// <summary>
    /// Producer that averages the float values delivered to its sink.
    /// </summary>
    internal sealed class AverageSingle : Producer
    {
        private readonly IObservable _source;

        public AverageSingle(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running sum and element count; on completion forwards sum / count cast to float,
        /// or an <see cref="InvalidOperationException"/> when no element was counted.
        /// </summary>
        internal sealed class _ : IdentitySink
        {
            // NOTE: The accumulator is a double rather than a float, conform LINQ to Objects.
            private double _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0.0;
            }

            /// <summary>Adds <paramref name="value"/> to the sum and bumps the count.</summary>
            public override void OnNext(float value)
            {
                try
                {
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average followed by completion, or an error if nothing was counted.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                // The division happens in double; only the result is narrowed to float.
                ForwardOnNext((float)(_sum / _count));
                ForwardOnCompleted();
            }
        }
    }

    /// <summary>
    /// Producer that averages the decimal values delivered to its sink.
    /// </summary>
    internal sealed class AverageDecimal : Producer
    {
        private readonly IObservable _source;

        public AverageDecimal(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running sum and element count; on completion forwards sum / count,
        /// or an <see cref="InvalidOperationException"/> when no element was counted.
        /// </summary>
        internal sealed class _ : IdentitySink
        {
            private decimal _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0M;
            }

            /// <summary>Adds <paramref name="value"/> to the sum and bumps the count.</summary>
            public override void OnNext(decimal value)
            {
                try
                {
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average followed by completion, or an error if nothing was counted.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                ForwardOnNext(_sum / _count);
                ForwardOnCompleted();
            }
        }
    }

    /// <summary>
    /// Producer that averages the int values delivered to its sink, yielding a double.
    /// </summary>
    internal sealed class AverageInt32 : Producer
    {
        private readonly IObservable _source;

        public AverageInt32(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running long sum and element count; on completion forwards the quotient as a double,
        /// or an <see cref="InvalidOperationException"/> when no element was counted.
        /// </summary>
        internal sealed class _ : Sink
        {
            private long _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0L;
            }

            /// <summary>Adds <paramref name="value"/> to the sum and bumps the count, both overflow-checked.</summary>
            public override void OnNext(int value)
            {
                try
                {
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average followed by completion, or an error if nothing was counted.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                // Cast the sum first so the division is carried out in floating point.
                ForwardOnNext((double)_sum / _count);
                ForwardOnCompleted();
            }
        }
    }

    /// <summary>
    /// Producer that averages the long values delivered to its sink, yielding a double.
    /// </summary>
    internal sealed class AverageInt64 : Producer
    {
        private readonly IObservable _source;

        public AverageInt64(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running long sum and element count; on completion forwards the quotient as a double,
        /// or an <see cref="InvalidOperationException"/> when no element was counted.
        /// </summary>
        internal sealed class _ : Sink
        {
            private long _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0L;
            }

            /// <summary>Adds <paramref name="value"/> to the sum and bumps the count, both overflow-checked.</summary>
            public override void OnNext(long value)
            {
                try
                {
                    _sum = checked(_sum + value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average followed by completion, or an error if nothing was counted.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                    return;
                }

                // Cast the sum first so the division is carried out in floating point.
                ForwardOnNext((double)_sum / _count);
                ForwardOnCompleted();
            }
        }
    }

    /// <summary>
    /// Producer that averages the non-null double? values delivered to its sink.
    /// </summary>
    internal sealed class AverageDoubleNullable : Producer
    {
        private readonly IObservable _source;

        public AverageDoubleNullable(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running sum and count of the non-null elements; on completion forwards
        /// sum / count, or null when no non-null element was counted.
        /// </summary>
        internal sealed class _ : IdentitySink
        {
            private double _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0.0;
            }

            /// <summary>Accumulates <paramref name="value"/> when it holds a value; null is ignored.</summary>
            public override void OnNext(double? value)
            {
                if (!value.HasValue)
                {
                    // Null elements touch neither the sum nor the count.
                    return;
                }

                try
                {
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average (or null if nothing was counted), then completion.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnNext(null);
                }
                else
                {
                    ForwardOnNext(_sum / _count);
                }

                ForwardOnCompleted();
            }
        }
    }

    /// <summary>
    /// Producer that averages the non-null float? values delivered to its sink.
    /// </summary>
    internal sealed class AverageSingleNullable : Producer
    {
        private readonly IObservable _source;

        public AverageSingleNullable(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running sum and count of the non-null elements; on completion forwards
        /// sum / count cast to float, or null when no non-null element was counted.
        /// </summary>
        internal sealed class _ : IdentitySink
        {
            // NOTE: The accumulator is a double rather than a float, conform LINQ to Objects.
            private double _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0.0;
            }

            /// <summary>Accumulates <paramref name="value"/> when it holds a value; null is ignored.</summary>
            public override void OnNext(float? value)
            {
                if (!value.HasValue)
                {
                    // Null elements touch neither the sum nor the count.
                    return;
                }

                try
                {
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average (or null if nothing was counted), then completion.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnNext(null);
                }
                else
                {
                    // The division happens in double; only the result is narrowed to float.
                    ForwardOnNext((float)(_sum / _count));
                }

                ForwardOnCompleted();
            }
        }
    }

    /// <summary>
    /// Producer that averages the non-null decimal? values delivered to its sink.
    /// </summary>
    internal sealed class AverageDecimalNullable : Producer
    {
        private readonly IObservable _source;

        public AverageDecimalNullable(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running sum and count of the non-null elements; on completion forwards
        /// sum / count, or null when no non-null element was counted.
        /// </summary>
        internal sealed class _ : IdentitySink
        {
            private decimal _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0M;
            }

            /// <summary>Accumulates <paramref name="value"/> when it holds a value; null is ignored.</summary>
            public override void OnNext(decimal? value)
            {
                if (!value.HasValue)
                {
                    // Null elements touch neither the sum nor the count.
                    return;
                }

                try
                {
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average (or null if nothing was counted), then completion.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnNext(null);
                }
                else
                {
                    ForwardOnNext(_sum / _count);
                }

                ForwardOnCompleted();
            }
        }
    }

    /// <summary>
    /// Producer that averages the non-null int? values delivered to its sink, yielding a double.
    /// </summary>
    internal sealed class AverageInt32Nullable : Producer
    {
        private readonly IObservable _source;

        public AverageInt32Nullable(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running long sum and count of the non-null elements; on completion forwards
        /// the quotient as a double, or null when no non-null element was counted.
        /// </summary>
        internal sealed class _ : Sink
        {
            private long _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0L;
            }

            /// <summary>Accumulates <paramref name="value"/> when it holds a value; null is ignored.</summary>
            public override void OnNext(int? value)
            {
                if (!value.HasValue)
                {
                    // Null elements touch neither the sum nor the count.
                    return;
                }

                try
                {
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average (or null if nothing was counted), then completion.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnNext(null);
                }
                else
                {
                    // Cast the sum first so the division is carried out in floating point.
                    ForwardOnNext((double)_sum / _count);
                }

                ForwardOnCompleted();
            }
        }
    }

    /// <summary>
    /// Producer that averages the non-null long? values delivered to its sink, yielding a double.
    /// </summary>
    internal sealed class AverageInt64Nullable : Producer
    {
        private readonly IObservable _source;

        public AverageInt64Nullable(IObservable source) => _source = source;

        protected override _ CreateSink(IObserver observer)
        {
            return new _(observer);
        }

        protected override void Run(_ sink)
        {
            sink.Run(_source);
        }

        /// <summary>
        /// Keeps a running long sum and count of the non-null elements; on completion forwards
        /// the quotient as a double, or null when no non-null element was counted.
        /// </summary>
        internal sealed class _ : Sink
        {
            private long _sum;
            private long _count;

            public _(IObserver observer)
                : base(observer)
            {
                _count = 0L;
                _sum = 0L;
            }

            /// <summary>Accumulates <paramref name="value"/> when it holds a value; null is ignored.</summary>
            public override void OnNext(long? value)
            {
                if (!value.HasValue)
                {
                    // Null elements touch neither the sum nor the count.
                    return;
                }

                try
                {
                    _sum = checked(_sum + value.Value);
                    _count = checked(_count + 1);
                }
                catch (Exception error)
                {
                    // Anything thrown while accumulating is handed to ForwardOnError.
                    ForwardOnError(error);
                }
            }

            /// <summary>Forwards the average (or null if nothing was counted), then completion.</summary>
            public override void OnCompleted()
            {
                if (_count <= 0)
                {
                    ForwardOnNext(null);
                }
                else
                {
                    // Cast the sum first so the division is carried out in floating point.
                    ForwardOnNext((double)_sum / _count);
                }

                ForwardOnCompleted();
            }
        }
    }
}