Sample.cs 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.Observαble
  7. {
  /// <summary>
  /// Producer for the Sample operator overload that is driven by a sampler sequence: the most
  /// recent source element (if any) is forwarded each time the sampler produces a notification.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sampled source sequence.</typeparam>
  /// <typeparam name="TSample">Element type of the sampler sequence; its values are never read.</typeparam>
  class Sample<TSource, TSample> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;
      private readonly IObservable<TSample> _sampler;

      /// <summary>
      /// Captures the sequence to sample and the sequence whose notifications trigger sampling.
      /// </summary>
      /// <param name="source">Sequence whose latest element is forwarded on each sample.</param>
      /// <param name="sampler">Sequence whose notifications act as sampling ticks.</param>
      public Sample(IObservable<TSource> source, IObservable<TSample> sampler)
      {
          _source = source;
          _sampler = sampler;
      }

      /// <summary>
      /// Creates the sink for one subscription, publishes it through <paramref name="setSink"/>
      /// and starts it.
      /// </summary>
      /// <returns>The disposable returned by the sink's <c>Run</c> method.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var sink = new _(this, observer, cancel);
          setSink(sink);
          return sink.Run();
      }

      /// <summary>
      /// Per-subscription state. Observes the source directly; the sampler is observed through
      /// the nested <c>σ</c> observer. All shared state is guarded by <c>_gate</c>.
      /// </summary>
      class _ : Sink<TSource>, IObserver<TSource>
      {
          private readonly Sample<TSource, TSample> _parent;

          public _(Sample<TSource, TSample> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          private object _gate;                    // assigned in Run; guards every field below
          private IDisposable _sourceSubscription;

          private bool _hasValue;                  // true while _value has not been forwarded yet
          private TSource _value;                  // latest element received from the source
          private bool _atEnd;                     // the source has completed
          private bool _samplerAtEnd;              // the sampler has completed while the source was still running

          /// <summary>
          /// Subscribes to the source first and to the sampler second.
          /// </summary>
          /// <returns>A composite disposable holding both subscriptions.</returns>
          public IDisposable Run()
          {
              _gate = new object();

              // The field is assigned before subscribing so that a source completing during
              // SubscribeSafe already finds a disposable to dispose in OnCompleted.
              var sourceSubscription = new SingleAssignmentDisposable();
              _sourceSubscription = sourceSubscription;
              sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);

              var samplerSubscription = _parent._sampler.SubscribeSafe(new σ(this));

              return new CompositeDisposable(_sourceSubscription, samplerSubscription);
          }

          /// <summary>
          /// Remembers the latest source element; it is forwarded by the next sampler notification.
          /// </summary>
          public void OnNext(TSource value)
          {
              lock (_gate)
              {
                  _hasValue = true;
                  _value = value;
              }
          }

          /// <summary>
          /// Forwards a source error immediately and disposes the sink.
          /// </summary>
          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          /// <summary>
          /// Handles completion of the source. Normally completion is deferred to the next
          /// sampler notification; if the sampler has already completed, it is signalled here.
          /// </summary>
          public void OnCompleted()
          {
              lock (_gate)
              {
                  _atEnd = true;

                  if (_samplerAtEnd)
                  {
                      // No further sampler notification can arrive to deliver the completion,
                      // so without this branch the result sequence would never terminate.
                      // A value still pending here arrived after the final sampler notification
                      // and was therefore never sampled; it is not emitted.
                      base._observer.OnCompleted();
                      base.Dispose();
                  }
                  else
                  {
                      _sourceSubscription.Dispose();
                  }
              }
          }

          /// <summary>
          /// Observer attached to the sampler sequence; operates on the enclosing sink's state
          /// under the sink's gate.
          /// </summary>
          class σ : IObserver<TSample>
          {
              private readonly _ _parent;

              public σ(_ parent)
              {
                  _parent = parent;
              }

              /// <summary>
              /// Sampling tick: forwards the pending source value, if any, and completes the
              /// result once the source has completed.
              /// </summary>
              public void OnNext(TSample value)
              {
                  lock (_parent._gate)
                  {
                      if (_parent._hasValue)
                      {
                          _parent._hasValue = false;
                          _parent._observer.OnNext(_parent._value);
                      }

                      if (_parent._atEnd)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                  }
              }

              /// <summary>
              /// Forwards a sampler error and disposes the sink.
              /// </summary>
              public void OnError(Exception error)
              {
                  // BREAKING CHANGE v2 > v1.x - This error used to be swallowed
                  lock (_parent._gate)
                  {
                      _parent._observer.OnError(error);
                      _parent.Dispose();
                  }
              }

              /// <summary>
              /// Treats completion of the sampler as a final sampling tick. If the source is
              /// still running, the completion is recorded so that the source's own completion
              /// can terminate the result.
              /// </summary>
              public void OnCompleted()
              {
                  lock (_parent._gate)
                  {
                      if (_parent._hasValue)
                      {
                          _parent._hasValue = false;
                          _parent._observer.OnNext(_parent._value);
                      }

                      if (_parent._atEnd)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                      else
                      {
                          // Read by the source's OnCompleted; see the comment there.
                          _parent._samplerAtEnd = true;
                      }
                  }
              }
          }
      }
  }
  /// <summary>
  /// Producer for the time-based Sample operator overload: on every period of the given
  /// interval, the most recent source element (if any) is forwarded.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sampled source sequence.</typeparam>
  class Sample<TSource> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;
      private readonly TimeSpan _interval;
      private readonly IScheduler _scheduler;

      /// <summary>
      /// Captures the sequence to sample, the sampling period and the scheduler that runs the
      /// periodic sampling work.
      /// </summary>
      public Sample(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
      {
          _source = source;
          _interval = interval;
          _scheduler = scheduler;
      }

      /// <summary>
      /// Builds the sink for one subscription, hands it to <paramref name="setSink"/> and
      /// starts it.
      /// </summary>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var sink = new _(this, observer, cancel);
          setSink(sink);
          return sink.Run();
      }

      /// <summary>
      /// Per-subscription state: observes the source and is ticked periodically by the
      /// scheduler. Shared state is only touched while holding <c>_gate</c>.
      /// </summary>
      class _ : Sink<TSource>, IObserver<TSource>
      {
          private readonly Sample<TSource> _parent;

          public _(Sample<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          private object _gate;
          private IDisposable _sourceSubscription;

          private bool _hasValue;
          private TSource _value;
          private bool _atEnd;

          /// <summary>
          /// Subscribes to the source, then starts the periodic sampling timer.
          /// </summary>
          /// <returns>A composite disposable covering the source subscription and the timer.</returns>
          public IDisposable Run()
          {
              _gate = new object();

              // Publish the holder before subscribing, so OnCompleted has something to dispose
              // even when the source completes during SubscribeSafe.
              var upstream = new SingleAssignmentDisposable();
              _sourceSubscription = upstream;
              upstream.Disposable = _parent._source.SubscribeSafe(this);

              var timer = _parent._scheduler.SchedulePeriodic(_parent._interval, Tick);

              return new CompositeDisposable(upstream, timer);
          }

          /// <summary>
          /// Periodic sampling callback: emits the pending value, if there is one, and finishes
          /// the sequence once the source has completed.
          /// </summary>
          private void Tick()
          {
              lock (_gate)
              {
                  if (_hasValue)
                  {
                      var latest = _value;
                      _hasValue = false;
                      _observer.OnNext(latest);
                  }

                  if (_atEnd)
                  {
                      _observer.OnCompleted();
                      Dispose();
                  }
              }
          }

          /// <summary>
          /// Stores the newest source element for the next tick, replacing any unsent one.
          /// </summary>
          public void OnNext(TSource value)
          {
              lock (_gate)
              {
                  _value = value;
                  _hasValue = true;
              }
          }

          /// <summary>
          /// Passes a source error straight through and disposes the sink.
          /// </summary>
          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  _observer.OnError(error);
                  Dispose();
              }
          }

          /// <summary>
          /// Marks the source as finished and releases its subscription; the completion itself
          /// is sent by the next <see cref="Tick"/>.
          /// </summary>
          public void OnCompleted()
          {
              lock (_gate)
              {
                  _atEnd = true;
                  _sourceSubscription.Dispose();
              }
          }
      }
  }
  203. }
  204. #endif