GroupByUntil.cs 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace System.Reactive.Linq
  11. {
  12. public partial class AsyncObservable
  13. {
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with a synchronous key selector.
  /// Arguments are checked immediately; the observer chain is assembled by the callback handed to <c>Create</c>.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, durationSelector),
          default(IGroupedAsyncObservable<TKey, TSource>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.durationSelector)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with a synchronous key selector
  /// and a caller-supplied key comparer.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="comparer">Key comparer forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, durationSelector, comparer),
          default(IGroupedAsyncObservable<TKey, TSource>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.durationSelector, args.comparer)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with a synchronous key selector
  /// and an explicit capacity.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="capacity">Non-negative capacity forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, durationSelector, capacity),
          default(IGroupedAsyncObservable<TKey, TSource>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.durationSelector, args.capacity)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with a synchronous key selector,
  /// an explicit capacity and a caller-supplied key comparer.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="capacity">Non-negative capacity forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="comparer">Key comparer forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, durationSelector, capacity, comparer),
          default(IGroupedAsyncObservable<TKey, TSource>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.durationSelector, args.capacity, args.comparer)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with synchronous key and element selectors.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="elementSelector">Element selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, elementSelector, durationSelector),
          default(IGroupedAsyncObservable<TKey, TElement>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.elementSelector, args.durationSelector)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with synchronous key and element selectors
  /// and a caller-supplied key comparer.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="elementSelector">Element selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="comparer">Key comparer forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, elementSelector, durationSelector, comparer),
          default(IGroupedAsyncObservable<TKey, TElement>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.comparer)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with synchronous key and element selectors
  /// and an explicit capacity.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="elementSelector">Element selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="capacity">Non-negative capacity forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, elementSelector, durationSelector, capacity),
          default(IGroupedAsyncObservable<TKey, TElement>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.capacity)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with synchronous key and element selectors,
  /// an explicit capacity and a caller-supplied key comparer.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="elementSelector">Element selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="capacity">Non-negative capacity forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="comparer">Key comparer forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, elementSelector, durationSelector, capacity, comparer),
          default(IGroupedAsyncObservable<TKey, TElement>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.capacity, args.comparer)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with an asynchronous
  /// (<c>ValueTask</c>-returning) key selector.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Asynchronous key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, durationSelector),
          default(IGroupedAsyncObservable<TKey, TSource>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.durationSelector)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with an asynchronous
  /// (<c>ValueTask</c>-returning) key selector and a caller-supplied key comparer.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Asynchronous key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="comparer">Key comparer forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, durationSelector, comparer),
          default(IGroupedAsyncObservable<TKey, TSource>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.durationSelector, args.comparer)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with an asynchronous
  /// (<c>ValueTask</c>-returning) key selector and an explicit capacity.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Asynchronous key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="capacity">Non-negative capacity forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, durationSelector, capacity),
          default(IGroupedAsyncObservable<TKey, TSource>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.durationSelector, args.capacity)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with an asynchronous
  /// (<c>ValueTask</c>-returning) key selector, an explicit capacity and a caller-supplied key comparer.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Asynchronous key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="capacity">Non-negative capacity forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="comparer">Key comparer forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, durationSelector, capacity, comparer),
          default(IGroupedAsyncObservable<TKey, TSource>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.durationSelector, args.capacity, args.comparer)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with asynchronous
  /// (<c>ValueTask</c>-returning) key and element selectors.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Asynchronous key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="elementSelector">Asynchronous element selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, elementSelector, durationSelector),
          default(IGroupedAsyncObservable<TKey, TElement>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.elementSelector, args.durationSelector)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with asynchronous
  /// (<c>ValueTask</c>-returning) key and element selectors and a caller-supplied key comparer.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Asynchronous key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="elementSelector">Asynchronous element selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="comparer">Key comparer forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, elementSelector, durationSelector, comparer),
          default(IGroupedAsyncObservable<TKey, TElement>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.comparer)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with asynchronous
  /// (<c>ValueTask</c>-returning) key and element selectors and an explicit capacity.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Asynchronous key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="elementSelector">Asynchronous element selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="capacity">Non-negative capacity forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, elementSelector, durationSelector, capacity),
          default(IGroupedAsyncObservable<TKey, TElement>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.capacity)));
  }
  /// <summary>
  /// Builds the <c>GroupByUntil</c> operator over <paramref name="source"/> with asynchronous
  /// (<c>ValueTask</c>-returning) key and element selectors, an explicit capacity and a caller-supplied key comparer.
  /// </summary>
  /// <param name="source">Sequence the operator is applied to.</param>
  /// <param name="keySelector">Asynchronous key selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="elementSelector">Asynchronous element selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="durationSelector">Duration selector forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="capacity">Non-negative capacity forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <param name="comparer">Key comparer forwarded to <c>AsyncObserver.GroupByUntil</c>.</param>
  /// <returns>The observable produced by <c>Create</c>.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The default(...) argument carries no value; it only fixes the element type for Create's generic inference.
      return Create(
          source,
          (keySelector, elementSelector, durationSelector, capacity, comparer),
          default(IGroupedAsyncObservable<TKey, TElement>),
          (src, args, downstream) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
              src,
              downstream,
              (groupObserver, upstream) => AsyncObserver.GroupByUntil(groupObserver, upstream, args.keySelector, args.elementSelector, args.durationSelector, args.capacity, args.comparer)));
  }
  /// <summary>
  /// Shared subscription routine for every <c>GroupByUntil</c> overload: asks <paramref name="createObserver"/>
  /// for a sink/disposable pair, subscribes the sink to <paramref name="source"/>, and returns the disposable
  /// that <paramref name="createObserver"/> produced.
  /// </summary>
  /// <param name="source">Sequence the sink is subscribed to.</param>
  /// <param name="observer">Downstream observer handed to <paramref name="createObserver"/>.</param>
  /// <param name="createObserver">
  /// Factory that receives the downstream observer and a placeholder disposable for the source subscription,
  /// and yields the sink plus the disposable to return to the caller.
  /// </param>
  /// <returns>The second element of the pair produced by <paramref name="createObserver"/>.</returns>
  private static async ValueTask<IAsyncDisposable> GroupByUntilCore<TSource, TKey, TElement, TDuration>(IAsyncObservable<TSource> source, IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, Func<IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>>, IAsyncDisposable, ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserver)
  {
      // Placeholder for the source subscription: the factory needs a disposable before the subscription exists.
      var sourceSubscriptionSlot = new SingleAssignmentAsyncDisposable();

      var created = await createObserver(observer, sourceSubscriptionSlot).ConfigureAwait(false);
      var sink = created.Item1;
      var result = created.Item2;

      var sourceSubscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);

      // Fill the placeholder now that the real subscription is available.
      await sourceSubscriptionSlot.AssignAsync(sourceSubscription).ConfigureAwait(false);

      return result;
  }
  294. }
  295. public partial class AsyncObserver
  296. {
  /// <summary>
  /// Convenience overload with a synchronous key selector: validates its arguments and forwards to the overload
  /// taking a capacity and comparer, passing <see cref="int.MaxValue"/> and the default comparer for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the full overload.</param>
  /// <param name="keySelector">Synchronous key selector.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the full overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      // Neither capacity nor comparer was supplied, so the defaults are filled in here.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          int.MaxValue,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Convenience overload with a synchronous key selector and a key comparer: validates its arguments and forwards
  /// to the overload taking a capacity and comparer, passing <see cref="int.MaxValue"/> as the capacity.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the full overload.</param>
  /// <param name="keySelector">Synchronous key selector.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <param name="comparer">Key comparer forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the full overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // No capacity was supplied, so int.MaxValue is passed in its place.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          int.MaxValue,
          comparer);
  }
  /// <summary>
  /// Convenience overload with a synchronous key selector and a capacity: validates its arguments and forwards
  /// to the overload taking a capacity and comparer, passing the default comparer for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the full overload.</param>
  /// <param name="keySelector">Synchronous key selector.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <param name="capacity">Non-negative capacity forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the full overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      // No comparer was supplied, so the default comparer for TKey is used.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          capacity,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Full synchronous-key-selector overload: validates its arguments, wraps <paramref name="keySelector"/> so that
  /// its result is delivered as a <c>ValueTask</c>, and forwards to the overload that takes an asynchronous key selector.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged.</param>
  /// <param name="keySelector">Synchronous key selector; invoked by the wrapping delegate.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <param name="capacity">Non-negative capacity forwarded unchanged.</param>
  /// <param name="comparer">Key comparer forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the asynchronous-selector overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // Adapt the synchronous selector to the ValueTask-returning shape of the target overload.
      Func<TSource, ValueTask<TKey>> asyncKeySelector = item => new ValueTask<TKey>(keySelector(item));

      return GroupByUntil(observer, subscription, asyncKeySelector, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Convenience overload with synchronous key and element selectors: validates its arguments and forwards to the
  /// overload taking a capacity and comparer, passing <see cref="int.MaxValue"/> and the default comparer for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the full overload.</param>
  /// <param name="keySelector">Synchronous key selector.</param>
  /// <param name="elementSelector">Synchronous element selector.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the full overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      // Neither capacity nor comparer was supplied, so the defaults are filled in here.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          elementSelector,
          durationSelector,
          int.MaxValue,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Convenience overload with synchronous key and element selectors and a key comparer: validates its arguments
  /// and forwards to the overload taking a capacity and comparer, passing <see cref="int.MaxValue"/> as the capacity.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the full overload.</param>
  /// <param name="keySelector">Synchronous key selector.</param>
  /// <param name="elementSelector">Synchronous element selector.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <param name="comparer">Key comparer forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the full overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // No capacity was supplied, so int.MaxValue is passed in its place.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          elementSelector,
          durationSelector,
          int.MaxValue,
          comparer);
  }
  /// <summary>
  /// Convenience overload with synchronous key and element selectors and a capacity: validates its arguments and
  /// forwards to the overload taking a capacity and comparer, passing the default comparer for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the full overload.</param>
  /// <param name="keySelector">Synchronous key selector.</param>
  /// <param name="elementSelector">Synchronous element selector.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <param name="capacity">Non-negative capacity forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the full overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      // No comparer was supplied, so the default comparer for TKey is used.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          elementSelector,
          durationSelector,
          capacity,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Full overload with synchronous key and element selectors: validates its arguments, wraps both selectors so that
  /// their results are delivered as <c>ValueTask</c>s, and forwards to the overload that takes asynchronous selectors.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged.</param>
  /// <param name="keySelector">Synchronous key selector; invoked by the wrapping delegate.</param>
  /// <param name="elementSelector">Synchronous element selector; invoked by the wrapping delegate.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <param name="capacity">Non-negative capacity forwarded unchanged.</param>
  /// <param name="comparer">Key comparer forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the asynchronous-selector overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // Adapt both synchronous selectors to the ValueTask-returning shape of the target overload.
      Func<TSource, ValueTask<TKey>> asyncKeySelector = item => new ValueTask<TKey>(keySelector(item));
      Func<TSource, ValueTask<TElement>> asyncElementSelector = item => new ValueTask<TElement>(elementSelector(item));

      return GroupByUntil<TSource, TKey, TElement, TDuration>(observer, subscription, asyncKeySelector, asyncElementSelector, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Convenience overload with an asynchronous (<c>ValueTask</c>-returning) key selector: validates its arguments
  /// and forwards to the overload taking a capacity and comparer, passing <see cref="int.MaxValue"/> and the
  /// default comparer for <typeparamref name="TKey"/>.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the full overload.</param>
  /// <param name="keySelector">Asynchronous key selector.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the full overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      // Neither capacity nor comparer was supplied, so the defaults are filled in here.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          int.MaxValue,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Convenience overload with an asynchronous (<c>ValueTask</c>-returning) key selector and a key comparer:
  /// validates its arguments and forwards to the overload taking a capacity and comparer, passing
  /// <see cref="int.MaxValue"/> as the capacity.
  /// </summary>
  /// <param name="observer">Observer that receives the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the full overload.</param>
  /// <param name="keySelector">Asynchronous key selector.</param>
  /// <param name="durationSelector">Duration selector forwarded unchanged.</param>
  /// <param name="comparer">Key comparer forwarded unchanged.</param>
  /// <returns>The sink/disposable pair produced by the full overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // No capacity was supplied, so int.MaxValue is passed in its place.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          int.MaxValue,
          comparer);
  }
  /// <summary>
  /// Overload taking an asynchronous key selector, a duration selector and a capacity. Forwards to the
  /// overload that additionally accepts a comparer, supplying <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference-typed argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      // Validation runs in parameter order, so the first offending argument is the one reported.
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// Overload without an element selector: each source item is used as the group element. Forwards to the
  /// overload that takes an asynchronous element selector, passing one that wraps the item unchanged in a
  /// <see cref="ValueTask{TResult}"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference-typed argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      // Validation runs in parameter order, so the first offending argument is the one reported.
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

      // Identity projection: the element published to a group is the source item itself.
      Func<TSource, ValueTask<TSource>> identity = item => new ValueTask<TSource>(item);

      return GroupByUntil(observer, subscription, keySelector, identity, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Overload taking asynchronous key and element selectors plus a duration selector. Forwards to the
  /// overload that additionally accepts a capacity and a comparer, supplying <see cref="int.MaxValue"/> and
  /// <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      // Validation runs in parameter order, so the first null argument is the one reported.
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

      IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, defaultComparer);
  }
  /// <summary>
  /// Overload taking asynchronous key and element selectors, a duration selector and a key comparer.
  /// Forwards to the overload that additionally accepts a capacity, supplying <see cref="int.MaxValue"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      // Validation runs in parameter order, so the first null argument is the one reported.
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
      _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

      const int unboundedCapacity = int.MaxValue;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, unboundedCapacity, comparer);
  }
  /// <summary>
  /// Overload taking asynchronous key and element selectors, a duration selector and a capacity. Forwards
  /// to the overload that additionally accepts a comparer, supplying
  /// <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference-typed argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      // Validation runs in parameter order, so the first offending argument is the one reported.
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// Core overload. Builds an observer that routes each source item to a per-key group subject, announces
  /// every newly created group to <paramref name="observer"/>, and completes and removes a group when the
  /// observable returned by <paramref name="durationSelector"/> for that group produces a value or completes.
  /// </summary>
  /// <remarks>
  /// Non-null keys are tracked in a <see cref="ConcurrentDictionary{TKey, TValue}"/>; a <c>null</c> key is
  /// tracked separately in a single field guarded by its own lock. A <paramref name="capacity"/> of
  /// <see cref="int.MaxValue"/> means the dictionary is created without an explicit initial capacity.
  /// Exceptions thrown by the selectors are forwarded to all live groups and to <paramref name="observer"/>.
  /// </remarks>
  /// <returns>
  /// A task producing the observer to attach to the source together with a ref-counted disposable that wraps
  /// <paramref name="subscription"/> and the duration subscriptions.
  /// </returns>
  /// <exception cref="ArgumentNullException">A reference-typed argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (subscription == null)
          throw new ArgumentNullException(nameof(subscription));
      if (keySelector == null)
          throw new ArgumentNullException(nameof(keySelector));
      if (elementSelector == null)
          throw new ArgumentNullException(nameof(elementSelector));
      if (durationSelector == null)
          throw new ArgumentNullException(nameof(durationSelector));
      if (capacity < 0)
          throw new ArgumentOutOfRangeException(nameof(capacity));
      if (comparer == null)
          throw new ArgumentNullException(nameof(comparer));

      return CoreAsync();

      // REVIEW: Concurrent execution of a duration callback and an event could lead to an OnNext call being queued in an AsyncLockObserver
      //         after a duration callback makes an OnCompleted call. This seems to be the case in sync Rx as well.

      async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
      {
          var d = new CompositeAsyncDisposable();

          await d.AddAsync(subscription).ConfigureAwait(false);

          var refCount = new RefCountAsyncDisposable(d);

          var groups = default(ConcurrentDictionary<TKey, IAsyncSubject<TElement>>);

          // int.MaxValue is what the capacity-less overloads pass; in that case let the dictionary pick its own size.
          if (capacity == int.MaxValue)
          {
              groups = new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(comparer);
          }
          else
          {
              groups = new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(Environment.ProcessorCount * 4, capacity, comparer);
          }

          // gate serializes calls into the outer observer; nullGate guards the nullGroup field.
          var gate = new AsyncLock();

          var nullGate = new object();
          var nullGroup = default(IAsyncSubject<TElement>);

          // Forwards an error to the null-key group (if any), every keyed group, and finally the outer observer.
          async ValueTask OnErrorAsync(Exception ex)
          {
              var nullGroupLocal = default(IAsyncSubject<TElement>);

              lock (nullGate)
              {
                  nullGroupLocal = nullGroup;
              }

              if (nullGroupLocal != null)
              {
                  await nullGroupLocal.OnErrorAsync(ex).ConfigureAwait(false);
              }

              foreach (var group in groups.Values)
              {
                  await group.OnErrorAsync(ex).ConfigureAwait(false);
              }

              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
              }
          }

          return
              (
                  Create<TSource>
                  (
                      async x =>
                      {
                          var key = default(TKey);

                          try
                          {
                              key = await keySelector(x).ConfigureAwait(false);
                          }
                          catch (Exception ex)
                          {
                              await OnErrorAsync(ex).ConfigureAwait(false);
                              return;
                          }

                          var shouldEmit = false;
                          var group = default(IAsyncSubject<TElement>);

                          if (key == null)
                          {
                              lock (nullGate)
                              {
                                  if (nullGroup == null)
                                  {
                                      var subject = new SequentialSimpleAsyncSubject<TElement>();
                                      nullGroup = AsyncSubject.Create(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);
                                      shouldEmit = true;
                                  }

                                  // Read the field while still holding the lock: Expire clears it under the same
                                  // lock, so reading it after release could observe null and fail on OnNextAsync below.
                                  group = nullGroup;
                              }
                          }
                          else
                          {
                              try
                              {
                                  if (!groups.TryGetValue(key, out group))
                                  {
                                      var subject = new SequentialSimpleAsyncSubject<TElement>();
                                      group = AsyncSubject.Create(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);

                                      if (groups.TryAdd(key, group))
                                      {
                                          shouldEmit = true;
                                      }
                                  }
                              }
                              catch (Exception ex)
                              {
                                  // The dictionary calls into the user-supplied comparer, which may throw.
                                  await OnErrorAsync(ex).ConfigureAwait(false);
                                  return;
                              }
                          }

                          if (shouldEmit)
                          {
                              var g = new GroupedAsyncObservable<TKey, TElement>(key, group, refCount);

                              var duration = default(IAsyncObservable<TDuration>);

                              try
                              {
                                  duration = durationSelector(g);
                              }
                              catch (Exception ex)
                              {
                                  await OnErrorAsync(ex).ConfigureAwait(false);
                                  return;
                              }

                              using (await gate.LockAsync().ConfigureAwait(false))
                              {
                                  await observer.OnNextAsync(g).ConfigureAwait(false);
                              }

                              var durationSubscription = new SingleAssignmentAsyncDisposable();

                              // Ends the group for this key: detach it from the lookup, complete it, then drop the duration subscription.
                              async ValueTask Expire()
                              {
                                  if (key == null)
                                  {
                                      var oldNullGroup = default(IAsyncSubject<TElement>);

                                      lock (nullGate)
                                      {
                                          oldNullGroup = nullGroup;
                                          nullGroup = null;
                                      }

                                      if (oldNullGroup != null)
                                      {
                                          await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
                                      }
                                  }
                                  else
                                  {
                                      if (groups.TryRemove(key, out var oldGroup))
                                      {
                                          await oldGroup.OnCompletedAsync().ConfigureAwait(false);
                                      }
                                  }

                                  await durationSubscription.DisposeAsync().ConfigureAwait(false);
                                  await d.RemoveAsync(durationSubscription).ConfigureAwait(false);
                              }

                              // Both a value and completion of the duration sequence expire the group.
                              var durationObserver = Create<TDuration>(
                                  y => Expire(),
                                  OnErrorAsync,
                                  Expire
                              );

                              await d.AddAsync(durationSubscription).ConfigureAwait(false);

                              var durationSubscriptionInner = await duration.SubscribeSafeAsync(durationObserver).ConfigureAwait(false);
                              await durationSubscription.AssignAsync(durationSubscriptionInner).ConfigureAwait(false);
                          }

                          var element = default(TElement);

                          try
                          {
                              element = await elementSelector(x).ConfigureAwait(false);
                          }
                          catch (Exception ex)
                          {
                              await OnErrorAsync(ex).ConfigureAwait(false);
                              return;
                          }

                          await group.OnNextAsync(element).ConfigureAwait(false);
                      },
                      OnErrorAsync,
                      async () =>
                      {
                          // Snapshot under the lock, as OnErrorAsync does: a concurrent Expire may null the field
                          // between a bare null check and the call.
                          var nullGroupLocal = default(IAsyncSubject<TElement>);

                          lock (nullGate)
                          {
                              nullGroupLocal = nullGroup;
                          }

                          if (nullGroupLocal != null)
                          {
                              await nullGroupLocal.OnCompletedAsync().ConfigureAwait(false);
                          }

                          foreach (var group in groups.Values)
                          {
                              await group.OnCompletedAsync().ConfigureAwait(false);
                          }

                          using (await gate.LockAsync().ConfigureAwait(false))
                          {
                              await observer.OnCompletedAsync().ConfigureAwait(false);
                          }
                      }
                  ),
                  refCount
              );
      }
  }
  711. }
  712. }