GroupByUntil.cs 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace System.Reactive.Linq
  11. {
  12. public partial class AsyncObservable
  13. {
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with a synchronous key selector and no explicit capacity or comparer.
  /// The selectors travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
          source,
          (keySelector, durationSelector),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.durationSelector)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with a synchronous key selector and a caller-supplied key comparer.
  /// The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
          source,
          (keySelector, durationSelector, comparer),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.durationSelector, args.comparer)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with a synchronous key selector and an explicit capacity.
  /// The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
          source,
          (keySelector, durationSelector, capacity),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.durationSelector, args.capacity)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with a synchronous key selector, an explicit capacity and a key comparer.
  /// The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
          source,
          (keySelector, durationSelector, capacity, comparer),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.durationSelector, args.capacity, args.comparer)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with synchronous key and element selectors, no explicit capacity or comparer.
  /// The selectors travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
          source,
          (keySelector, elementSelector, durationSelector),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.elementSelector, args.durationSelector)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with synchronous key and element selectors and a key comparer.
  /// The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
          source,
          (keySelector, elementSelector, durationSelector, comparer),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.comparer)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with synchronous key and element selectors and an explicit capacity.
  /// The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
          source,
          (keySelector, elementSelector, durationSelector, capacity),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.capacity)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with synchronous key and element selectors, an explicit capacity and a
  /// key comparer. The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers
  /// to <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
          source,
          (keySelector, elementSelector, durationSelector, capacity, comparer),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.capacity, args.comparer)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with a ValueTask-returning key selector and no explicit capacity or
  /// comparer. The selectors travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
          source,
          (keySelector, durationSelector),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.durationSelector)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with a ValueTask-returning key selector and a key comparer.
  /// The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
          source,
          (keySelector, durationSelector, comparer),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.durationSelector, args.comparer)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with a ValueTask-returning key selector and an explicit capacity.
  /// The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
          source,
          (keySelector, durationSelector, capacity),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.durationSelector, args.capacity)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with a ValueTask-returning key selector, an explicit capacity and a
  /// key comparer. The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers
  /// to <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
          source,
          (keySelector, durationSelector, capacity, comparer),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.durationSelector, args.capacity, args.comparer)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with ValueTask-returning key and element selectors, no explicit
  /// capacity or comparer. The selectors travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback
  /// defers to <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
          source,
          (keySelector, elementSelector, durationSelector),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.elementSelector, args.durationSelector)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with ValueTask-returning key and element selectors and a key comparer.
  /// The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
          source,
          (keySelector, elementSelector, durationSelector, comparer),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.comparer)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with ValueTask-returning key and element selectors and an explicit
  /// capacity. The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose callback defers to
  /// <c>GroupByUntilCore</c> with an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
          source,
          (keySelector, elementSelector, durationSelector, capacity),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.capacity)));
  }
  /// <summary>
  /// GroupByUntil over <paramref name="source"/> with ValueTask-returning key and element selectors, an explicit
  /// capacity and a key comparer. The arguments travel as a state tuple into <c>CreateAsyncObservable.From</c>, whose
  /// callback defers to <c>GroupByUntilCore</c> with an observer produced by the matching
  /// <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
          source,
          (keySelector, elementSelector, durationSelector, capacity, comparer),
          static (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (sink, upstream) => AsyncObserver.GroupByUntil(sink, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.capacity, args.comparer)));
  }
  /// <summary>
  /// Shared subscription routine for the GroupByUntil overloads. Creates a <c>SingleAssignmentAsyncDisposable</c>,
  /// asks <paramref name="createObserver"/> for the sink/subscription pair, subscribes the sink to
  /// <paramref name="source"/>, stores the resulting disposable in the single-assignment holder and returns the
  /// subscription handed back by <paramref name="createObserver"/>.
  /// </summary>
  /// <param name="source">Sequence the created sink is subscribed to.</param>
  /// <param name="observer">Observer passed through to <paramref name="createObserver"/>.</param>
  /// <param name="createObserver">Factory receiving the observer and the disposable holder; returns the sink and the disposable to hand to the caller.</param>
  /// <returns>The disposable produced by <paramref name="createObserver"/>.</returns>
  private static async ValueTask<IAsyncDisposable> GroupByUntilCore<TSource, TKey, TElement, TDuration>(IAsyncObservable<TSource> source, IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, Func<IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>>, IAsyncDisposable, ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserver)
  {
      // The holder is given to the factory before the source subscription exists; it is filled in afterwards.
      var sourceSubscription = new SingleAssignmentAsyncDisposable();

      var created = await createObserver(observer, sourceSubscription).ConfigureAwait(false);
      var sink = created.Item1;
      var subscription = created.Item2;

      var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
      await sourceSubscription.AssignAsync(inner).ConfigureAwait(false);

      return subscription;
  }
  278. }
  279. public partial class AsyncObserver
  280. {
  /// <summary>
  /// Builds the GroupByUntil observer pair for a synchronous key selector by forwarding to the overload that also takes
  /// a capacity and a comparer, passing <c>int.MaxValue</c> and <c>EqualityComparer&lt;TKey&gt;.Default</c>.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupByUntil(observer, subscription, keySelector, durationSelector, int.MaxValue, defaultComparer);
  }
  /// <summary>
  /// Builds the GroupByUntil observer pair for a synchronous key selector and a caller-supplied comparer by forwarding
  /// to the overload that also takes a capacity, passing <c>int.MaxValue</c>.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      const int defaultCapacity = int.MaxValue;
      return GroupByUntil(observer, subscription, keySelector, durationSelector, defaultCapacity, comparer);
  }
  /// <summary>
  /// Builds the GroupByUntil observer pair for a synchronous key selector and an explicit capacity by forwarding to the
  /// overload that also takes a comparer, passing <c>EqualityComparer&lt;TKey&gt;.Default</c>.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupByUntil(observer, subscription, keySelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// Builds the GroupByUntil observer pair for a synchronous key selector with explicit capacity and comparer.
  /// The key selector is wrapped so that its result is returned inside a <c>ValueTask</c>, and the call is forwarded
  /// to the overload that accepts a ValueTask-returning key selector.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The explicit delegate type selects the ValueTask-based overload rather than this one.
      Func<TSource, ValueTask<TKey>> asyncKeySelector = item => new ValueTask<TKey>(keySelector(item));

      return GroupByUntil(observer, subscription, asyncKeySelector, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Builds the GroupByUntil observer pair for synchronous key and element selectors by forwarding to the overload that
  /// also takes a capacity and a comparer, passing <c>int.MaxValue</c> and <c>EqualityComparer&lt;TKey&gt;.Default</c>.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, defaultComparer);
  }
  /// <summary>
  /// Builds the GroupByUntil observer pair for synchronous key and element selectors and a caller-supplied comparer by
  /// forwarding to the overload that also takes a capacity, passing <c>int.MaxValue</c>.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      const int defaultCapacity = int.MaxValue;
      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, defaultCapacity, comparer);
  }
  /// <summary>
  /// Builds the GroupByUntil observer pair for synchronous key and element selectors and an explicit capacity by
  /// forwarding to the overload that also takes a comparer, passing <c>EqualityComparer&lt;TKey&gt;.Default</c>.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// Builds the GroupByUntil observer pair for synchronous key and element selectors with explicit capacity and
  /// comparer. Both selectors are wrapped so that their results are returned inside a <c>ValueTask</c>, and the call is
  /// forwarded (with explicit type arguments) to the overload that accepts ValueTask-returning selectors.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The explicit delegate types select the ValueTask-based overload rather than this one.
      Func<TSource, ValueTask<TKey>> asyncKeySelector = item => new ValueTask<TKey>(keySelector(item));
      Func<TSource, ValueTask<TElement>> asyncElementSelector = item => new ValueTask<TElement>(elementSelector(item));

      return GroupByUntil<TSource, TKey, TElement, TDuration>(observer, subscription, asyncKeySelector, asyncElementSelector, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Builds the GroupByUntil observer pair for a ValueTask-returning key selector by forwarding to the overload that
  /// also takes a capacity and a comparer, passing <c>int.MaxValue</c> and <c>EqualityComparer&lt;TKey&gt;.Default</c>.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupByUntil(observer, subscription, keySelector, durationSelector, int.MaxValue, defaultComparer);
  }
  /// <summary>
  /// Builds the GroupByUntil observer pair for a ValueTask-returning key selector and a caller-supplied comparer by
  /// forwarding to the overload that also takes a capacity, passing <c>int.MaxValue</c>.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      const int defaultCapacity = int.MaxValue;
      return GroupByUntil(observer, subscription, keySelector, durationSelector, defaultCapacity, comparer);
  }
  /// <summary>
  /// GroupByUntil overload taking an asynchronous key selector, a duration selector and a capacity.
  /// Validates its arguments and forwards to the overload that also accepts a comparer,
  /// passing <see cref="EqualityComparer{T}.Default"/> for it.
  /// </summary>
  /// <param name="observer">Observer of the grouped observables; must not be null.</param>
  /// <param name="subscription">Disposable handed on to the forwarded overload; must not be null.</param>
  /// <param name="keySelector">Asynchronous function producing the key for a source element; must not be null.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable; must not be null.</param>
  /// <param name="capacity">Capacity value handed on to the forwarded overload; must not be negative.</param>
  /// <returns>The value task returned by the forwarded overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      // Default key equality; the caller's capacity is passed through unchanged.
      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// GroupByUntil overload taking an asynchronous key selector, a duration selector, a capacity and a comparer,
  /// where group elements are the source elements themselves.
  /// Validates its arguments and forwards to the overload that takes an element selector,
  /// supplying a selector that wraps each source element in a completed <see cref="ValueTask{TResult}"/>.
  /// </summary>
  /// <param name="observer">Observer of the grouped observables; must not be null.</param>
  /// <param name="subscription">Disposable handed on to the forwarded overload; must not be null.</param>
  /// <param name="keySelector">Asynchronous function producing the key for a source element; must not be null.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable; must not be null.</param>
  /// <param name="capacity">Capacity value handed on to the forwarded overload; must not be negative.</param>
  /// <param name="comparer">Equality comparer for keys; must not be null.</param>
  /// <returns>The value task returned by the forwarded overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // Identity element selector: each source element becomes the group element as-is.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          item => new ValueTask<TSource>(item),
          durationSelector,
          capacity,
          comparer);
  }
  /// <summary>
  /// GroupByUntil overload taking asynchronous key and element selectors and a duration selector.
  /// Validates its arguments and forwards to the overload that also accepts a capacity and a comparer,
  /// passing <see cref="int.MaxValue"/> and <see cref="EqualityComparer{T}.Default"/> for them.
  /// </summary>
  /// <param name="observer">Observer of the grouped observables; must not be null.</param>
  /// <param name="subscription">Disposable handed on to the forwarded overload; must not be null.</param>
  /// <param name="keySelector">Asynchronous function producing the key for a source element; must not be null.</param>
  /// <param name="elementSelector">Asynchronous function producing the group element for a source element; must not be null.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable; must not be null.</param>
  /// <returns>The value task returned by the forwarded overload.</returns>
  /// <exception cref="ArgumentNullException">Any of the arguments is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      // No capacity hint and the default key equality.
      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, defaultComparer);
  }
  /// <summary>
  /// GroupByUntil overload taking asynchronous key and element selectors, a duration selector and a key comparer.
  /// Validates its arguments and forwards to the overload that also accepts a capacity,
  /// passing <see cref="int.MaxValue"/> for it.
  /// </summary>
  /// <param name="observer">Observer of the grouped observables; must not be null.</param>
  /// <param name="subscription">Disposable handed on to the forwarded overload; must not be null.</param>
  /// <param name="keySelector">Asynchronous function producing the key for a source element; must not be null.</param>
  /// <param name="elementSelector">Asynchronous function producing the group element for a source element; must not be null.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable; must not be null.</param>
  /// <param name="comparer">Equality comparer for keys; must not be null.</param>
  /// <returns>The value task returned by the forwarded overload.</returns>
  /// <exception cref="ArgumentNullException">Any of the arguments is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // No capacity hint; the caller's comparer is passed through unchanged.
      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, comparer);
  }
  /// <summary>
  /// GroupByUntil overload taking asynchronous key and element selectors, a duration selector and a capacity.
  /// Validates its arguments and forwards to the overload that also accepts a comparer,
  /// passing <see cref="EqualityComparer{T}.Default"/> for it.
  /// </summary>
  /// <param name="observer">Observer of the grouped observables; must not be null.</param>
  /// <param name="subscription">Disposable handed on to the forwarded overload; must not be null.</param>
  /// <param name="keySelector">Asynchronous function producing the key for a source element; must not be null.</param>
  /// <param name="elementSelector">Asynchronous function producing the group element for a source element; must not be null.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable; must not be null.</param>
  /// <param name="capacity">Capacity value handed on to the forwarded overload; must not be negative.</param>
  /// <returns>The value task returned by the forwarded overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      // Default key equality; the caller's capacity is passed through unchanged.
      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// Core GroupByUntil overload. Validates its arguments and then builds, asynchronously, a source observer that
  /// routes each incoming element to a per-key group subject:
  /// <list type="bullet">
  /// <item><description>The key is computed with <paramref name="keySelector"/>; a null key is tracked in a dedicated
  /// slot guarded by a lock, all other keys in a <see cref="ConcurrentDictionary{TKey, TValue}"/>.</description></item>
  /// <item><description>When a group is created it is passed to <paramref name="durationSelector"/>, emitted to
  /// <paramref name="observer"/>, and the resulting duration observable is subscribed to. When that observable
  /// produces a value or completes, the group is removed and completed.</description></item>
  /// <item><description>The element produced by <paramref name="elementSelector"/> is forwarded to the group.</description></item>
  /// <item><description>Exceptions thrown by the selectors, and errors from the source or a duration observable, are
  /// sent to every live group and then to <paramref name="observer"/>.</description></item>
  /// </list>
  /// </summary>
  /// <param name="observer">Observer of the grouped observables; must not be null.</param>
  /// <param name="subscription">Disposable added to the composite disposable backing the returned disposable; must not be null.</param>
  /// <param name="keySelector">Asynchronous function producing the key for a source element; must not be null.</param>
  /// <param name="elementSelector">Asynchronous function producing the group element for a source element; must not be null.</param>
  /// <param name="durationSelector">Function mapping a newly created group to its duration observable; must not be null.</param>
  /// <param name="capacity">
  /// Initial capacity for the group dictionary; must not be negative. <see cref="int.MaxValue"/> selects the
  /// dictionary constructor that takes no capacity.
  /// </param>
  /// <param name="comparer">Equality comparer used by the group dictionary; must not be null.</param>
  /// <returns>A value task yielding the source observer and the ref-counted disposable wrapping the subscriptions.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (subscription == null)
          throw new ArgumentNullException(nameof(subscription));
      if (keySelector == null)
          throw new ArgumentNullException(nameof(keySelector));
      if (elementSelector == null)
          throw new ArgumentNullException(nameof(elementSelector));
      if (durationSelector == null)
          throw new ArgumentNullException(nameof(durationSelector));
      if (capacity < 0)
          throw new ArgumentOutOfRangeException(nameof(capacity));
      if (comparer == null)
          throw new ArgumentNullException(nameof(comparer));

      // Argument validation above runs synchronously; the async work lives in the local function.
      return CoreAsync();

      // REVIEW: Concurrent execution of a duration callback and an event could lead to an OnNext call being queued in an AsyncLockObserver
      //         after a duration callback makes an OnCompleted call. This seems to be the case in sync Rx as well.

      async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
      {
          var d = new CompositeAsyncDisposable();

          await d.AddAsync(subscription).ConfigureAwait(false);

          var refCount = new RefCountAsyncDisposable(d);

          var groups = default(ConcurrentDictionary<TKey, IAsyncSubject<TElement>>);

          if (capacity == int.MaxValue)
          {
              // int.MaxValue is the "no capacity specified" marker used by the forwarding overloads.
              groups = new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(comparer);
          }
          else
          {
              groups = new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(Environment.ProcessorCount * 4, capacity, comparer);
          }

          // gate wraps every call made to the outer observer.
          var gate = new AsyncGate();

          // The dictionary cannot hold a null key, so the null-key group lives in its own slot guarded by nullGate.
          var nullGate = new object();
          var nullGroup = default(IAsyncSubject<TElement>);

          // Sends the error to the null-key group (if any), to every keyed group, and finally to the outer observer.
          async ValueTask OnErrorAsync(Exception ex)
          {
              var nullGroupLocal = default(IAsyncSubject<TElement>);

              lock (nullGate)
              {
                  nullGroupLocal = nullGroup;
              }

              if (nullGroupLocal != null)
              {
                  await nullGroupLocal.OnErrorAsync(ex).ConfigureAwait(false);
              }

              foreach (var group in groups.Values)
              {
                  await group.OnErrorAsync(ex).ConfigureAwait(false);
              }

              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
              }
          }

          return
          (
              Create<TSource>
              (
                  async x =>
                  {
                      var key = default(TKey);

                      try
                      {
                          key = await keySelector(x).ConfigureAwait(false);
                      }
                      catch (Exception ex)
                      {
                          await OnErrorAsync(ex).ConfigureAwait(false);
                          return;
                      }

                      var shouldEmit = false;
                      var group = default(IAsyncSubject<TElement>);

                      if (key == null)
                      {
                          lock (nullGate)
                          {
                              if (nullGroup == null)
                              {
                                  var subject = new SequentialSimpleAsyncSubject<TElement>();
                                  nullGroup = AsyncSubject.Create(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);
                                  shouldEmit = true;
                              }

                              // Capture the group while still holding the lock: reading the field after the lock is
                              // released could observe null if a duration callback expires the group in between.
                              group = nullGroup;
                          }
                      }
                      else
                      {
                          try
                          {
                              if (!groups.TryGetValue(key, out group))
                              {
                                  var subject = new SequentialSimpleAsyncSubject<TElement>();
                                  group = AsyncSubject.Create(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);

                                  if (groups.TryAdd(key, group))
                                  {
                                      shouldEmit = true;
                                  }
                              }
                          }
                          catch (Exception ex)
                          {
                              // The dictionary operations invoke the caller-supplied comparer, which may throw.
                              await OnErrorAsync(ex).ConfigureAwait(false);
                              return;
                          }
                      }

                      if (shouldEmit)
                      {
                          var g = new GroupedAsyncObservable<TKey, TElement>(key, group, refCount);

                          var duration = default(IAsyncObservable<TDuration>);

                          try
                          {
                              duration = durationSelector(g);
                          }
                          catch (Exception ex)
                          {
                              await OnErrorAsync(ex).ConfigureAwait(false);
                              return;
                          }

                          using (await gate.LockAsync().ConfigureAwait(false))
                          {
                              await observer.OnNextAsync(g).ConfigureAwait(false);
                          }

                          var durationSubscription = new SingleAssignmentAsyncDisposable();

                          // Ends the group for this key: removes it from its slot/dictionary, completes it,
                          // then disposes and unregisters the duration subscription.
                          async ValueTask Expire()
                          {
                              if (key == null)
                              {
                                  var oldNullGroup = default(IAsyncSubject<TElement>);

                                  lock (nullGate)
                                  {
                                      oldNullGroup = nullGroup;
                                      nullGroup = null;
                                  }

                                  if (oldNullGroup != null)
                                  {
                                      await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
                                  }
                              }
                              else
                              {
                                  if (groups.TryRemove(key, out var oldGroup))
                                  {
                                      await oldGroup.OnCompletedAsync().ConfigureAwait(false);
                                  }
                              }

                              await durationSubscription.DisposeAsync().ConfigureAwait(false);
                              await d.RemoveAsync(durationSubscription).ConfigureAwait(false);
                          }

                          // Either a value or completion of the duration observable expires the group.
                          var durationObserver = Create<TDuration>(
                              y => Expire(),
                              OnErrorAsync,
                              Expire
                          );

                          await d.AddAsync(durationSubscription).ConfigureAwait(false);

                          var durationSubscriptionInner = await duration.SubscribeSafeAsync(durationObserver).ConfigureAwait(false);
                          await durationSubscription.AssignAsync(durationSubscriptionInner).ConfigureAwait(false);
                      }

                      var element = default(TElement);

                      try
                      {
                          element = await elementSelector(x).ConfigureAwait(false);
                      }
                      catch (Exception ex)
                      {
                          await OnErrorAsync(ex).ConfigureAwait(false);
                          return;
                      }

                      await group.OnNextAsync(element).ConfigureAwait(false);
                  },
                  OnErrorAsync,
                  async () =>
                  {
                      // Snapshot the null-key group under its lock (as OnErrorAsync does) so that a concurrent
                      // expiry cannot null the field between the check and the call.
                      var nullGroupSnapshot = default(IAsyncSubject<TElement>);

                      lock (nullGate)
                      {
                          nullGroupSnapshot = nullGroup;
                      }

                      if (nullGroupSnapshot != null)
                      {
                          await nullGroupSnapshot.OnCompletedAsync().ConfigureAwait(false);
                      }

                      foreach (var group in groups.Values)
                      {
                          await group.OnCompletedAsync().ConfigureAwait(false);
                      }

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              ),
              refCount
          );
      }
  }
  695. }
  696. }