Sample.cs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  /// <summary>
  /// Producer that relays the most recently observed <typeparamref name="TSource"/> element
  /// each time the sampler sequence produces a notification.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sampled sequence.</typeparam>
  /// <typeparam name="TSample">Element type of the sampler sequence; its values are ignored.</typeparam>
  internal sealed class Sample<TSource, TSample> : Producer<TSource, Sample<TSource, TSample>._>
  {
      private readonly IObservable<TSource> _source;
      private readonly IObservable<TSample> _sampler;

      /// <summary>Captures the sequence to sample and the sequence that triggers sampling.</summary>
      /// <param name="source">Sequence whose latest element is relayed.</param>
      /// <param name="sampler">Sequence whose notifications trigger a sample.</param>
      public Sample(IObservable<TSource> source, IObservable<TSample> sampler)
      {
          _source = source;
          _sampler = sampler;
      }

      /// <summary>Creates the sink that wraps the downstream observer.</summary>
      protected override _ CreateSink(IObserver<TSource> observer)
      {
          return new _(observer);
      }

      /// <summary>Starts the sink against this producer's source and sampler.</summary>
      protected override void Run(_ sink)
      {
          sink.Run(this);
      }

      /// <summary>
      /// Sink that stores the latest source element and forwards it when the sampler signals.
      /// All state below is read and written while holding <c>_gate</c>.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly object _gate = new object();

          // Subscriptions to the two upstream sequences.
          private IDisposable _sourceDisposable;
          private IDisposable _samplerDisposable;

          // Latest source element and whether it has not been forwarded yet.
          private TSource _value;
          private bool _hasValue;

          // Completion flags for each upstream sequence.
          private bool _sourceAtEnd;
          private bool _samplerAtEnd;

          public _(IObserver<TSource> observer)
              : base(observer)
          {
          }

          /// <summary>Subscribes to the source first, then to the sampler.</summary>
          /// <param name="parent">Producer holding the two sequences.</param>
          public void Run(Sample<TSource, TSample> parent)
          {
              Disposable.SetSingle(
                  ref _sourceDisposable,
                  parent._source.SubscribeSafe(this));

              Disposable.SetSingle(
                  ref _samplerDisposable,
                  parent._sampler.SubscribeSafe(new SampleObserver(this)));
          }

          /// <summary>Disposes both upstream subscriptions before delegating to the base class.</summary>
          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  Disposable.TryDispose(ref _sourceDisposable);
                  Disposable.TryDispose(ref _samplerDisposable);
              }

              base.Dispose(disposing);
          }

          /// <summary>Stores the element as the pending sample; nothing is forwarded here.</summary>
          public override void OnNext(TSource value)
          {
              lock (_gate)
              {
                  _hasValue = true;
                  _value = value;
              }
          }

          /// <summary>Forwards a source error downstream under the gate.</summary>
          public override void OnError(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }

          /// <summary>
          /// Marks the source as finished. Completion is forwarded right away only if the sampler
          /// has already finished; otherwise only the source subscription is disposed.
          /// </summary>
          public override void OnCompleted()
          {
              lock (_gate)
              {
                  _sourceAtEnd = true;

                  if (!_samplerAtEnd)
                  {
                      Disposable.TryDispose(ref _sourceDisposable);
                      return;
                  }

                  ForwardOnCompleted();
              }
          }

          /// <summary>
          /// Forwards the pending element, if any, and clears the pending flag.
          /// Callers must already hold <c>_gate</c>.
          /// </summary>
          private void EmitPending()
          {
              if (!_hasValue)
              {
                  return;
              }

              _hasValue = false;
              ForwardOnNext(_value);
          }

          /// <summary>Handles a sampler element: emit the pending value, then complete if the source is done.</summary>
          private void SamplerNext()
          {
              lock (_gate)
              {
                  EmitPending();

                  if (_sourceAtEnd)
                  {
                      ForwardOnCompleted();
                  }
              }
          }

          /// <summary>Handles a sampler error by forwarding it downstream.</summary>
          private void SamplerError(Exception error)
          {
              // BREAKING CHANGE v2 > v1.x - This error used to be swallowed
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }

          /// <summary>
          /// Handles sampler completion: emit the pending value, then either complete (source
          /// already done) or dispose only the sampler subscription.
          /// </summary>
          private void SamplerCompleted()
          {
              lock (_gate)
              {
                  _samplerAtEnd = true;

                  EmitPending();

                  if (_sourceAtEnd)
                  {
                      ForwardOnCompleted();
                  }
                  else
                  {
                      Disposable.TryDispose(ref _samplerDisposable);
                  }
              }
          }

          /// <summary>Observer attached to the sampler; routes each notification to the owning sink.</summary>
          private sealed class SampleObserver : IObserver<TSample>
          {
              private readonly _ _parent;

              public SampleObserver(_ parent)
              {
                  _parent = parent;
              }

              // The sampler's element value is irrelevant; only the signal matters.
              public void OnNext(TSample value)
              {
                  _parent.SamplerNext();
              }

              public void OnError(Exception error)
              {
                  _parent.SamplerError(error);
              }

              public void OnCompleted()
              {
                  _parent.SamplerCompleted();
              }
          }
      }
  }
  /// <summary>
  /// Producer that relays the most recently observed <typeparamref name="TSource"/> element
  /// on a periodic schedule.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sampled sequence.</typeparam>
  internal sealed class Sample<TSource> : Producer<TSource, Sample<TSource>._>
  {
      private readonly IObservable<TSource> _source;
      private readonly TimeSpan _interval;
      private readonly IScheduler _scheduler;

      /// <summary>Captures the sequence to sample, the sampling period and the scheduler to run it on.</summary>
      /// <param name="source">Sequence whose latest element is relayed.</param>
      /// <param name="interval">Period passed to the scheduler's periodic timer.</param>
      /// <param name="scheduler">Scheduler on which the periodic work is scheduled.</param>
      public Sample(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
      {
          _source = source;
          _interval = interval;
          _scheduler = scheduler;
      }

      /// <summary>Creates the sink that wraps the downstream observer.</summary>
      protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

      /// <summary>Starts the sink against this producer's source and scheduler.</summary>
      protected override void Run(_ sink) => sink.Run(this);

      /// <summary>
      /// Sink that stores the latest source element and forwards it from the periodic
      /// <see cref="Tick"/> callback. The mutable state below is accessed under <c>_gate</c>.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          // readonly so the lock target can never be swapped after construction; a reassigned
          // gate would let two threads hold "the" lock at once. Matches the sampler-based sink.
          private readonly object _gate = new object();

          public _(IObserver<TSource> observer)
              : base(observer)
          {
          }

          // Subscription to the source sequence.
          private IDisposable _sourceDisposable;

          // Latest source element and whether it has not been forwarded yet.
          private bool _hasValue;
          private TSource _value;

          // Set once the source has completed; completion is forwarded on the next tick.
          private bool _atEnd;

          /// <summary>
          /// Subscribes to the source, then schedules the periodic <see cref="Tick"/> callback
          /// and hands its disposable to <c>SetUpstream</c>.
          /// </summary>
          /// <param name="parent">Producer holding the source, interval and scheduler.</param>
          public void Run(Sample<TSource> parent)
          {
              Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));

              SetUpstream(parent._scheduler.SchedulePeriodic(parent._interval, Tick));
          }

          /// <summary>Disposes the source subscription before delegating to the base class.</summary>
          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  Disposable.TryDispose(ref _sourceDisposable);
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Periodic callback: forwards the pending element, if any, and then forwards
          /// completion if the source has already finished.
          /// </summary>
          private void Tick()
          {
              lock (_gate)
              {
                  if (_hasValue)
                  {
                      // Clear the flag first so the same element is not forwarded on a later tick.
                      _hasValue = false;
                      ForwardOnNext(_value);
                  }

                  if (_atEnd)
                  {
                      ForwardOnCompleted();
                  }
              }
          }

          /// <summary>Stores the element as the pending sample; nothing is forwarded here.</summary>
          public override void OnNext(TSource value)
          {
              lock (_gate)
              {
                  _hasValue = true;
                  _value = value;
              }
          }

          /// <summary>Forwards a source error downstream under the gate.</summary>
          public override void OnError(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }

          /// <summary>
          /// Marks the source as finished and disposes its subscription. Completion itself is
          /// deferred to <see cref="Tick"/> so a still-pending element is forwarded first.
          /// </summary>
          public override void OnCompleted()
          {
              lock (_gate)
              {
                  _atEnd = true;
                  Disposable.TryDispose(ref _sourceDisposable);
              }
          }
      }
  }
  200. }