Sample.cs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. internal sealed class Sample<TSource, TSample> : Producer<TSource, Sample<TSource, TSample>._>
  9. {
  10. private readonly IObservable<TSource> _source;
  11. private readonly IObservable<TSample> _sampler;
  12. public Sample(IObservable<TSource> source, IObservable<TSample> sampler)
  13. {
  14. _source = source;
  15. _sampler = sampler;
  16. }
  17. protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
  18. protected override void Run(_ sink) => sink.Run(this);
  19. internal sealed class _ : IdentitySink<TSource>
  20. {
  21. private readonly object _gate = new object();
  22. public _(IObserver<TSource> observer)
  23. : base(observer)
  24. {
  25. }
  26. private SingleAssignmentDisposableValue _sourceDisposable;
  27. private SingleAssignmentDisposableValue _samplerDisposable;
  28. private bool _hasValue;
  29. private TSource? _value;
  30. private bool _sourceAtEnd;
  31. private bool _samplerAtEnd;
  32. public void Run(Sample<TSource, TSample> parent)
  33. {
  34. _sourceDisposable.Disposable = parent._source.SubscribeSafe(this);
  35. _samplerDisposable.Disposable = parent._sampler.SubscribeSafe(new SampleObserver(this));
  36. }
  37. protected override void Dispose(bool disposing)
  38. {
  39. if (disposing)
  40. {
  41. _sourceDisposable.Dispose();
  42. _samplerDisposable.Dispose();
  43. }
  44. base.Dispose(disposing);
  45. }
  46. public override void OnNext(TSource value)
  47. {
  48. lock (_gate)
  49. {
  50. _hasValue = true;
  51. _value = value;
  52. }
  53. }
  54. public override void OnError(Exception error)
  55. {
  56. lock (_gate)
  57. {
  58. ForwardOnError(error);
  59. }
  60. }
  61. public override void OnCompleted()
  62. {
  63. lock (_gate)
  64. {
  65. _sourceAtEnd = true;
  66. if (_samplerAtEnd)
  67. {
  68. ForwardOnCompleted();
  69. }
  70. else
  71. {
  72. _sourceDisposable.Dispose();
  73. }
  74. }
  75. }
  76. private sealed class SampleObserver : IObserver<TSample>
  77. {
  78. private readonly _ _parent;
  79. public SampleObserver(_ parent)
  80. {
  81. _parent = parent;
  82. }
  83. public void OnNext(TSample value)
  84. {
  85. lock (_parent._gate)
  86. {
  87. if (_parent._hasValue)
  88. {
  89. _parent._hasValue = false;
  90. _parent.ForwardOnNext(_parent._value!);
  91. }
  92. if (_parent._sourceAtEnd)
  93. {
  94. _parent.ForwardOnCompleted();
  95. }
  96. }
  97. }
  98. public void OnError(Exception error)
  99. {
  100. // BREAKING CHANGE v2 > v1.x - This error used to be swallowed
  101. lock (_parent._gate)
  102. {
  103. _parent.ForwardOnError(error);
  104. }
  105. }
  106. public void OnCompleted()
  107. {
  108. lock (_parent._gate)
  109. {
  110. _parent._samplerAtEnd = true;
  111. if (_parent._hasValue)
  112. {
  113. _parent._hasValue = false;
  114. _parent.ForwardOnNext(_parent._value!);
  115. }
  116. if (_parent._sourceAtEnd)
  117. {
  118. _parent.ForwardOnCompleted();
  119. }
  120. else
  121. {
  122. _parent._samplerDisposable.Dispose();
  123. }
  124. }
  125. }
  126. }
  127. }
  128. }
  129. internal sealed class Sample<TSource> : Producer<TSource, Sample<TSource>._>
  130. {
  131. private readonly IObservable<TSource> _source;
  132. private readonly TimeSpan _interval;
  133. private readonly IScheduler _scheduler;
  134. public Sample(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
  135. {
  136. _source = source;
  137. _interval = interval;
  138. _scheduler = scheduler;
  139. }
  140. protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
  141. protected override void Run(_ sink) => sink.Run(this);
  142. internal sealed class _ : IdentitySink<TSource>
  143. {
  144. private readonly object _gate = new object();
  145. public _(IObserver<TSource> observer)
  146. : base(observer)
  147. {
  148. }
  149. private SingleAssignmentDisposableValue _sourceDisposable;
  150. private bool _hasValue;
  151. private TSource? _value;
  152. private bool _atEnd;
  153. public void Run(Sample<TSource> parent)
  154. {
  155. _sourceDisposable.Disposable = parent._source.SubscribeSafe(this);
  156. SetUpstream(parent._scheduler.SchedulePeriodic(parent._interval, Tick));
  157. }
  158. protected override void Dispose(bool disposing)
  159. {
  160. if (disposing)
  161. {
  162. _sourceDisposable.Dispose();
  163. }
  164. base.Dispose(disposing);
  165. }
  166. private void Tick()
  167. {
  168. lock (_gate)
  169. {
  170. if (_hasValue)
  171. {
  172. _hasValue = false;
  173. ForwardOnNext(_value!);
  174. }
  175. if (_atEnd)
  176. {
  177. ForwardOnCompleted();
  178. }
  179. }
  180. }
  181. public override void OnNext(TSource value)
  182. {
  183. lock (_gate)
  184. {
  185. _hasValue = true;
  186. _value = value;
  187. }
  188. }
  189. public override void OnError(Exception error)
  190. {
  191. lock (_gate)
  192. {
  193. ForwardOnError(error);
  194. }
  195. }
  196. public override void OnCompleted()
  197. {
  198. lock (_gate)
  199. {
  200. _atEnd = true;
  201. _sourceDisposable.Dispose();
  202. }
  203. }
  204. }
  205. }
  206. }