1
0

Aggregate.cs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. class Aggregate<TSource, TAccumulate, TResult> : Producer<TResult>
  9. {
  10. private readonly IObservable<TSource> _source;
  11. private readonly TAccumulate _seed;
  12. private readonly Func<TAccumulate, TSource, TAccumulate> _accumulator;
  13. private readonly Func<TAccumulate, TResult> _resultSelector;
  14. public Aggregate(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
  15. {
  16. _source = source;
  17. _seed = seed;
  18. _accumulator = accumulator;
  19. _resultSelector = resultSelector;
  20. }
  21. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  22. {
  23. var sink = new _(this, observer, cancel);
  24. setSink(sink);
  25. return _source.SubscribeSafe(sink);
  26. }
  27. class _ : Sink<TResult>, IObserver<TSource>
  28. {
  29. private readonly Aggregate<TSource, TAccumulate, TResult> _parent;
  30. private TAccumulate _accumulation;
  31. public _(Aggregate<TSource, TAccumulate, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  32. : base(observer, cancel)
  33. {
  34. _parent = parent;
  35. _accumulation = _parent._seed;
  36. }
  37. public void OnNext(TSource value)
  38. {
  39. try
  40. {
  41. _accumulation = _parent._accumulator(_accumulation, value);
  42. }
  43. catch (Exception exception)
  44. {
  45. base._observer.OnError(exception);
  46. base.Dispose();
  47. }
  48. }
  49. public void OnError(Exception error)
  50. {
  51. base._observer.OnError(error);
  52. base.Dispose();
  53. }
  54. public void OnCompleted()
  55. {
  56. var result = default(TResult);
  57. try
  58. {
  59. result = _parent._resultSelector(_accumulation);
  60. }
  61. catch (Exception exception)
  62. {
  63. base._observer.OnError(exception);
  64. base.Dispose();
  65. return;
  66. }
  67. base._observer.OnNext(result);
  68. base._observer.OnCompleted();
  69. base.Dispose();
  70. }
  71. }
  72. }
  73. class Aggregate<TSource> : Producer<TSource>
  74. {
  75. private readonly IObservable<TSource> _source;
  76. private readonly Func<TSource, TSource, TSource> _accumulator;
  77. public Aggregate(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  78. {
  79. _source = source;
  80. _accumulator = accumulator;
  81. }
  82. protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  83. {
  84. var sink = new _(this, observer, cancel);
  85. setSink(sink);
  86. return _source.SubscribeSafe(sink);
  87. }
  88. class _ : Sink<TSource>, IObserver<TSource>
  89. {
  90. private readonly Aggregate<TSource> _parent;
  91. private TSource _accumulation;
  92. private bool _hasAccumulation;
  93. public _(Aggregate<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  94. : base(observer, cancel)
  95. {
  96. _parent = parent;
  97. _accumulation = default(TSource);
  98. _hasAccumulation = false;
  99. }
  100. public void OnNext(TSource value)
  101. {
  102. if (!_hasAccumulation)
  103. {
  104. _accumulation = value;
  105. _hasAccumulation = true;
  106. }
  107. else
  108. {
  109. try
  110. {
  111. _accumulation = _parent._accumulator(_accumulation, value);
  112. }
  113. catch (Exception exception)
  114. {
  115. base._observer.OnError(exception);
  116. base.Dispose();
  117. }
  118. }
  119. }
  120. public void OnError(Exception error)
  121. {
  122. base._observer.OnError(error);
  123. base.Dispose();
  124. }
  125. public void OnCompleted()
  126. {
  127. if (!_hasAccumulation)
  128. {
  129. base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
  130. base.Dispose();
  131. }
  132. else
  133. {
  134. base._observer.OnNext(_accumulation);
  135. base._observer.OnCompleted();
  136. base.Dispose();
  137. }
  138. }
  139. }
  140. }
  141. }
  142. #endif