1
0

GroupByUntil.cs 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace System.Reactive.Linq
  11. {
  /// <summary>
  /// GroupByUntil operator overloads for <c>IAsyncObservable</c> sources. Every overload validates its
  /// arguments eagerly and then builds the result through <c>CreateAsyncObservable.From</c>, which defers to
  /// <c>GroupByUntilCore</c> using an observer produced by the matching <c>AsyncObserver.GroupByUntil</c> overload.
  /// </summary>
  public partial class AsyncObservable
  {
      /// <summary>GroupByUntil with a synchronous key selector and a duration selector.</summary>
      /// <exception cref="ArgumentNullException">An argument is null.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
              source,
              (keySelector, durationSelector),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.durationSelector)));
      }

      /// <summary>GroupByUntil with a synchronous key selector, a duration selector and a key comparer.</summary>
      /// <exception cref="ArgumentNullException">An argument is null.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
              source,
              (keySelector, durationSelector, comparer),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.durationSelector, state.comparer)));
      }

      /// <summary>GroupByUntil with a synchronous key selector, a duration selector and a capacity.</summary>
      /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          if (capacity < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(capacity));
          }

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
              source,
              (keySelector, durationSelector, capacity),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.durationSelector, state.capacity)));
      }

      /// <summary>GroupByUntil with a synchronous key selector, a duration selector, a capacity and a key comparer.</summary>
      /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          if (capacity < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(capacity));
          }
          _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
              source,
              (keySelector, durationSelector, capacity, comparer),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.durationSelector, state.capacity, state.comparer)));
      }

      /// <summary>GroupByUntil with synchronous key and element selectors and a duration selector.</summary>
      /// <exception cref="ArgumentNullException">An argument is null.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
              source,
              (keySelector, elementSelector, durationSelector),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.elementSelector, state.durationSelector)));
      }

      /// <summary>GroupByUntil with synchronous key and element selectors, a duration selector and a key comparer.</summary>
      /// <exception cref="ArgumentNullException">An argument is null.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
              source,
              (keySelector, elementSelector, durationSelector, comparer),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.elementSelector, state.durationSelector, state.comparer)));
      }

      /// <summary>GroupByUntil with synchronous key and element selectors, a duration selector and a capacity.</summary>
      /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          if (capacity < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(capacity));
          }

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
              source,
              (keySelector, elementSelector, durationSelector, capacity),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.elementSelector, state.durationSelector, state.capacity)));
      }

      /// <summary>GroupByUntil with synchronous key and element selectors, a duration selector, a capacity and a key comparer.</summary>
      /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          if (capacity < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(capacity));
          }
          _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
              source,
              (keySelector, elementSelector, durationSelector, capacity, comparer),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.elementSelector, state.durationSelector, state.capacity, state.comparer)));
      }

      /// <summary>GroupByUntil with a <see cref="ValueTask{TResult}"/>-returning key selector and a duration selector.</summary>
      /// <exception cref="ArgumentNullException">An argument is null.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
              source,
              (keySelector, durationSelector),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.durationSelector)));
      }

      /// <summary>GroupByUntil with a <see cref="ValueTask{TResult}"/>-returning key selector, a duration selector and a key comparer.</summary>
      /// <exception cref="ArgumentNullException">An argument is null.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
              source,
              (keySelector, durationSelector, comparer),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.durationSelector, state.comparer)));
      }

      /// <summary>GroupByUntil with a <see cref="ValueTask{TResult}"/>-returning key selector, a duration selector and a capacity.</summary>
      /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          if (capacity < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(capacity));
          }

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
              source,
              (keySelector, durationSelector, capacity),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.durationSelector, state.capacity)));
      }

      /// <summary>GroupByUntil with a <see cref="ValueTask{TResult}"/>-returning key selector, a duration selector, a capacity and a key comparer.</summary>
      /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          if (capacity < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(capacity));
          }
          _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
              source,
              (keySelector, durationSelector, capacity, comparer),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.durationSelector, state.capacity, state.comparer)));
      }

      /// <summary>GroupByUntil with <see cref="ValueTask{TResult}"/>-returning key and element selectors and a duration selector.</summary>
      /// <exception cref="ArgumentNullException">An argument is null.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
              source,
              (keySelector, elementSelector, durationSelector),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.elementSelector, state.durationSelector)));
      }

      /// <summary>GroupByUntil with <see cref="ValueTask{TResult}"/>-returning key and element selectors, a duration selector and a key comparer.</summary>
      /// <exception cref="ArgumentNullException">An argument is null.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
              source,
              (keySelector, elementSelector, durationSelector, comparer),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.elementSelector, state.durationSelector, state.comparer)));
      }

      /// <summary>GroupByUntil with <see cref="ValueTask{TResult}"/>-returning key and element selectors, a duration selector and a capacity.</summary>
      /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          if (capacity < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(capacity));
          }

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
              source,
              (keySelector, elementSelector, durationSelector, capacity),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.elementSelector, state.durationSelector, state.capacity)));
      }

      /// <summary>GroupByUntil with <see cref="ValueTask{TResult}"/>-returning key and element selectors, a duration selector, a capacity and a key comparer.</summary>
      /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
      public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
      {
          _ = source ?? throw new ArgumentNullException(nameof(source));
          _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
          _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
          _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
          if (capacity < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(capacity));
          }
          _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

          return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
              source,
              (keySelector, elementSelector, durationSelector, capacity, comparer),
              static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(
                  source,
                  observer,
                  (groupObserver, sourceSubscription) => AsyncObserver.GroupByUntil(groupObserver, sourceSubscription, state.keySelector, state.elementSelector, state.durationSelector, state.capacity, state.comparer)));
      }

      /// <summary>
      /// Shared subscription logic: asks <paramref name="createObserver"/> for a sink/subscription pair, subscribes the sink
      /// to <paramref name="source"/>, stores the resulting source subscription in the single-assignment disposable that was
      /// handed to <paramref name="createObserver"/>, and returns the subscription half of the pair.
      /// </summary>
      private static async ValueTask<IAsyncDisposable> GroupByUntilCore<TSource, TKey, TElement, TDuration>(IAsyncObservable<TSource> source, IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, Func<IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>>, IAsyncDisposable, ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserver)
      {
          // Placeholder passed to the factory before the real source subscription exists.
          var sourceSubscription = new SingleAssignmentAsyncDisposable();

          var created = await createObserver(observer, sourceSubscription).ConfigureAwait(false);

          var upstream = await source.SubscribeSafeAsync(created.Item1).ConfigureAwait(false);
          await sourceSubscription.AssignAsync(upstream).ConfigureAwait(false);

          return created.Item2;
      }
  }
  279. public partial class AsyncObserver
  280. {
  /// <summary>
  /// Validates the arguments and forwards to the capacity/comparer overload, supplying
  /// <see cref="int.MaxValue"/> as the capacity and <see cref="EqualityComparer{T}.Default"/> as the comparer.
  /// </summary>
  /// <exception cref="ArgumentNullException">An argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

      const int defaultCapacity = int.MaxValue;
      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, durationSelector, defaultCapacity, defaultComparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the capacity/comparer overload, supplying
  /// <see cref="int.MaxValue"/> as the capacity.
  /// </summary>
  /// <exception cref="ArgumentNullException">An argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
      _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

      const int defaultCapacity = int.MaxValue;

      return GroupByUntil(observer, subscription, keySelector, durationSelector, defaultCapacity, comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the capacity/comparer overload, supplying
  /// <see cref="EqualityComparer{T}.Default"/> as the comparer.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// Validates the arguments, wraps the synchronous <paramref name="keySelector"/> so that its result is returned
  /// as a <see cref="ValueTask{TResult}"/>, and forwards to a GroupByUntil overload with that wrapped selector.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

      return GroupByUntil(
          observer,
          subscription,
          item => new ValueTask<TKey>(keySelector(item)),
          durationSelector,
          capacity,
          comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the capacity/comparer overload with element selector, supplying
  /// <see cref="int.MaxValue"/> as the capacity and <see cref="EqualityComparer{T}.Default"/> as the comparer.
  /// </summary>
  /// <exception cref="ArgumentNullException">An argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

      const int defaultCapacity = int.MaxValue;
      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, defaultCapacity, defaultComparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the capacity/comparer overload with element selector, supplying
  /// <see cref="int.MaxValue"/> as the capacity.
  /// </summary>
  /// <exception cref="ArgumentNullException">An argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
      _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

      const int defaultCapacity = int.MaxValue;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, defaultCapacity, comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the capacity/comparer overload with element selector, supplying
  /// <see cref="EqualityComparer{T}.Default"/> as the comparer.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// Validates the arguments, wraps the synchronous <paramref name="keySelector"/> and
  /// <paramref name="elementSelector"/> so that their results are returned as <see cref="ValueTask{TResult}"/>
  /// values, and forwards to a GroupByUntil overload with the wrapped selectors.
  /// </summary>
  /// <exception cref="ArgumentNullException">A reference argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = elementSelector ?? throw new ArgumentNullException(nameof(elementSelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

      return GroupByUntil<TSource, TKey, TElement, TDuration>(
          observer,
          subscription,
          item => new ValueTask<TKey>(keySelector(item)),
          item => new ValueTask<TElement>(elementSelector(item)),
          durationSelector,
          capacity,
          comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards the <see cref="ValueTask{TResult}"/>-returning key selector to the
  /// capacity/comparer overload, supplying <see cref="int.MaxValue"/> as the capacity and
  /// <see cref="EqualityComparer{T}.Default"/> as the comparer.
  /// </summary>
  /// <exception cref="ArgumentNullException">An argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));

      const int defaultCapacity = int.MaxValue;
      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, durationSelector, defaultCapacity, defaultComparer);
  }
  /// <summary>
  /// Validates the arguments and forwards the <see cref="ValueTask{TResult}"/>-returning key selector to the
  /// capacity/comparer overload, supplying <see cref="int.MaxValue"/> as the capacity.
  /// </summary>
  /// <exception cref="ArgumentNullException">An argument is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      _ = observer ?? throw new ArgumentNullException(nameof(observer));
      _ = subscription ?? throw new ArgumentNullException(nameof(subscription));
      _ = keySelector ?? throw new ArgumentNullException(nameof(keySelector));
      _ = durationSelector ?? throw new ArgumentNullException(nameof(durationSelector));
      _ = comparer ?? throw new ArgumentNullException(nameof(comparer));

      const int defaultCapacity = int.MaxValue;

      return GroupByUntil(observer, subscription, keySelector, durationSelector, defaultCapacity, comparer);
  }
  /// <summary>
  /// GroupByUntil overload taking an asynchronous key selector, a duration selector and a capacity.
  /// Forwards to the overload that also accepts a comparer, passing <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type accepted by the returned observer; also the element type of each group.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by <paramref name="keySelector"/>.</typeparam>
  /// <typeparam name="TDuration">Element type of the observable returned by <paramref name="durationSelector"/>.</typeparam>
  /// <param name="observer">Observer of grouped observables; forwarded unchanged.</param>
  /// <param name="subscription">Disposable forwarded unchanged.</param>
  /// <param name="keySelector">Asynchronous function mapping a source element to its key.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable.</param>
  /// <param name="capacity">Non-negative capacity value; forwarded unchanged.</param>
  /// <returns>Whatever the forwarded-to overload returns.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      // Guards run in parameter order so the first offending argument is the one reported.
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// GroupByUntil overload taking an asynchronous key selector, a duration selector, a capacity and a comparer.
  /// Forwards to the overload that also accepts an element selector, supplying one that wraps each
  /// source element in a completed <see cref="ValueTask{TResult}"/> unchanged.
  /// </summary>
  /// <typeparam name="TSource">Element type accepted by the returned observer; also the element type of each group.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by <paramref name="keySelector"/>.</typeparam>
  /// <typeparam name="TDuration">Element type of the observable returned by <paramref name="durationSelector"/>.</typeparam>
  /// <param name="observer">Observer of grouped observables; forwarded unchanged.</param>
  /// <param name="subscription">Disposable forwarded unchanged.</param>
  /// <param name="keySelector">Asynchronous function mapping a source element to its key.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable.</param>
  /// <param name="capacity">Non-negative capacity value; forwarded unchanged.</param>
  /// <param name="comparer">Equality comparer for keys; forwarded unchanged.</param>
  /// <returns>Whatever the forwarded-to overload returns.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      // Guards run in parameter order so the first offending argument is the one reported.
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The element selector passes each source element through as-is.
      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          item => new ValueTask<TSource>(item),
          durationSelector,
          capacity,
          comparer);
  }
  /// <summary>
  /// GroupByUntil overload taking asynchronous key and element selectors and a duration selector.
  /// Forwards to the overload that also accepts a capacity and a comparer, passing
  /// <see cref="int.MaxValue"/> and <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by <paramref name="keySelector"/>.</typeparam>
  /// <typeparam name="TElement">Type of the value produced by <paramref name="elementSelector"/>; the element type of each group.</typeparam>
  /// <typeparam name="TDuration">Element type of the observable returned by <paramref name="durationSelector"/>.</typeparam>
  /// <param name="observer">Observer of grouped observables; forwarded unchanged.</param>
  /// <param name="subscription">Disposable forwarded unchanged.</param>
  /// <param name="keySelector">Asynchronous function mapping a source element to its key.</param>
  /// <param name="elementSelector">Asynchronous function mapping a source element to a group element.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable.</param>
  /// <returns>Whatever the forwarded-to overload returns.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      // Guards run in parameter order so the first null argument is the one reported.
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, defaultComparer);
  }
  /// <summary>
  /// GroupByUntil overload taking asynchronous key and element selectors, a duration selector and a key comparer.
  /// Forwards to the overload that also accepts a capacity, passing <see cref="int.MaxValue"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by <paramref name="keySelector"/>.</typeparam>
  /// <typeparam name="TElement">Type of the value produced by <paramref name="elementSelector"/>; the element type of each group.</typeparam>
  /// <typeparam name="TDuration">Element type of the observable returned by <paramref name="durationSelector"/>.</typeparam>
  /// <param name="observer">Observer of grouped observables; forwarded unchanged.</param>
  /// <param name="subscription">Disposable forwarded unchanged.</param>
  /// <param name="keySelector">Asynchronous function mapping a source element to its key.</param>
  /// <param name="elementSelector">Asynchronous function mapping a source element to a group element.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable.</param>
  /// <param name="comparer">Equality comparer for keys; forwarded unchanged.</param>
  /// <returns>Whatever the forwarded-to overload returns.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      // Guards run in parameter order so the first null argument is the one reported.
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      const int unboundedCapacity = int.MaxValue;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, unboundedCapacity, comparer);
  }
  /// <summary>
  /// GroupByUntil overload taking asynchronous key and element selectors, a duration selector and a capacity.
  /// Forwards to the overload that also accepts a comparer, passing <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by <paramref name="keySelector"/>.</typeparam>
  /// <typeparam name="TElement">Type of the value produced by <paramref name="elementSelector"/>; the element type of each group.</typeparam>
  /// <typeparam name="TDuration">Element type of the observable returned by <paramref name="durationSelector"/>.</typeparam>
  /// <param name="observer">Observer of grouped observables; forwarded unchanged.</param>
  /// <param name="subscription">Disposable forwarded unchanged.</param>
  /// <param name="keySelector">Asynchronous function mapping a source element to its key.</param>
  /// <param name="elementSelector">Asynchronous function mapping a source element to a group element.</param>
  /// <param name="durationSelector">Function mapping a group to its duration observable.</param>
  /// <param name="capacity">Non-negative capacity value; forwarded unchanged.</param>
  /// <returns>Whatever the forwarded-to overload returns.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      // Guards run in parameter order so the first offending argument is the one reported.
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      var defaultComparer = EqualityComparer<TKey>.Default;

      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// Core GroupByUntil implementation. Builds an observer that routes each source element to a per-key
  /// group subject, publishes each newly created group to <paramref name="observer"/>, and completes and
  /// removes a group when the duration observable obtained from <paramref name="durationSelector"/>
  /// produces a value or completes.
  /// </summary>
  /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
  /// <typeparam name="TKey">Type of the key produced by <paramref name="keySelector"/>.</typeparam>
  /// <typeparam name="TElement">Type of the value produced by <paramref name="elementSelector"/>; the element type of each group.</typeparam>
  /// <typeparam name="TDuration">Element type of the observable returned by <paramref name="durationSelector"/>.</typeparam>
  /// <param name="observer">Observer that receives each newly created group, and the terminal notification.</param>
  /// <param name="subscription">Disposable added to the composite disposable that backs the returned ref-counted disposable.</param>
  /// <param name="keySelector">Asynchronous function mapping a source element to its key. A <c>null</c> key is tracked in a dedicated group.</param>
  /// <param name="elementSelector">Asynchronous function mapping a source element to the value pushed into its group.</param>
  /// <param name="durationSelector">Function mapping a newly created group to the observable that bounds the group's lifetime.</param>
  /// <param name="capacity">Initial capacity passed to the group dictionary; <see cref="int.MaxValue"/> selects the dictionary's default sizing.</param>
  /// <param name="comparer">Equality comparer used by the group dictionary.</param>
  /// <returns>A pair of the source-side observer and a ref-counted disposable wrapping <paramref name="subscription"/> and the duration subscriptions.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (subscription == null)
          throw new ArgumentNullException(nameof(subscription));
      if (keySelector == null)
          throw new ArgumentNullException(nameof(keySelector));
      if (elementSelector == null)
          throw new ArgumentNullException(nameof(elementSelector));
      if (durationSelector == null)
          throw new ArgumentNullException(nameof(durationSelector));
      if (capacity < 0)
          throw new ArgumentOutOfRangeException(nameof(capacity));
      if (comparer == null)
          throw new ArgumentNullException(nameof(comparer));

      // Argument checks above run synchronously; the asynchronous work lives in the local function.
      return CoreAsync();

      // REVIEW: Concurrent execution of a duration callback and an event could lead to an OnNext call being queued in an AsyncLockObserver
      //         after a duration callback makes an OnCompleted call. This seems to be the case in sync Rx as well.

      async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
      {
          var d = new CompositeAsyncDisposable();
          await d.AddAsync(subscription).ConfigureAwait(false);

          var refCount = new RefCountAsyncDisposable(d);

          var groups = default(ConcurrentDictionary<TKey, IAsyncSubject<TElement>>);

          if (capacity == int.MaxValue)
          {
              // int.MaxValue is what the capacity-less overloads pass; use the dictionary's default sizing.
              groups = new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(comparer);
          }
          else
          {
              groups = new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(Environment.ProcessorCount * 4, capacity, comparer);
          }

          // gate serializes calls to the downstream observer and guards observerComplete.
          var gate = new AsyncGate();

          // Null keys are tracked outside the dictionary, in nullGroup, guarded by nullGate.
          var nullGate = new object();
          var nullGroup = default(IAsyncSubject<TElement>);

          bool observerComplete = false;

          // Fails every live group, then the downstream observer (at most once).
          async ValueTask OnErrorAsync(Exception ex)
          {
              await ErrorAndRemoveNullGroupIfPresentAsync(ex).ConfigureAwait(false);
              await ErrorAndRemoveAllGroupsIfPresentAsync(ex).ConfigureAwait(false);

              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  if (!observerComplete)
                  {
                      observerComplete = true;
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              }
          }

          return
          (
              Create<TSource>
              (
                  async x =>
                  {
                      var key = default(TKey);

                      try
                      {
                          key = await keySelector(x).ConfigureAwait(false);
                      }
                      catch (Exception ex)
                      {
                          await OnErrorAsync(ex).ConfigureAwait(false);
                          return;
                      }

                      var shouldEmit = false;
                      var group = default(IAsyncSubject<TElement>);

                      if (key == null)
                      {
                          lock (nullGate)
                          {
                              if (nullGroup == null)
                              {
                                  var subject = new SequentialSimpleAsyncSubject<TElement>();
                                  nullGroup = AsyncSubject.Create(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);
                                  shouldEmit = true;
                              }

                              // Capture the subject while still holding nullGate. Expiry and completion reset
                              // nullGroup to null under this same lock, so reading it after releasing the lock
                              // could observe null and fault in group.OnNextAsync below.
                              group = nullGroup;
                          }
                      }
                      else
                      {
                          try
                          {
                              if (!groups.TryGetValue(key, out group))
                              {
                                  var subject = new SequentialSimpleAsyncSubject<TElement>();
                                  group = AsyncSubject.Create(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);

                                  if (groups.TryAdd(key, group))
                                  {
                                      shouldEmit = true;
                                  }
                              }
                          }
                          catch (Exception ex)
                          {
                              // The dictionary calls into the user-supplied comparer, which may throw.
                              await OnErrorAsync(ex).ConfigureAwait(false);
                              return;
                          }
                      }

                      if (shouldEmit)
                      {
                          var g = new GroupedAsyncObservable<TKey, TElement>(key, group, refCount);

                          var duration = default(IAsyncObservable<TDuration>);

                          try
                          {
                              duration = durationSelector(g);
                          }
                          catch (Exception ex)
                          {
                              await OnErrorAsync(ex).ConfigureAwait(false);
                              return;
                          }

                          using (await gate.LockAsync().ConfigureAwait(false))
                          {
                              if (!observerComplete)
                              {
                                  await observer.OnNextAsync(g).ConfigureAwait(false);
                              }
                          }

                          var durationSubscription = new SingleAssignmentAsyncDisposable();

                          // Completes and removes this key's group, then tears down its duration subscription.
                          async ValueTask Expire()
                          {
                              if (key == null)
                              {
                                  await CompleteAndRemoveNullGroupIfPresentAsync().ConfigureAwait(false);
                              }
                              else
                              {
                                  if (groups.TryRemove(key, out var oldGroup))
                                  {
                                      await oldGroup.OnCompletedAsync().ConfigureAwait(false);
                                  }
                              }

                              await durationSubscription.DisposeAsync().ConfigureAwait(false);
                              await d.RemoveAsync(durationSubscription).ConfigureAwait(false);
                          }

                          // Either a value or completion from the duration observable expires the group.
                          var durationObserver = Create<TDuration>(
                              y => Expire(),
                              OnErrorAsync,
                              Expire
                          );

                          await d.AddAsync(durationSubscription).ConfigureAwait(false);

                          var durationSubscriptionInner = await duration.SubscribeSafeAsync(durationObserver).ConfigureAwait(false);
                          await durationSubscription.AssignAsync(durationSubscriptionInner).ConfigureAwait(false);
                      }

                      var element = default(TElement);

                      try
                      {
                          element = await elementSelector(x).ConfigureAwait(false);
                      }
                      catch (Exception ex)
                      {
                          await OnErrorAsync(ex).ConfigureAwait(false);
                          return;
                      }

                      await group.OnNextAsync(element).ConfigureAwait(false);
                  },
                  OnErrorAsync,
                  async () =>
                  {
                      if (nullGroup != null)
                      {
                          await CompleteAndRemoveNullGroupIfPresentAsync().ConfigureAwait(false);
                      }

                      await CompleteAndRemoveAllGroupsIfPresentAsync().ConfigureAwait(false);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          if (!observerComplete)
                          {
                              observerComplete = true;
                              await observer.OnCompletedAsync().ConfigureAwait(false);
                          }
                      }
                  }
              ),
              refCount
          );

          ValueTask CompleteAndRemoveNullGroupIfPresentAsync() => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(null);
          ValueTask ErrorAndRemoveNullGroupIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(x);

          // Detaches the null-key group under nullGate, then terminates it outside the lock:
          // completion when x is null, error otherwise.
          async ValueTask CompleteOrErrorAndRemoveNullGroupIfPresentAsync(Exception x)
          {
              var oldNullGroup = default(IAsyncSubject<TElement>);

              lock (nullGate)
              {
                  oldNullGroup = nullGroup;
                  nullGroup = null;
              }

              if (oldNullGroup != null)
              {
                  if (x is null)
                  {
                      await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
                  }
                  else
                  {
                      await oldNullGroup.OnErrorAsync(x).ConfigureAwait(false);
                  }
              }
          }

          ValueTask CompleteAndRemoveAllGroupsIfPresentAsync() => CompleteOrErrorAndRemoveAllGroupsAsync(null);
          ValueTask ErrorAndRemoveAllGroupsIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveAllGroupsAsync(x);

          // Removes every keyed group and terminates it: completion when x is null, error otherwise.
          async ValueTask CompleteOrErrorAndRemoveAllGroupsAsync(Exception x)
          {
              foreach (var key in groups.Keys)
              {
                  // The ConcurrentDictionary's Keys property is a snapshot, so
                  // although this TryRemove should always succeed for the first
                  // key in the dictionary (as long as our upstream observable is
                  // obeying the rules, and not making multiple concurrent calls
                  // to our observer) each await in this loop offers an opportunity
                  // for one of the group duration observables to complete, which
                  // will cause the Expire method above to run, meaning that an
                  // entry that was present when we retrieved Keys at the start of
                  // this loop might already have been completed and removed by the
                  // time this loop reaches it.
                  if (groups.TryRemove(key, out var group))
                  {
                      if (x is null)
                      {
                          await group.OnCompletedAsync().ConfigureAwait(false);
                      }
                      else
                      {
                          await group.OnErrorAsync(x).ConfigureAwait(false);
                      }
                  }
              }
          }
      }
  }
  735. }
  736. }