GroupByUntil.cs 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace System.Reactive.Linq
  11. {
  12. partial class AsyncObservable
  13. {
  14. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  15. {
  16. if (source == null)
  17. throw new ArgumentNullException(nameof(source));
  18. if (keySelector == null)
  19. throw new ArgumentNullException(nameof(keySelector));
  20. if (durationSelector == null)
  21. throw new ArgumentNullException(nameof(durationSelector));
  22. return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector)));
  23. }
  24. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  25. {
  26. if (source == null)
  27. throw new ArgumentNullException(nameof(source));
  28. if (keySelector == null)
  29. throw new ArgumentNullException(nameof(keySelector));
  30. if (durationSelector == null)
  31. throw new ArgumentNullException(nameof(durationSelector));
  32. if (comparer == null)
  33. throw new ArgumentNullException(nameof(comparer));
  34. return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, comparer)));
  35. }
  36. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  37. {
  38. if (source == null)
  39. throw new ArgumentNullException(nameof(source));
  40. if (keySelector == null)
  41. throw new ArgumentNullException(nameof(keySelector));
  42. if (durationSelector == null)
  43. throw new ArgumentNullException(nameof(durationSelector));
  44. if (capacity < 0)
  45. throw new ArgumentOutOfRangeException(nameof(capacity));
  46. return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, capacity)));
  47. }
  48. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  49. {
  50. if (source == null)
  51. throw new ArgumentNullException(nameof(source));
  52. if (keySelector == null)
  53. throw new ArgumentNullException(nameof(keySelector));
  54. if (durationSelector == null)
  55. throw new ArgumentNullException(nameof(durationSelector));
  56. if (capacity < 0)
  57. throw new ArgumentOutOfRangeException(nameof(capacity));
  58. if (comparer == null)
  59. throw new ArgumentNullException(nameof(comparer));
  60. return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, capacity, comparer)));
  61. }
  62. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  63. {
  64. if (source == null)
  65. throw new ArgumentNullException(nameof(source));
  66. if (keySelector == null)
  67. throw new ArgumentNullException(nameof(keySelector));
  68. if (elementSelector == null)
  69. throw new ArgumentNullException(nameof(elementSelector));
  70. if (durationSelector == null)
  71. throw new ArgumentNullException(nameof(durationSelector));
  72. return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector)));
  73. }
  74. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  75. {
  76. if (source == null)
  77. throw new ArgumentNullException(nameof(source));
  78. if (keySelector == null)
  79. throw new ArgumentNullException(nameof(keySelector));
  80. if (elementSelector == null)
  81. throw new ArgumentNullException(nameof(elementSelector));
  82. if (durationSelector == null)
  83. throw new ArgumentNullException(nameof(durationSelector));
  84. if (comparer == null)
  85. throw new ArgumentNullException(nameof(comparer));
  86. return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, comparer)));
  87. }
  88. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  89. {
  90. if (source == null)
  91. throw new ArgumentNullException(nameof(source));
  92. if (keySelector == null)
  93. throw new ArgumentNullException(nameof(keySelector));
  94. if (elementSelector == null)
  95. throw new ArgumentNullException(nameof(elementSelector));
  96. if (durationSelector == null)
  97. throw new ArgumentNullException(nameof(durationSelector));
  98. if (capacity < 0)
  99. throw new ArgumentOutOfRangeException(nameof(capacity));
  100. return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, capacity)));
  101. }
  102. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  103. {
  104. if (source == null)
  105. throw new ArgumentNullException(nameof(source));
  106. if (keySelector == null)
  107. throw new ArgumentNullException(nameof(keySelector));
  108. if (elementSelector == null)
  109. throw new ArgumentNullException(nameof(elementSelector));
  110. if (durationSelector == null)
  111. throw new ArgumentNullException(nameof(durationSelector));
  112. if (capacity < 0)
  113. throw new ArgumentOutOfRangeException(nameof(capacity));
  114. if (comparer == null)
  115. throw new ArgumentNullException(nameof(comparer));
  116. return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, capacity, comparer)));
  117. }
  118. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  119. {
  120. if (source == null)
  121. throw new ArgumentNullException(nameof(source));
  122. if (keySelector == null)
  123. throw new ArgumentNullException(nameof(keySelector));
  124. if (durationSelector == null)
  125. throw new ArgumentNullException(nameof(durationSelector));
  126. return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector)));
  127. }
  128. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  129. {
  130. if (source == null)
  131. throw new ArgumentNullException(nameof(source));
  132. if (keySelector == null)
  133. throw new ArgumentNullException(nameof(keySelector));
  134. if (durationSelector == null)
  135. throw new ArgumentNullException(nameof(durationSelector));
  136. if (comparer == null)
  137. throw new ArgumentNullException(nameof(comparer));
  138. return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, comparer)));
  139. }
  140. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  141. {
  142. if (source == null)
  143. throw new ArgumentNullException(nameof(source));
  144. if (keySelector == null)
  145. throw new ArgumentNullException(nameof(keySelector));
  146. if (durationSelector == null)
  147. throw new ArgumentNullException(nameof(durationSelector));
  148. if (capacity < 0)
  149. throw new ArgumentOutOfRangeException(nameof(capacity));
  150. return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, capacity)));
  151. }
  152. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  153. {
  154. if (source == null)
  155. throw new ArgumentNullException(nameof(source));
  156. if (keySelector == null)
  157. throw new ArgumentNullException(nameof(keySelector));
  158. if (durationSelector == null)
  159. throw new ArgumentNullException(nameof(durationSelector));
  160. if (capacity < 0)
  161. throw new ArgumentOutOfRangeException(nameof(capacity));
  162. if (comparer == null)
  163. throw new ArgumentNullException(nameof(comparer));
  164. return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, capacity, comparer)));
  165. }
  166. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  167. {
  168. if (source == null)
  169. throw new ArgumentNullException(nameof(source));
  170. if (keySelector == null)
  171. throw new ArgumentNullException(nameof(keySelector));
  172. if (elementSelector == null)
  173. throw new ArgumentNullException(nameof(elementSelector));
  174. if (durationSelector == null)
  175. throw new ArgumentNullException(nameof(durationSelector));
  176. return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector)));
  177. }
  178. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  179. {
  180. if (source == null)
  181. throw new ArgumentNullException(nameof(source));
  182. if (keySelector == null)
  183. throw new ArgumentNullException(nameof(keySelector));
  184. if (elementSelector == null)
  185. throw new ArgumentNullException(nameof(elementSelector));
  186. if (durationSelector == null)
  187. throw new ArgumentNullException(nameof(durationSelector));
  188. if (comparer == null)
  189. throw new ArgumentNullException(nameof(comparer));
  190. return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, comparer)));
  191. }
  192. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  193. {
  194. if (source == null)
  195. throw new ArgumentNullException(nameof(source));
  196. if (keySelector == null)
  197. throw new ArgumentNullException(nameof(keySelector));
  198. if (elementSelector == null)
  199. throw new ArgumentNullException(nameof(elementSelector));
  200. if (durationSelector == null)
  201. throw new ArgumentNullException(nameof(durationSelector));
  202. if (capacity < 0)
  203. throw new ArgumentOutOfRangeException(nameof(capacity));
  204. return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, capacity)));
  205. }
  206. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  207. {
  208. if (source == null)
  209. throw new ArgumentNullException(nameof(source));
  210. if (keySelector == null)
  211. throw new ArgumentNullException(nameof(keySelector));
  212. if (elementSelector == null)
  213. throw new ArgumentNullException(nameof(elementSelector));
  214. if (durationSelector == null)
  215. throw new ArgumentNullException(nameof(durationSelector));
  216. if (capacity < 0)
  217. throw new ArgumentOutOfRangeException(nameof(capacity));
  218. if (comparer == null)
  219. throw new ArgumentNullException(nameof(comparer));
  220. return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, capacity, comparer)));
  221. }
  222. private static async Task<IAsyncDisposable> GroupByUntilCore<TSource, TKey, TElement, TDuration>(IAsyncObservable<TSource> source, IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, Func<IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>>, IAsyncDisposable, Task<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserver)
  223. {
  224. var d = new SingleAssignmentAsyncDisposable();
  225. var (sink, subscription) = await createObserver(observer, d).ConfigureAwait(false);
  226. var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  227. await d.AssignAsync(inner).ConfigureAwait(false);
  228. return subscription;
  229. }
  230. }
  231. partial class AsyncObserver
  232. {
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a capacity and a key comparer,
  /// passing <see cref="int.MaxValue"/> and <see cref="EqualityComparer{T}.Default"/> for them.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Synchronous function yielding the key of an element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer,
      IAsyncDisposable subscription,
      Func<TSource, TKey> keySelector,
      Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          int.MaxValue,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a capacity,
  /// passing <see cref="int.MaxValue"/> for it.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Synchronous function yielding the key of an element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <param name="comparer">Key equality comparer forwarded unchanged.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer,
      IAsyncDisposable subscription,
      Func<TSource, TKey> keySelector,
      Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector,
      IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          int.MaxValue,
          comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a key comparer,
  /// passing <see cref="EqualityComparer{T}.Default"/> for it.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Synchronous function yielding the key of an element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <param name="capacity">Non-negative value forwarded unchanged.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer,
      IAsyncDisposable subscription,
      Func<TSource, TKey> keySelector,
      Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector,
      int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          capacity,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Validates the arguments, wraps the synchronous <paramref name="keySelector"/> in a task-returning
  /// delegate via <see cref="Task.FromResult{TResult}(TResult)"/>, and forwards to the overload that takes
  /// an asynchronous key selector.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Synchronous function yielding the key of an element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <param name="capacity">Non-negative value forwarded unchanged.</param>
  /// <param name="comparer">Key equality comparer forwarded unchanged.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer,
      IAsyncDisposable subscription,
      Func<TSource, TKey> keySelector,
      Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The explicit delegate type makes the target overload (asynchronous key selector) visible.
      Func<TSource, Task<TKey>> asyncKeySelector = element => Task.FromResult(keySelector(element));

      return GroupByUntil(observer, subscription, asyncKeySelector, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a capacity and a key comparer,
  /// passing <see cref="int.MaxValue"/> and <see cref="EqualityComparer{T}.Default"/> for them.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Synchronous function yielding the key of an element.</param>
  /// <param name="elementSelector">Synchronous function yielding the group element for a source element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer,
      IAsyncDisposable subscription,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          elementSelector,
          durationSelector,
          int.MaxValue,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a capacity,
  /// passing <see cref="int.MaxValue"/> for it.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Synchronous function yielding the key of an element.</param>
  /// <param name="elementSelector">Synchronous function yielding the group element for a source element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <param name="comparer">Key equality comparer forwarded unchanged.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer,
      IAsyncDisposable subscription,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector,
      IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          elementSelector,
          durationSelector,
          int.MaxValue,
          comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a key comparer,
  /// passing <see cref="EqualityComparer{T}.Default"/> for it.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Synchronous function yielding the key of an element.</param>
  /// <param name="elementSelector">Synchronous function yielding the group element for a source element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <param name="capacity">Non-negative value forwarded unchanged.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer,
      IAsyncDisposable subscription,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector,
      int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          elementSelector,
          durationSelector,
          capacity,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Validates the arguments, wraps the synchronous <paramref name="keySelector"/> and
  /// <paramref name="elementSelector"/> in task-returning delegates via
  /// <see cref="Task.FromResult{TResult}(TResult)"/>, and forwards to the overload that takes asynchronous
  /// selectors.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Synchronous function yielding the key of an element.</param>
  /// <param name="elementSelector">Synchronous function yielding the group element for a source element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <param name="capacity">Non-negative value forwarded unchanged.</param>
  /// <param name="comparer">Key equality comparer forwarded unchanged.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer,
      IAsyncDisposable subscription,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // The explicit delegate types make the target overload (asynchronous selectors) visible.
      Func<TSource, Task<TKey>> asyncKeySelector = element => Task.FromResult(keySelector(element));
      Func<TSource, Task<TElement>> asyncElementSelector = element => Task.FromResult(elementSelector(element));

      return GroupByUntil<TSource, TKey, TElement, TDuration>(
          observer,
          subscription,
          asyncKeySelector,
          asyncElementSelector,
          durationSelector,
          capacity,
          comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a capacity and a key comparer,
  /// passing <see cref="int.MaxValue"/> and <see cref="EqualityComparer{T}.Default"/> for them.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Asynchronous function yielding the key of an element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer,
      IAsyncDisposable subscription,
      Func<TSource, Task<TKey>> keySelector,
      Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          int.MaxValue,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a capacity,
  /// passing <see cref="int.MaxValue"/> for it.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Asynchronous function yielding the key of an element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <param name="comparer">Key equality comparer forwarded unchanged.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer,
      IAsyncDisposable subscription,
      Func<TSource, Task<TKey>> keySelector,
      Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector,
      IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          int.MaxValue,
          comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a key comparer,
  /// passing <see cref="EqualityComparer{T}.Default"/> for it.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Asynchronous function yielding the key of an element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <param name="capacity">Non-negative value forwarded unchanged.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer,
      IAsyncDisposable subscription,
      Func<TSource, Task<TKey>> keySelector,
      Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector,
      int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          durationSelector,
          capacity,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that takes an asynchronous element selector,
  /// supplying a selector that returns the source element itself wrapped by
  /// <see cref="Task.FromResult{TResult}(TResult)"/>.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Asynchronous function yielding the key of an element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <param name="capacity">Non-negative value forwarded unchanged.</param>
  /// <param name="comparer">Key equality comparer forwarded unchanged.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">A reference argument is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer,
      IAsyncDisposable subscription,
      Func<TSource, Task<TKey>> keySelector,
      Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }
      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // Identity element selector: groups carry the source elements unchanged.
      Func<TSource, Task<TSource>> identitySelector = element => Task.FromResult(element);

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          identitySelector,
          durationSelector,
          capacity,
          comparer);
  }
  /// <summary>
  /// Validates the arguments and forwards to the overload that also takes a capacity and a key comparer,
  /// passing <see cref="int.MaxValue"/> and <see cref="EqualityComparer{T}.Default"/> for them.
  /// </summary>
  /// <param name="observer">Observer of the groups.</param>
  /// <param name="subscription">Disposable forwarded unchanged to the delegated overload.</param>
  /// <param name="keySelector">Asynchronous function yielding the key of an element.</param>
  /// <param name="elementSelector">Asynchronous function yielding the group element for a source element.</param>
  /// <param name="durationSelector">Function mapping a group to an observable of <typeparamref name="TDuration"/>.</param>
  /// <returns>The task produced by the delegated overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(
      IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer,
      IAsyncDisposable subscription,
      Func<TSource, Task<TKey>> keySelector,
      Func<TSource, Task<TElement>> elementSelector,
      Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }
      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }
      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }
      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }
      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return GroupByUntil(
          observer,
          subscription,
          keySelector,
          elementSelector,
          durationSelector,
          int.MaxValue,
          EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Creates a grouping observer using asynchronous key and element selectors and a caller-supplied key comparer.
  /// Validates its arguments and forwards to the most specific overload with <see cref="int.MaxValue"/> as the
  /// capacity.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key.</typeparam>
  /// <typeparam name="TElement">Type of the elements placed in each group.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences.</typeparam>
  /// <param name="observer">Observer of grouped sequences.</param>
  /// <param name="subscription">Disposable handed on to the forwarded overload.</param>
  /// <param name="keySelector">Asynchronous function computing the key of a source element.</param>
  /// <param name="elementSelector">Asynchronous function computing the group element for a source element.</param>
  /// <param name="durationSelector">Function mapping a group to its duration sequence.</param>
  /// <param name="comparer">Equality comparer used for keys.</param>
  /// <returns>The task produced by the forwarded overload.</returns>
  /// <exception cref="ArgumentNullException">Any argument is null.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      // No capacity supplied: fall back to int.MaxValue.
      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, int.MaxValue, comparer);
  }
  /// <summary>
  /// Creates a grouping observer using asynchronous key and element selectors and a caller-supplied capacity.
  /// Validates its arguments and forwards to the most specific overload with the default equality comparer for
  /// <typeparamref name="TKey"/>.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key.</typeparam>
  /// <typeparam name="TElement">Type of the elements placed in each group.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences.</typeparam>
  /// <param name="observer">Observer of grouped sequences.</param>
  /// <param name="subscription">Disposable handed on to the forwarded overload.</param>
  /// <param name="keySelector">Asynchronous function computing the key of a source element.</param>
  /// <param name="elementSelector">Asynchronous function computing the group element for a source element.</param>
  /// <param name="durationSelector">Function mapping a group to its duration sequence.</param>
  /// <param name="capacity">Non-negative capacity value handed on to the forwarded overload.</param>
  /// <returns>The task produced by the forwarded overload.</returns>
  /// <exception cref="ArgumentNullException">A reference-typed argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      // No comparer supplied: fall back to the default comparer for TKey.
      return GroupByUntil(observer, subscription, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
  }
  /// <summary>
  /// Creates an observer that routes each source element to a per-key group subject and announces every newly
  /// created group to <paramref name="observer"/>. A group is removed and completed when the duration sequence
  /// returned by <paramref name="durationSelector"/> for that group produces a value or completes.
  /// </summary>
  /// <typeparam name="TSource">Type of the source elements.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key.</typeparam>
  /// <typeparam name="TElement">Type of the elements placed in each group.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences.</typeparam>
  /// <param name="observer">Observer that receives each new group, plus the terminal error/completion.</param>
  /// <param name="subscription">Disposable added to the composite that backs the returned disposable.</param>
  /// <param name="keySelector">Asynchronous function computing the key of a source element.</param>
  /// <param name="elementSelector">Asynchronous function computing the group element for a source element.</param>
  /// <param name="durationSelector">Function mapping a group to its duration sequence.</param>
  /// <param name="capacity">
  /// Non-negative value passed as the capacity argument of the group dictionary; <see cref="int.MaxValue"/> selects
  /// the dictionary constructor that takes only a comparer.
  /// </param>
  /// <param name="comparer">Equality comparer used for non-null keys.</param>
  /// <returns>
  /// A task yielding the observer to feed source elements into and a ref-counted disposable wrapping the composite
  /// of <paramref name="subscription"/> and the active duration subscriptions.
  /// </returns>
  /// <exception cref="ArgumentNullException">A reference-typed argument is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> GroupByUntil<TSource, TKey, TElement, TDuration>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, Task<TKey>> keySelector, Func<TSource, Task<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (subscription == null)
          throw new ArgumentNullException(nameof(subscription));
      if (keySelector == null)
          throw new ArgumentNullException(nameof(keySelector));
      if (elementSelector == null)
          throw new ArgumentNullException(nameof(elementSelector));
      if (durationSelector == null)
          throw new ArgumentNullException(nameof(durationSelector));
      if (capacity < 0)
          throw new ArgumentOutOfRangeException(nameof(capacity));
      if (comparer == null)
          throw new ArgumentNullException(nameof(comparer));

      // Argument validation runs synchronously; the asynchronous setup lives in the local function.
      return CoreAsync();

      // REVIEW: Concurrent execution of a duration callback and an event could lead to an OnNext call being queued in an AsyncLockObserver
      //         after a duration callback makes an OnCompleted call. This seems to be the case in sync Rx as well.

      async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
      {
          var d = new CompositeAsyncDisposable();
          await d.AddAsync(subscription).ConfigureAwait(false);

          var refCount = new RefCountAsyncDisposable(d);

          var groups = default(ConcurrentDictionary<TKey, IAsyncSubject<TElement>>);

          if (capacity == int.MaxValue)
          {
              groups = new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(comparer);
          }
          else
          {
              groups = new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(Environment.ProcessorCount * 4, capacity, comparer);
          }

          // Serializes calls made to the outer observer.
          var gate = new AsyncLock();

          // Null keys are tracked in a dedicated slot rather than in the dictionary; nullGate guards that slot.
          var nullGate = new object();
          var nullGroup = default(IAsyncSubject<TElement>);

          // Propagates an error to the null-key group, every keyed group, and finally the outer observer.
          async Task OnErrorAsync(Exception ex)
          {
              var nullGroupLocal = default(IAsyncSubject<TElement>);

              lock (nullGate)
              {
                  nullGroupLocal = nullGroup;
              }

              if (nullGroupLocal != null)
              {
                  await nullGroupLocal.OnErrorAsync(ex).ConfigureAwait(false);
              }

              foreach (var group in groups.Values)
              {
                  await group.OnErrorAsync(ex).ConfigureAwait(false);
              }

              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
              }
          }

          return
          (
              Create<TSource>
              (
                  async x =>
                  {
                      var key = default(TKey);

                      try
                      {
                          key = await keySelector(x).ConfigureAwait(false);
                      }
                      catch (Exception ex)
                      {
                          await OnErrorAsync(ex).ConfigureAwait(false);
                          return;
                      }

                      var shouldEmit = false;
                      var group = default(IAsyncSubject<TElement>);

                      if (key == null)
                      {
                          lock (nullGate)
                          {
                              if (nullGroup == null)
                              {
                                  var subject = new SequentialSimpleAsyncSubject<TElement>();
                                  nullGroup = AsyncSubject.Create(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);
                                  shouldEmit = true;
                              }

                              // Capture the group while still holding the lock: Expire resets nullGroup to null
                              // under this same lock, so reading it after release could observe null and make
                              // the OnNextAsync call at the end of this handler throw.
                              group = nullGroup;
                          }
                      }
                      else
                      {
                          try
                          {
                              if (!groups.TryGetValue(key, out group))
                              {
                                  var subject = new SequentialSimpleAsyncSubject<TElement>();
                                  group = AsyncSubject.Create(new AsyncQueueLockAsyncObserver<TElement>(subject), subject);

                                  if (groups.TryAdd(key, group))
                                  {
                                      shouldEmit = true;
                                  }
                              }
                          }
                          catch (Exception ex)
                          {
                              // Dictionary operations invoke the user-supplied comparer, which may throw.
                              await OnErrorAsync(ex).ConfigureAwait(false);
                              return;
                          }
                      }

                      if (shouldEmit)
                      {
                          var g = new GroupedAsyncObservable<TKey, TElement>(key, group, refCount);

                          var duration = default(IAsyncObservable<TDuration>);

                          try
                          {
                              duration = durationSelector(g);
                          }
                          catch (Exception ex)
                          {
                              await OnErrorAsync(ex).ConfigureAwait(false);
                              return;
                          }

                          using (await gate.LockAsync().ConfigureAwait(false))
                          {
                              await observer.OnNextAsync(g).ConfigureAwait(false);
                          }

                          var durationSubscription = new SingleAssignmentAsyncDisposable();

                          // Ends the group for this key: detaches it from the lookup, completes it, and tears
                          // down the duration subscription.
                          async Task Expire()
                          {
                              if (key == null)
                              {
                                  var oldNullGroup = default(IAsyncSubject<TElement>);

                                  lock (nullGate)
                                  {
                                      oldNullGroup = nullGroup;
                                      nullGroup = null;
                                  }

                                  if (oldNullGroup != null)
                                  {
                                      await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
                                  }
                              }
                              else
                              {
                                  if (groups.TryRemove(key, out var oldGroup))
                                  {
                                      await oldGroup.OnCompletedAsync().ConfigureAwait(false);
                                  }
                              }

                              await durationSubscription.DisposeAsync().ConfigureAwait(false);
                              await d.RemoveAsync(durationSubscription).ConfigureAwait(false);
                          }

                          // Both a value and completion of the duration sequence expire the group.
                          var durationObserver = Create<TDuration>(
                              y => Expire(),
                              OnErrorAsync,
                              Expire
                          );

                          await d.AddAsync(durationSubscription).ConfigureAwait(false);

                          var durationSubscriptionInner = await duration.SubscribeSafeAsync(durationObserver).ConfigureAwait(false);
                          await durationSubscription.AssignAsync(durationSubscriptionInner).ConfigureAwait(false);
                      }

                      var element = default(TElement);

                      try
                      {
                          element = await elementSelector(x).ConfigureAwait(false);
                      }
                      catch (Exception ex)
                      {
                          await OnErrorAsync(ex).ConfigureAwait(false);
                          return;
                      }

                      await group.OnNextAsync(element).ConfigureAwait(false);
                  },
                  OnErrorAsync,
                  async () =>
                  {
                      // Snapshot the null-key group under its lock (as OnErrorAsync does). A check-then-use on
                      // the shared variable could see it reset to null by Expire between the two reads.
                      var nullGroupSnapshot = default(IAsyncSubject<TElement>);

                      lock (nullGate)
                      {
                          nullGroupSnapshot = nullGroup;
                      }

                      if (nullGroupSnapshot != null)
                      {
                          await nullGroupSnapshot.OnCompletedAsync().ConfigureAwait(false);
                      }

                      foreach (var group in groups.Values)
                      {
                          await group.OnCompletedAsync().ConfigureAwait(false);
                      }

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              ),
              refCount
          );
      }
  }
  647. }
  648. }