Throttle.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive.Linq
  8. {
  partial class AsyncObservable
  {
      /// <summary>
      /// Returns an observable that, for each subscription, wraps the subscriber with
      /// <c>AsyncObserver.Throttle(observer, dueTime)</c> and subscribes the wrapped observer to
      /// <paramref name="source"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <param name="source">Sequence to throttle.</param>
      /// <param name="dueTime">Throttling duration handed to the throttling observer; must not be negative.</param>
      /// <returns>The throttled sequence.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="dueTime"/> is negative.</exception>
      public static IAsyncObservable<TSource> Throttle<TSource>(this IAsyncObservable<TSource> source, TimeSpan dueTime)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (dueTime < TimeSpan.Zero)
          {
              throw new ArgumentOutOfRangeException(nameof(dueTime));
          }

          return Create<TSource>(async observer =>
          {
              // The composite owns both the throttler's resource and the upstream subscription,
              // so disposing the returned handle tears down both.
              var subscription = new CompositeAsyncDisposable();

              var (throttledObserver, throttleResource) = AsyncObserver.Throttle(observer, dueTime);
              await subscription.AddAsync(throttleResource).ConfigureAwait(false);

              var upstream = await source.SubscribeSafeAsync(throttledObserver).ConfigureAwait(false);
              await subscription.AddAsync(upstream).ConfigureAwait(false);

              return subscription;
          });
      }

      /// <summary>
      /// Returns an observable that, for each subscription, wraps the subscriber with
      /// <c>AsyncObserver.Throttle(observer, dueTime, scheduler)</c> and subscribes the wrapped
      /// observer to <paramref name="source"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <param name="source">Sequence to throttle.</param>
      /// <param name="dueTime">Throttling duration handed to the throttling observer; must not be negative.</param>
      /// <param name="scheduler">Scheduler handed to the throttling observer.</param>
      /// <returns>The throttled sequence.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="dueTime"/> is negative.</exception>
      public static IAsyncObservable<TSource> Throttle<TSource>(this IAsyncObservable<TSource> source, TimeSpan dueTime, IAsyncScheduler scheduler)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (dueTime < TimeSpan.Zero)
          {
              throw new ArgumentOutOfRangeException(nameof(dueTime));
          }

          if (scheduler is null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          return Create<TSource>(async observer =>
          {
              // The composite owns both the throttler's resource and the upstream subscription.
              var subscription = new CompositeAsyncDisposable();

              var (throttledObserver, throttleResource) = AsyncObserver.Throttle(observer, dueTime, scheduler);
              await subscription.AddAsync(throttleResource).ConfigureAwait(false);

              var upstream = await source.SubscribeSafeAsync(throttledObserver).ConfigureAwait(false);
              await subscription.AddAsync(upstream).ConfigureAwait(false);

              return subscription;
          });
      }

      /// <summary>
      /// Returns an observable that, for each subscription, wraps the subscriber with
      /// <c>AsyncObserver.Throttle(observer, throttleSelector)</c> and subscribes the wrapped
      /// observer to <paramref name="source"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TThrottle">Element type of the per-element throttling sequences.</typeparam>
      /// <param name="source">Sequence to throttle.</param>
      /// <param name="throttleSelector">Function producing the throttling sequence for a source element.</param>
      /// <returns>The throttled sequence.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="throttleSelector"/> is null.</exception>
      public static IAsyncObservable<TSource> Throttle<TSource, TThrottle>(this IAsyncObservable<TSource> source, Func<TSource, IAsyncObservable<TThrottle>> throttleSelector)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (throttleSelector is null)
          {
              throw new ArgumentNullException(nameof(throttleSelector));
          }

          return Create<TSource>(async observer =>
          {
              // The composite owns both the throttler's resource and the upstream subscription.
              var subscription = new CompositeAsyncDisposable();

              var (throttledObserver, throttleResource) = AsyncObserver.Throttle(observer, throttleSelector);
              await subscription.AddAsync(throttleResource).ConfigureAwait(false);

              var upstream = await source.SubscribeSafeAsync(throttledObserver).ConfigureAwait(false);
              await subscription.AddAsync(upstream).ConfigureAwait(false);

              return subscription;
          });
      }
  }
  partial class AsyncObserver
  {
      /// <summary>
      /// Creates a throttling observer around <paramref name="observer"/> using
      /// <c>TaskPoolAsyncScheduler.Default</c> to run the timers.
      /// </summary>
      /// <typeparam name="TSource">Element type of the observed sequence.</typeparam>
      /// <param name="observer">Downstream observer that receives the throttled notifications.</param>
      /// <param name="dueTime">Quiet period that has to elapse after an element before it is forwarded.</param>
      /// <returns>The throttling observer and the disposable that owns the currently pending timer.</returns>
      public static (IAsyncObserver<TSource>, IAsyncDisposable) Throttle<TSource>(IAsyncObserver<TSource> observer, TimeSpan dueTime) => Throttle(observer, dueTime, TaskPoolAsyncScheduler.Default);

      /// <summary>
      /// Creates a throttling observer around <paramref name="observer"/>. Every incoming element
      /// replaces the pending one and restarts a timer on <paramref name="scheduler"/>; the element is
      /// forwarded only if no newer element arrived before its timer fired.
      /// </summary>
      /// <remarks>
      /// On completion the pending element (if any) is flushed before completion is forwarded.
      /// On error the pending element is dropped and the error is forwarded.
      /// </remarks>
      /// <typeparam name="TSource">Element type of the observed sequence.</typeparam>
      /// <param name="observer">Downstream observer that receives the throttled notifications.</param>
      /// <param name="dueTime">Quiet period that has to elapse after an element before it is forwarded; must not be negative.</param>
      /// <param name="scheduler">Scheduler used to run the timers.</param>
      /// <returns>The throttling observer and the disposable that owns the currently pending timer.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="scheduler"/> is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="dueTime"/> is negative.</exception>
      public static (IAsyncObserver<TSource>, IAsyncDisposable) Throttle<TSource>(IAsyncObserver<TSource> observer, TimeSpan dueTime, IAsyncScheduler scheduler)
      {
          if (observer == null)
              throw new ArgumentNullException(nameof(observer));
          if (dueTime < TimeSpan.Zero)
              throw new ArgumentOutOfRangeException(nameof(dueTime));
          if (scheduler == null)
              throw new ArgumentNullException(nameof(scheduler));

          var gate = new AsyncLock();                // guards hasValue / value / id and serializes downstream calls
          var timer = new SerialAsyncDisposable();   // holds the timer of the most recent element

          var hasValue = false;
          var value = default(TSource);
          var id = 0UL;                              // bumped per element and on termination; identifies the latest element

          return
          (
              Create<TSource>(
                  async x =>
                  {
                      var myId = default(ulong);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          hasValue = true;
                          value = x;
                          myId = ++id;
                      }

                      // Assigning a fresh placeholder disposes the timer of the previous element.
                      var d = new SingleAssignmentAsyncDisposable();
                      await timer.AssignAsync(d).ConfigureAwait(false);

                      var t = await scheduler.ScheduleAsync(async ct =>
                      {
                          if (!ct.IsCancellationRequested)
                          {
                              using (await gate.LockAsync().ConfigureAwait(false))
                              {
                                  if (hasValue && id == myId)
                                  {
                                      await observer.OnNextAsync(value).ConfigureAwait(false);

                                      // Only the timer that owns the pending value may clear it. A
                                      // superseded timer that still gets to run (it can fire between
                                      // the newer element releasing the gate and this timer being
                                      // disposed) must not wipe out the newer element.
                                      hasValue = false;
                                  }
                              }
                          }
                      }, dueTime).ConfigureAwait(false); // wait for the quiet period instead of running right away

                      await d.AssignAsync(t).ConfigureAwait(false);
                  },
                  async ex =>
                  {
                      await timer.DisposeAsync().ConfigureAwait(false);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);

                          // Drop the pending value and invalidate any timer that is still in flight.
                          hasValue = false;
                          id++;
                      }
                  },
                  async () =>
                  {
                      await timer.DisposeAsync().ConfigureAwait(false);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          // Flush the element that was still waiting for its timer.
                          if (hasValue)
                          {
                              await observer.OnNextAsync(value).ConfigureAwait(false);
                          }

                          await observer.OnCompletedAsync().ConfigureAwait(false);

                          hasValue = false;
                          id++;
                      }
                  }
              ),
              timer
          );
      }

      /// <summary>
      /// Creates a throttling observer around <paramref name="observer"/>. For every incoming element
      /// <paramref name="throttleSelector"/> supplies a throttling sequence; the element is forwarded
      /// when that sequence produces its first element or completes, unless a newer element arrived first.
      /// </summary>
      /// <remarks>
      /// On completion the pending element (if any) is flushed before completion is forwarded.
      /// Errors from the source, from <paramref name="throttleSelector"/> or from a throttling sequence
      /// are forwarded downstream and drop the pending element.
      /// </remarks>
      /// <typeparam name="TSource">Element type of the observed sequence.</typeparam>
      /// <typeparam name="TThrottle">Element type of the throttling sequences.</typeparam>
      /// <param name="observer">Downstream observer that receives the throttled notifications.</param>
      /// <param name="throttleSelector">Function producing the throttling sequence for an element.</param>
      /// <returns>The throttling observer and the disposable that owns the current throttling subscription.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="throttleSelector"/> is null.</exception>
      public static (IAsyncObserver<TSource>, IAsyncDisposable) Throttle<TSource, TThrottle>(IAsyncObserver<TSource> observer, Func<TSource, IAsyncObservable<TThrottle>> throttleSelector)
      {
          if (observer == null)
              throw new ArgumentNullException(nameof(observer));
          if (throttleSelector == null)
              throw new ArgumentNullException(nameof(throttleSelector));

          var gate = new AsyncLock();                    // guards hasValue / value / id and serializes downstream calls
          var throttler = new SerialAsyncDisposable();   // holds the throttling subscription of the most recent element

          var hasValue = false;
          var value = default(TSource);
          var id = 0UL;                                  // bumped per element and on termination; identifies the latest element

          return
          (
              Create<TSource>(
                  async x =>
                  {
                      var throttleSource = default(IAsyncObservable<TThrottle>);

                      try
                      {
                          throttleSource = throttleSelector(x); // REVIEW: Do we need an async variant?
                      }
                      catch (Exception ex)
                      {
                          using (await gate.LockAsync().ConfigureAwait(false))
                          {
                              await observer.OnErrorAsync(ex).ConfigureAwait(false);

                              // Same state reset as the sink's own error handler, so a throttling
                              // subscription that is still in flight cannot emit after the error.
                              hasValue = false;
                              id++;
                          }

                          return;
                      }

                      var myId = default(ulong);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          hasValue = true;
                          value = x;
                          myId = ++id;
                      }

                      // Assigning a fresh placeholder disposes the throttling subscription of the previous element.
                      var d = new SingleAssignmentAsyncDisposable();
                      await throttler.AssignAsync(d).ConfigureAwait(false);

                      var throttleObserver = Create<TThrottle>(
                          async y =>
                          {
                              using (await gate.LockAsync().ConfigureAwait(false))
                              {
                                  if (hasValue && myId == id)
                                  {
                                      await observer.OnNextAsync(x).ConfigureAwait(false);

                                      // Only the throttle that owns the pending value may clear it;
                                      // a superseded throttle must not wipe out a newer element.
                                      hasValue = false;
                                  }

                                  // The first throttle notification is all that is needed.
                                  await d.DisposeAsync().ConfigureAwait(false);
                              }
                          },
                          async ex =>
                          {
                              using (await gate.LockAsync().ConfigureAwait(false))
                              {
                                  await observer.OnErrorAsync(ex).ConfigureAwait(false);

                                  // Same state reset as the sink's own error handler, so the pending
                                  // value is not emitted after the error.
                                  hasValue = false;
                                  id++;
                              }
                          },
                          async () =>
                          {
                              using (await gate.LockAsync().ConfigureAwait(false))
                              {
                                  if (hasValue && myId == id)
                                  {
                                      await observer.OnNextAsync(x).ConfigureAwait(false);

                                      // See the OnNext handler above: only the owning throttle clears the flag.
                                      hasValue = false;
                                  }

                                  await d.DisposeAsync().ConfigureAwait(false);
                              }
                          }
                      );

                      var t = await throttleSource.SubscribeSafeAsync(throttleObserver).ConfigureAwait(false);
                      await d.AssignAsync(t).ConfigureAwait(false);
                  },
                  async ex =>
                  {
                      await throttler.DisposeAsync().ConfigureAwait(false);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);

                          // Drop the pending value and invalidate any throttle that is still in flight.
                          hasValue = false;
                          id++;
                      }
                  },
                  async () =>
                  {
                      await throttler.DisposeAsync().ConfigureAwait(false);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          // Flush the element that was still waiting for its throttle.
                          if (hasValue)
                          {
                              await observer.OnNextAsync(value).ConfigureAwait(false);
                          }

                          await observer.OnCompletedAsync().ConfigureAwait(false);

                          hasValue = false;
                          id++;
                      }
                  }
              ),
              throttler
          );
      }
  }
  239. }