Throttle.cs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  /// <summary>
  /// Producer for the time-based throttle: a source element is forwarded only after
  /// the due time has elapsed on the scheduler without a newer element replacing it.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  internal sealed class Throttle<TSource> : Producer<TSource, Throttle<TSource>._>
  {
      private readonly IObservable<TSource> _source;
      private readonly TimeSpan _dueTime;
      private readonly IScheduler _scheduler;

      /// <summary>Captures the source, the quiet period and the scheduler used for the timers.</summary>
      /// <param name="source">Sequence whose elements are throttled.</param>
      /// <param name="dueTime">Delay passed to the scheduler for every buffered element.</param>
      /// <param name="scheduler">Scheduler on which the delayed propagation is scheduled.</param>
      public Throttle(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
      {
          _source = source;
          _dueTime = dueTime;
          _scheduler = scheduler;
      }

      protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

      protected override void Run(_ sink) => sink.Run(_source);

      /// <summary>
      /// Sink that buffers the most recent element and forwards it when the timer
      /// scheduled for that element fires while the element is still the latest one.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly TimeSpan _dueTime;
          private readonly IScheduler _scheduler;

          // Guards _value, _hasValue, _id and every call that forwards to the observer.
          // Created with the sink (instead of in Run) so it is never null and can be readonly.
          private readonly object _gate = new object();

          // Most recent element that has not been forwarded yet (meaningful only while _hasValue).
          private TSource _value;
          private bool _hasValue;

          // Holds the disposable of the timer scheduled for the latest element.
          private IDisposable _serialCancelable;

          // Incremented on every notification; a timer callback compares it with the
          // id it was scheduled with to find out whether it is still current.
          private ulong _id;

          public _(Throttle<TSource> parent, IObserver<TSource> observer)
              : base(observer)
          {
              _dueTime = parent._dueTime;
              _scheduler = parent._scheduler;
          }

          /// <summary>Resets the buffered state and starts the sink on <paramref name="source"/>.</summary>
          public override void Run(IObservable<TSource> source)
          {
              _value = default(TSource);
              _hasValue = false;
              _id = 0UL;

              base.Run(source);
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  Disposable.TryDispose(ref _serialCancelable);
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Buffers <paramref name="value"/> as the latest element and schedules its
          /// propagation after the due time.
          /// </summary>
          public override void OnNext(TSource value)
          {
              ulong currentId;

              lock (_gate)
              {
                  _hasValue = true;
                  _value = value;
                  _id = unchecked(_id + 1);
                  currentId = _id;
              }

              // The new timer is installed outside the gate, so a timer scheduled for an
              // older element may still fire before it is replaced here; Propagate copes
              // with that by checking the id.
              var timer = new SingleAssignmentDisposable();
              Disposable.TrySetSerial(ref _serialCancelable, timer);
              timer.Disposable = _scheduler.Schedule(currentId, _dueTime, Propagate);
          }

          /// <summary>
          /// Timer callback: forwards the buffered element if <paramref name="currentid"/>
          /// still identifies the latest element.
          /// </summary>
          /// <returns>Always <see cref="Disposable.Empty"/>.</returns>
          private IDisposable Propagate(IScheduler self, ulong currentid)
          {
              lock (_gate)
              {
                  if (_hasValue && _id == currentid)
                  {
                      ForwardOnNext(_value);

                      // Clear the flag only when this callback owned the buffered element.
                      // A stale callback (id mismatch) can run between OnNext publishing a
                      // newer element and the old timer being replaced; clearing the flag
                      // there would silently discard the newer element.
                      _hasValue = false;
                  }
              }

              return Disposable.Empty;
          }

          /// <summary>Cancels the pending timer, forwards the error and drops any buffered element.</summary>
          public override void OnError(Exception error)
          {
              Disposable.TryDispose(ref _serialCancelable);

              lock (_gate)
              {
                  ForwardOnError(error);

                  _hasValue = false;
                  _id = unchecked(_id + 1);
              }
          }

          /// <summary>
          /// Cancels the pending timer, flushes the buffered element (if any) and then
          /// forwards completion.
          /// </summary>
          public override void OnCompleted()
          {
              Disposable.TryDispose(ref _serialCancelable);

              lock (_gate)
              {
                  if (_hasValue)
                  {
                      ForwardOnNext(_value);
                  }

                  ForwardOnCompleted();

                  _hasValue = false;
                  _id = unchecked(_id + 1);
              }
          }
      }
  }
  /// <summary>
  /// Producer for the selector-based throttle: for every source element a throttle
  /// sequence is obtained from the selector, and the element is forwarded when that
  /// sequence signals (element or completion) while the element is still the latest one.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TThrottle">Element type of the throttle sequences; the values themselves are ignored.</typeparam>
  internal sealed class Throttle<TSource, TThrottle> : Producer<TSource, Throttle<TSource, TThrottle>._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, IObservable<TThrottle>> _throttleSelector;

      /// <summary>Captures the source and the function producing a throttle sequence per element.</summary>
      /// <param name="source">Sequence whose elements are throttled.</param>
      /// <param name="throttleSelector">Invoked for every source element to obtain its throttle sequence.</param>
      public Throttle(IObservable<TSource> source, Func<TSource, IObservable<TThrottle>> throttleSelector)
      {
          _source = source;
          _throttleSelector = throttleSelector;
      }

      protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

      protected override void Run(_ sink) => sink.Run(this);

      /// <summary>
      /// Sink that buffers the most recent element and subscribes to the throttle
      /// sequence selected for it.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly Func<TSource, IObservable<TThrottle>> _throttleSelector;

          // Guards _value, _hasValue, _id and every call that forwards to the observer.
          // Created with the sink (instead of in Run) so it is never null and can be readonly.
          private readonly object _gate = new object();

          // Most recent element that has not been forwarded yet (meaningful only while _hasValue).
          private TSource _value;
          private bool _hasValue;

          // Holds the subscription to the throttle sequence of the latest element.
          private IDisposable _serialCancelable;

          // Incremented on every notification; a throttle observer compares it with the
          // id it was created with to find out whether it is still current.
          private ulong _id;

          public _(Throttle<TSource, TThrottle> parent, IObserver<TSource> observer)
              : base(observer)
          {
              _throttleSelector = parent._throttleSelector;
          }

          /// <summary>Resets the buffered state and starts the sink on the parent's source.</summary>
          public void Run(Throttle<TSource, TThrottle> parent)
          {
              _value = default(TSource);
              _hasValue = false;
              _id = 0UL;

              base.Run(parent._source);
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  Disposable.TryDispose(ref _serialCancelable);
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Obtains the throttle sequence for <paramref name="value"/>, buffers the value
          /// as the latest element and subscribes to the throttle sequence. An exception
          /// thrown by the selector is forwarded as an error.
          /// </summary>
          public override void OnNext(TSource value)
          {
              IObservable<TThrottle> throttle;
              try
              {
                  throttle = _throttleSelector(value);
              }
              catch (Exception error)
              {
                  lock (_gate)
                  {
                      ForwardOnError(error);
                  }

                  return;
              }

              ulong currentId;

              lock (_gate)
              {
                  _hasValue = true;
                  _value = value;
                  _id = unchecked(_id + 1);
                  currentId = _id;
              }

              // The new subscription is installed outside the gate, so the throttle
              // observer of an older element may still be notified before it is replaced
              // here; the observer copes with that by checking the id.
              var subscription = new SingleAssignmentDisposable();
              Disposable.TrySetSerial(ref _serialCancelable, subscription);
              subscription.Disposable = throttle.SubscribeSafe(new ThrottleObserver(this, value, currentId, subscription));
          }

          /// <summary>Cancels the pending throttle subscription, forwards the error and drops any buffered element.</summary>
          public override void OnError(Exception error)
          {
              Disposable.TryDispose(ref _serialCancelable);

              lock (_gate)
              {
                  ForwardOnError(error);

                  _hasValue = false;
                  _id = unchecked(_id + 1);
              }
          }

          /// <summary>
          /// Cancels the pending throttle subscription, flushes the buffered element
          /// (if any) and then forwards completion.
          /// </summary>
          public override void OnCompleted()
          {
              Disposable.TryDispose(ref _serialCancelable);

              lock (_gate)
              {
                  if (_hasValue)
                  {
                      ForwardOnNext(_value);
                  }

                  ForwardOnCompleted();

                  _hasValue = false;
                  _id = unchecked(_id + 1);
              }
          }

          /// <summary>
          /// Observer of a single throttle sequence. Remembers the element and the id it
          /// was created for and releases that element when the throttle sequence signals.
          /// </summary>
          private sealed class ThrottleObserver : IObserver<TThrottle>
          {
              private readonly _ _parent;
              private readonly TSource _value;
              private readonly ulong _currentid;

              // Subscription of this observer to its throttle sequence.
              private readonly IDisposable _self;

              public ThrottleObserver(_ parent, TSource value, ulong currentid, IDisposable self)
              {
                  _parent = parent;
                  _value = value;
                  _currentid = currentid;
                  _self = self;
              }

              /// <summary>The first throttle element releases the buffered value; its content is ignored.</summary>
              public void OnNext(TThrottle value)
              {
                  ReleaseAndDispose();
              }

              /// <summary>Forwards an error of the throttle sequence to the downstream observer.</summary>
              public void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent.ForwardOnError(error);
                  }
              }

              /// <summary>Completion of the throttle sequence releases the buffered value as well.</summary>
              public void OnCompleted()
              {
                  ReleaseAndDispose();
              }

              // Shared by OnNext and OnCompleted: forwards the captured element if it is
              // still the latest one, then disposes the throttle subscription.
              private void ReleaseAndDispose()
              {
                  lock (_parent._gate)
                  {
                      if (_parent._hasValue && _parent._id == _currentid)
                      {
                          _parent.ForwardOnNext(_value);

                          // Clear the flag only when this observer owned the buffered
                          // element. A stale observer (id mismatch) can be notified between
                          // the sink publishing a newer element and the old subscription
                          // being replaced; clearing the flag there would silently discard
                          // the newer element.
                          _parent._hasValue = false;
                      }

                      _self.Dispose();
                  }
              }
          }
      }
  }
  232. }