Replay.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Subjects;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  9. // REVIEW: Expose Replay using ConcurrentAsyncAsyncSubject<T> underneath.
  10. public partial class AsyncObservable
  11. {
  12. public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source)
  13. {
  14. if (source == null)
  15. throw new ArgumentNullException(nameof(source));
  16. return Multicast(source, new SequentialReplayAsyncSubject<TSource>());
  17. }
  18. public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, IAsyncScheduler scheduler)
  19. {
  20. if (source == null)
  21. throw new ArgumentNullException(nameof(source));
  22. if (scheduler == null)
  23. throw new ArgumentNullException(nameof(scheduler));
  24. return Multicast(source, new SequentialReplayAsyncSubject<TSource>(scheduler));
  25. }
  26. public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, int bufferSize)
  27. {
  28. if (source == null)
  29. throw new ArgumentNullException(nameof(source));
  30. if (bufferSize < 0)
  31. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  32. return Multicast(source, new SequentialReplayAsyncSubject<TSource>(bufferSize));
  33. }
  34. public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, int bufferSize, IAsyncScheduler scheduler)
  35. {
  36. if (source == null)
  37. throw new ArgumentNullException(nameof(source));
  38. if (bufferSize < 0)
  39. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  40. if (scheduler == null)
  41. throw new ArgumentNullException(nameof(scheduler));
  42. return Multicast(source, new SequentialReplayAsyncSubject<TSource>(bufferSize, scheduler));
  43. }
  44. public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, TimeSpan window)
  45. {
  46. if (source == null)
  47. throw new ArgumentNullException(nameof(source));
  48. if (window < TimeSpan.Zero)
  49. throw new ArgumentOutOfRangeException(nameof(window));
  50. return Multicast(source, new SequentialReplayAsyncSubject<TSource>(window));
  51. }
  52. public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, TimeSpan window, IAsyncScheduler scheduler)
  53. {
  54. if (source == null)
  55. throw new ArgumentNullException(nameof(source));
  56. if (window < TimeSpan.Zero)
  57. throw new ArgumentOutOfRangeException(nameof(window));
  58. if (scheduler == null)
  59. throw new ArgumentNullException(nameof(scheduler));
  60. return Multicast(source, new SequentialReplayAsyncSubject<TSource>(window, scheduler));
  61. }
  62. public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, int bufferSize, TimeSpan window)
  63. {
  64. if (source == null)
  65. throw new ArgumentNullException(nameof(source));
  66. if (bufferSize < 0)
  67. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  68. if (window < TimeSpan.Zero)
  69. throw new ArgumentOutOfRangeException(nameof(window));
  70. return Multicast(source, new SequentialReplayAsyncSubject<TSource>(bufferSize, window));
  71. }
  72. public static IConnectableAsyncObservable<TSource> Replay<TSource>(this IAsyncObservable<TSource> source, int bufferSize, TimeSpan window, IAsyncScheduler scheduler)
  73. {
  74. if (source == null)
  75. throw new ArgumentNullException(nameof(source));
  76. if (bufferSize < 0)
  77. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  78. if (window < TimeSpan.Zero)
  79. throw new ArgumentOutOfRangeException(nameof(window));
  80. if (scheduler == null)
  81. throw new ArgumentNullException(nameof(scheduler));
  82. return Multicast(source, new SequentialReplayAsyncSubject<TSource>(bufferSize, window, scheduler));
  83. }
  84. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector)
  85. {
  86. if (source == null)
  87. throw new ArgumentNullException(nameof(source));
  88. if (selector == null)
  89. throw new ArgumentNullException(nameof(selector));
  90. return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(), selector);
  91. }
  92. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, IAsyncScheduler scheduler)
  93. {
  94. if (source == null)
  95. throw new ArgumentNullException(nameof(source));
  96. if (selector == null)
  97. throw new ArgumentNullException(nameof(selector));
  98. if (scheduler == null)
  99. throw new ArgumentNullException(nameof(scheduler));
  100. return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(scheduler), selector);
  101. }
  102. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, int bufferSize)
  103. {
  104. if (source == null)
  105. throw new ArgumentNullException(nameof(source));
  106. if (selector == null)
  107. throw new ArgumentNullException(nameof(selector));
  108. if (bufferSize < 0)
  109. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  110. return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(bufferSize), selector);
  111. }
  112. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, int bufferSize, IAsyncScheduler scheduler)
  113. {
  114. if (source == null)
  115. throw new ArgumentNullException(nameof(source));
  116. if (selector == null)
  117. throw new ArgumentNullException(nameof(selector));
  118. if (bufferSize < 0)
  119. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  120. if (scheduler == null)
  121. throw new ArgumentNullException(nameof(scheduler));
  122. return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(bufferSize, scheduler), selector);
  123. }
  124. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, TimeSpan window)
  125. {
  126. if (source == null)
  127. throw new ArgumentNullException(nameof(source));
  128. if (selector == null)
  129. throw new ArgumentNullException(nameof(selector));
  130. if (window < TimeSpan.Zero)
  131. throw new ArgumentOutOfRangeException(nameof(window));
  132. return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(window), selector);
  133. }
  134. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, TimeSpan window, IAsyncScheduler scheduler)
  135. {
  136. if (source == null)
  137. throw new ArgumentNullException(nameof(source));
  138. if (selector == null)
  139. throw new ArgumentNullException(nameof(selector));
  140. if (window < TimeSpan.Zero)
  141. throw new ArgumentOutOfRangeException(nameof(window));
  142. if (scheduler == null)
  143. throw new ArgumentNullException(nameof(scheduler));
  144. return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(window, scheduler), selector);
  145. }
  146. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, int bufferSize, TimeSpan window)
  147. {
  148. if (source == null)
  149. throw new ArgumentNullException(nameof(source));
  150. if (selector == null)
  151. throw new ArgumentNullException(nameof(selector));
  152. if (bufferSize < 0)
  153. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  154. if (window < TimeSpan.Zero)
  155. throw new ArgumentOutOfRangeException(nameof(window));
  156. return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(bufferSize, window), selector);
  157. }
  158. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, int bufferSize, TimeSpan window, IAsyncScheduler scheduler)
  159. {
  160. if (source == null)
  161. throw new ArgumentNullException(nameof(source));
  162. if (selector == null)
  163. throw new ArgumentNullException(nameof(selector));
  164. if (bufferSize < 0)
  165. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  166. if (window < TimeSpan.Zero)
  167. throw new ArgumentOutOfRangeException(nameof(window));
  168. if (scheduler == null)
  169. throw new ArgumentNullException(nameof(scheduler));
  170. return Multicast(source, () => new SequentialReplayAsyncSubject<TSource>(bufferSize, window, scheduler), selector);
  171. }
  172. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, ValueTask<IAsyncObservable<TResult>>> selector)
  173. {
  174. if (source == null)
  175. throw new ArgumentNullException(nameof(source));
  176. if (selector == null)
  177. throw new ArgumentNullException(nameof(selector));
  178. return Multicast(source, () => new ValueTask<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>()), selector);
  179. }
  180. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, ValueTask<IAsyncObservable<TResult>>> selector, IAsyncScheduler scheduler)
  181. {
  182. if (source == null)
  183. throw new ArgumentNullException(nameof(source));
  184. if (selector == null)
  185. throw new ArgumentNullException(nameof(selector));
  186. if (scheduler == null)
  187. throw new ArgumentNullException(nameof(scheduler));
  188. return Multicast(source, () => new ValueTask<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(scheduler)), selector);
  189. }
  190. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, ValueTask<IAsyncObservable<TResult>>> selector, int bufferSize)
  191. {
  192. if (source == null)
  193. throw new ArgumentNullException(nameof(source));
  194. if (selector == null)
  195. throw new ArgumentNullException(nameof(selector));
  196. if (bufferSize < 0)
  197. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  198. return Multicast(source, () => new ValueTask<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(bufferSize)), selector);
  199. }
  200. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, ValueTask<IAsyncObservable<TResult>>> selector, int bufferSize, IAsyncScheduler scheduler)
  201. {
  202. if (source == null)
  203. throw new ArgumentNullException(nameof(source));
  204. if (selector == null)
  205. throw new ArgumentNullException(nameof(selector));
  206. if (bufferSize < 0)
  207. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  208. if (scheduler == null)
  209. throw new ArgumentNullException(nameof(scheduler));
  210. return Multicast(source, () => new ValueTask<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(bufferSize, scheduler)), selector);
  211. }
  212. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, ValueTask<IAsyncObservable<TResult>>> selector, TimeSpan window)
  213. {
  214. if (source == null)
  215. throw new ArgumentNullException(nameof(source));
  216. if (selector == null)
  217. throw new ArgumentNullException(nameof(selector));
  218. if (window < TimeSpan.Zero)
  219. throw new ArgumentOutOfRangeException(nameof(window));
  220. return Multicast(source, () => new ValueTask<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(window)), selector);
  221. }
  222. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, ValueTask<IAsyncObservable<TResult>>> selector, TimeSpan window, IAsyncScheduler scheduler)
  223. {
  224. if (source == null)
  225. throw new ArgumentNullException(nameof(source));
  226. if (selector == null)
  227. throw new ArgumentNullException(nameof(selector));
  228. if (window < TimeSpan.Zero)
  229. throw new ArgumentOutOfRangeException(nameof(window));
  230. if (scheduler == null)
  231. throw new ArgumentNullException(nameof(scheduler));
  232. return Multicast(source, () => new ValueTask<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(window, scheduler)), selector);
  233. }
  234. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, ValueTask<IAsyncObservable<TResult>>> selector, int bufferSize, TimeSpan window)
  235. {
  236. if (source == null)
  237. throw new ArgumentNullException(nameof(source));
  238. if (selector == null)
  239. throw new ArgumentNullException(nameof(selector));
  240. if (bufferSize < 0)
  241. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  242. if (window < TimeSpan.Zero)
  243. throw new ArgumentOutOfRangeException(nameof(window));
  244. return Multicast(source, () => new ValueTask<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(bufferSize, window)), selector);
  245. }
  246. public static IAsyncObservable<TResult> Replay<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, ValueTask<IAsyncObservable<TResult>>> selector, int bufferSize, TimeSpan window, IAsyncScheduler scheduler)
  247. {
  248. if (source == null)
  249. throw new ArgumentNullException(nameof(source));
  250. if (selector == null)
  251. throw new ArgumentNullException(nameof(selector));
  252. if (bufferSize < 0)
  253. throw new ArgumentOutOfRangeException(nameof(bufferSize));
  254. if (window < TimeSpan.Zero)
  255. throw new ArgumentOutOfRangeException(nameof(window));
  256. if (scheduler == null)
  257. throw new ArgumentNullException(nameof(scheduler));
  258. return Multicast(source, () => new ValueTask<IAsyncSubject<TSource, TSource>>(new SequentialReplayAsyncSubject<TSource>(bufferSize, window, scheduler)), selector);
  259. }
  260. }
  261. }