GroupBy.cs 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. using System.Threading.Tasks;
  8. namespace System.Reactive.Linq
  9. {
  10. public partial class AsyncObservable
  11. {
  12. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector)
  13. {
  14. if (source == null)
  15. throw new ArgumentNullException(nameof(source));
  16. if (keySelector == null)
  17. throw new ArgumentNullException(nameof(keySelector));
  18. return Create(
  19. source,
  20. keySelector,
  21. default(IGroupedAsyncObservable<TKey, TSource>),
  22. (source, keySelector, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector)));
  23. }
  24. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  25. {
  26. if (source == null)
  27. throw new ArgumentNullException(nameof(source));
  28. if (keySelector == null)
  29. throw new ArgumentNullException(nameof(keySelector));
  30. if (comparer == null)
  31. throw new ArgumentNullException(nameof(comparer));
  32. return Create(
  33. source,
  34. (keySelector, comparer),
  35. default(IGroupedAsyncObservable<TKey, TSource>),
  36. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.comparer)));
  37. }
  38. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
  39. {
  40. if (source == null)
  41. throw new ArgumentNullException(nameof(source));
  42. if (keySelector == null)
  43. throw new ArgumentNullException(nameof(keySelector));
  44. if (capacity < 0)
  45. throw new ArgumentOutOfRangeException(nameof(capacity));
  46. return Create(
  47. source,
  48. (keySelector, capacity),
  49. default(IGroupedAsyncObservable<TKey, TSource>),
  50. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.capacity)));
  51. }
  52. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  53. {
  54. if (source == null)
  55. throw new ArgumentNullException(nameof(source));
  56. if (keySelector == null)
  57. throw new ArgumentNullException(nameof(keySelector));
  58. if (capacity < 0)
  59. throw new ArgumentOutOfRangeException(nameof(capacity));
  60. if (comparer == null)
  61. throw new ArgumentNullException(nameof(comparer));
  62. return Create(
  63. source,
  64. (keySelector, capacity, comparer),
  65. default(IGroupedAsyncObservable<TKey, TSource>),
  66. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.capacity, state.comparer)));
  67. }
  68. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  69. {
  70. if (source == null)
  71. throw new ArgumentNullException(nameof(source));
  72. if (keySelector == null)
  73. throw new ArgumentNullException(nameof(keySelector));
  74. if (elementSelector == null)
  75. throw new ArgumentNullException(nameof(elementSelector));
  76. return Create(
  77. source,
  78. (keySelector, elementSelector),
  79. default(IGroupedAsyncObservable<TKey, TElement>),
  80. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector)));
  81. }
  82. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  83. {
  84. if (source == null)
  85. throw new ArgumentNullException(nameof(source));
  86. if (keySelector == null)
  87. throw new ArgumentNullException(nameof(keySelector));
  88. if (elementSelector == null)
  89. throw new ArgumentNullException(nameof(elementSelector));
  90. if (comparer == null)
  91. throw new ArgumentNullException(nameof(comparer));
  92. return Create(
  93. source,
  94. (keySelector, elementSelector, comparer),
  95. default(IGroupedAsyncObservable<TKey, TElement>),
  96. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.comparer)));
  97. }
  98. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
  99. {
  100. if (source == null)
  101. throw new ArgumentNullException(nameof(source));
  102. if (keySelector == null)
  103. throw new ArgumentNullException(nameof(keySelector));
  104. if (elementSelector == null)
  105. throw new ArgumentNullException(nameof(elementSelector));
  106. if (capacity < 0)
  107. throw new ArgumentOutOfRangeException(nameof(capacity));
  108. return Create(
  109. source,
  110. (keySelector, elementSelector, capacity),
  111. default(IGroupedAsyncObservable<TKey, TElement>),
  112. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.capacity)));
  113. }
  114. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  115. {
  116. if (source == null)
  117. throw new ArgumentNullException(nameof(source));
  118. if (keySelector == null)
  119. throw new ArgumentNullException(nameof(keySelector));
  120. if (elementSelector == null)
  121. throw new ArgumentNullException(nameof(elementSelector));
  122. if (capacity < 0)
  123. throw new ArgumentOutOfRangeException(nameof(capacity));
  124. if (comparer == null)
  125. throw new ArgumentNullException(nameof(comparer));
  126. return Create(
  127. source,
  128. (keySelector, elementSelector, capacity, comparer),
  129. default(IGroupedAsyncObservable<TKey, TElement>),
  130. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.capacity, state.comparer)));
  131. }
  132. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector)
  133. {
  134. if (source == null)
  135. throw new ArgumentNullException(nameof(source));
  136. if (keySelector == null)
  137. throw new ArgumentNullException(nameof(keySelector));
  138. return Create(
  139. source,
  140. keySelector,
  141. default(IGroupedAsyncObservable<TKey, TSource>),
  142. (source, keySelector, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector)));
  143. }
  144. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  145. {
  146. if (source == null)
  147. throw new ArgumentNullException(nameof(source));
  148. if (keySelector == null)
  149. throw new ArgumentNullException(nameof(keySelector));
  150. if (comparer == null)
  151. throw new ArgumentNullException(nameof(comparer));
  152. return Create(
  153. source,
  154. (keySelector, comparer),
  155. default(IGroupedAsyncObservable<TKey, TSource>),
  156. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.comparer)));
  157. }
  158. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, int capacity)
  159. {
  160. if (source == null)
  161. throw new ArgumentNullException(nameof(source));
  162. if (keySelector == null)
  163. throw new ArgumentNullException(nameof(keySelector));
  164. if (capacity < 0)
  165. throw new ArgumentOutOfRangeException(nameof(capacity));
  166. return Create(
  167. source,
  168. (keySelector, capacity),
  169. default(IGroupedAsyncObservable<TKey, TSource>),
  170. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.capacity)));
  171. }
  172. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  173. {
  174. if (source == null)
  175. throw new ArgumentNullException(nameof(source));
  176. if (keySelector == null)
  177. throw new ArgumentNullException(nameof(keySelector));
  178. if (capacity < 0)
  179. throw new ArgumentOutOfRangeException(nameof(capacity));
  180. if (comparer == null)
  181. throw new ArgumentNullException(nameof(comparer));
  182. return Create(
  183. source,
  184. (keySelector, capacity, comparer),
  185. default(IGroupedAsyncObservable<TKey, TSource>),
  186. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.capacity, state.comparer)));
  187. }
  188. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector)
  189. {
  190. if (source == null)
  191. throw new ArgumentNullException(nameof(source));
  192. if (keySelector == null)
  193. throw new ArgumentNullException(nameof(keySelector));
  194. if (elementSelector == null)
  195. throw new ArgumentNullException(nameof(elementSelector));
  196. return Create(
  197. source,
  198. (keySelector, elementSelector),
  199. default(IGroupedAsyncObservable<TKey, TElement>),
  200. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector)));
  201. }
  202. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, IEqualityComparer<TKey> comparer)
  203. {
  204. if (source == null)
  205. throw new ArgumentNullException(nameof(source));
  206. if (keySelector == null)
  207. throw new ArgumentNullException(nameof(keySelector));
  208. if (elementSelector == null)
  209. throw new ArgumentNullException(nameof(elementSelector));
  210. if (comparer == null)
  211. throw new ArgumentNullException(nameof(comparer));
  212. return Create(
  213. source,
  214. (keySelector, elementSelector, comparer),
  215. default(IGroupedAsyncObservable<TKey, TElement>),
  216. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.comparer)));
  217. }
  218. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, int capacity)
  219. {
  220. if (source == null)
  221. throw new ArgumentNullException(nameof(source));
  222. if (keySelector == null)
  223. throw new ArgumentNullException(nameof(keySelector));
  224. if (elementSelector == null)
  225. throw new ArgumentNullException(nameof(elementSelector));
  226. if (capacity < 0)
  227. throw new ArgumentOutOfRangeException(nameof(capacity));
  228. return Create(
  229. source,
  230. (keySelector, elementSelector, capacity),
  231. default(IGroupedAsyncObservable<TKey, TElement>),
  232. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.capacity)));
  233. }
  234. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  235. {
  236. if (source == null)
  237. throw new ArgumentNullException(nameof(source));
  238. if (keySelector == null)
  239. throw new ArgumentNullException(nameof(keySelector));
  240. if (elementSelector == null)
  241. throw new ArgumentNullException(nameof(elementSelector));
  242. if (capacity < 0)
  243. throw new ArgumentOutOfRangeException(nameof(capacity));
  244. if (comparer == null)
  245. throw new ArgumentNullException(nameof(comparer));
  246. return Create(
  247. source,
  248. (keySelector, elementSelector, capacity, comparer),
  249. default(IGroupedAsyncObservable<TKey, TElement>),
  250. (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.capacity, state.comparer)));
  251. }
  252. private static async ValueTask<IAsyncDisposable> GroupByCore<TSource, TKey, TElement>(IAsyncObservable<TSource> source, IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, Func<IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>>, IAsyncDisposable, (IAsyncObserver<TSource>, IAsyncDisposable)> createObserver)
  253. {
  254. var d = new SingleAssignmentAsyncDisposable();
  255. var (sink, subscription) = createObserver(observer, d);
  256. var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  257. await d.AssignAsync(inner).ConfigureAwait(false);
  258. return subscription;
  259. }
  260. }
  261. public partial class AsyncObserver
  262. {
  263. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector)
  264. {
  265. if (observer == null)
  266. throw new ArgumentNullException(nameof(observer));
  267. if (subscription == null)
  268. throw new ArgumentNullException(nameof(subscription));
  269. if (keySelector == null)
  270. throw new ArgumentNullException(nameof(keySelector));
  271. return GroupBy(observer, subscription, keySelector, int.MaxValue, EqualityComparer<TKey>.Default);
  272. }
  273. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  274. {
  275. if (observer == null)
  276. throw new ArgumentNullException(nameof(observer));
  277. if (subscription == null)
  278. throw new ArgumentNullException(nameof(subscription));
  279. if (keySelector == null)
  280. throw new ArgumentNullException(nameof(keySelector));
  281. if (comparer == null)
  282. throw new ArgumentNullException(nameof(comparer));
  283. return GroupBy(observer, subscription, keySelector, int.MaxValue, comparer);
  284. }
  285. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, int capacity)
  286. {
  287. if (observer == null)
  288. throw new ArgumentNullException(nameof(observer));
  289. if (subscription == null)
  290. throw new ArgumentNullException(nameof(subscription));
  291. if (keySelector == null)
  292. throw new ArgumentNullException(nameof(keySelector));
  293. if (capacity < 0)
  294. throw new ArgumentOutOfRangeException(nameof(capacity));
  295. return GroupBy(observer, subscription, keySelector, capacity, EqualityComparer<TKey>.Default);
  296. }
  297. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  298. {
  299. if (observer == null)
  300. throw new ArgumentNullException(nameof(observer));
  301. if (subscription == null)
  302. throw new ArgumentNullException(nameof(subscription));
  303. if (keySelector == null)
  304. throw new ArgumentNullException(nameof(keySelector));
  305. if (capacity < 0)
  306. throw new ArgumentOutOfRangeException(nameof(capacity));
  307. if (comparer == null)
  308. throw new ArgumentNullException(nameof(comparer));
  309. return GroupBy(observer, subscription, x => new ValueTask<TKey>(keySelector(x)), capacity, comparer);
  310. }
  311. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  312. {
  313. if (observer == null)
  314. throw new ArgumentNullException(nameof(observer));
  315. if (subscription == null)
  316. throw new ArgumentNullException(nameof(subscription));
  317. if (keySelector == null)
  318. throw new ArgumentNullException(nameof(keySelector));
  319. if (elementSelector == null)
  320. throw new ArgumentNullException(nameof(elementSelector));
  321. return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, EqualityComparer<TKey>.Default);
  322. }
  323. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  324. {
  325. if (observer == null)
  326. throw new ArgumentNullException(nameof(observer));
  327. if (subscription == null)
  328. throw new ArgumentNullException(nameof(subscription));
  329. if (keySelector == null)
  330. throw new ArgumentNullException(nameof(keySelector));
  331. if (elementSelector == null)
  332. throw new ArgumentNullException(nameof(elementSelector));
  333. if (comparer == null)
  334. throw new ArgumentNullException(nameof(comparer));
  335. return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, comparer);
  336. }
  337. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
  338. {
  339. if (observer == null)
  340. throw new ArgumentNullException(nameof(observer));
  341. if (subscription == null)
  342. throw new ArgumentNullException(nameof(subscription));
  343. if (keySelector == null)
  344. throw new ArgumentNullException(nameof(keySelector));
  345. if (elementSelector == null)
  346. throw new ArgumentNullException(nameof(elementSelector));
  347. if (capacity < 0)
  348. throw new ArgumentOutOfRangeException(nameof(capacity));
  349. return GroupBy(observer, subscription, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);
  350. }
  351. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  352. {
  353. if (observer == null)
  354. throw new ArgumentNullException(nameof(observer));
  355. if (subscription == null)
  356. throw new ArgumentNullException(nameof(subscription));
  357. if (keySelector == null)
  358. throw new ArgumentNullException(nameof(keySelector));
  359. if (elementSelector == null)
  360. throw new ArgumentNullException(nameof(elementSelector));
  361. if (capacity < 0)
  362. throw new ArgumentOutOfRangeException(nameof(capacity));
  363. if (comparer == null)
  364. throw new ArgumentNullException(nameof(comparer));
  365. return GroupBy<TSource, TKey, TElement>(observer, subscription, x => new ValueTask<TKey>(keySelector(x)), x => new ValueTask<TElement>(elementSelector(x)), capacity, comparer);
  366. }
  367. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector)
  368. {
  369. if (observer == null)
  370. throw new ArgumentNullException(nameof(observer));
  371. if (subscription == null)
  372. throw new ArgumentNullException(nameof(subscription));
  373. if (keySelector == null)
  374. throw new ArgumentNullException(nameof(keySelector));
  375. return GroupBy(observer, subscription, keySelector, int.MaxValue, EqualityComparer<TKey>.Default);
  376. }
  377. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  378. {
  379. if (observer == null)
  380. throw new ArgumentNullException(nameof(observer));
  381. if (subscription == null)
  382. throw new ArgumentNullException(nameof(subscription));
  383. if (keySelector == null)
  384. throw new ArgumentNullException(nameof(keySelector));
  385. if (comparer == null)
  386. throw new ArgumentNullException(nameof(comparer));
  387. return GroupBy(observer, subscription, keySelector, int.MaxValue, comparer);
  388. }
  389. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, int capacity)
  390. {
  391. if (observer == null)
  392. throw new ArgumentNullException(nameof(observer));
  393. if (subscription == null)
  394. throw new ArgumentNullException(nameof(subscription));
  395. if (keySelector == null)
  396. throw new ArgumentNullException(nameof(keySelector));
  397. if (capacity < 0)
  398. throw new ArgumentOutOfRangeException(nameof(capacity));
  399. return GroupBy(observer, subscription, keySelector, capacity, EqualityComparer<TKey>.Default);
  400. }
  401. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  402. {
  403. if (observer == null)
  404. throw new ArgumentNullException(nameof(observer));
  405. if (subscription == null)
  406. throw new ArgumentNullException(nameof(subscription));
  407. if (keySelector == null)
  408. throw new ArgumentNullException(nameof(keySelector));
  409. if (capacity < 0)
  410. throw new ArgumentOutOfRangeException(nameof(capacity));
  411. if (comparer == null)
  412. throw new ArgumentNullException(nameof(comparer));
  413. return GroupBy(observer, subscription, keySelector, x => new ValueTask<TSource>(x), capacity, comparer);
  414. }
  415. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector)
  416. {
  417. if (observer == null)
  418. throw new ArgumentNullException(nameof(observer));
  419. if (subscription == null)
  420. throw new ArgumentNullException(nameof(subscription));
  421. if (keySelector == null)
  422. throw new ArgumentNullException(nameof(keySelector));
  423. if (elementSelector == null)
  424. throw new ArgumentNullException(nameof(elementSelector));
  425. return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, EqualityComparer<TKey>.Default);
  426. }
  427. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, IEqualityComparer<TKey> comparer)
  428. {
  429. if (observer == null)
  430. throw new ArgumentNullException(nameof(observer));
  431. if (subscription == null)
  432. throw new ArgumentNullException(nameof(subscription));
  433. if (keySelector == null)
  434. throw new ArgumentNullException(nameof(keySelector));
  435. if (elementSelector == null)
  436. throw new ArgumentNullException(nameof(elementSelector));
  437. if (comparer == null)
  438. throw new ArgumentNullException(nameof(comparer));
  439. return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, comparer);
  440. }
  441. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, int capacity)
  442. {
  443. if (observer == null)
  444. throw new ArgumentNullException(nameof(observer));
  445. if (subscription == null)
  446. throw new ArgumentNullException(nameof(subscription));
  447. if (keySelector == null)
  448. throw new ArgumentNullException(nameof(keySelector));
  449. if (elementSelector == null)
  450. throw new ArgumentNullException(nameof(elementSelector));
  451. if (capacity < 0)
  452. throw new ArgumentOutOfRangeException(nameof(capacity));
  453. return GroupBy(observer, subscription, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);
  454. }
  455. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  456. {
  457. if (observer == null)
  458. throw new ArgumentNullException(nameof(observer));
  459. if (subscription == null)
  460. throw new ArgumentNullException(nameof(subscription));
  461. if (keySelector == null)
  462. throw new ArgumentNullException(nameof(keySelector));
  463. if (elementSelector == null)
  464. throw new ArgumentNullException(nameof(elementSelector));
  465. if (capacity < 0)
  466. throw new ArgumentOutOfRangeException(nameof(capacity));
  467. if (comparer == null)
  468. throw new ArgumentNullException(nameof(comparer));
  469. var refCount = new RefCountAsyncDisposable(subscription);
  470. var groups = default(Dictionary<TKey, SequentialSimpleAsyncSubject<TElement>>);
  471. if (capacity == int.MaxValue)
  472. {
  473. groups = new Dictionary<TKey, SequentialSimpleAsyncSubject<TElement>>(comparer);
  474. }
  475. else
  476. {
  477. groups = new Dictionary<TKey, SequentialSimpleAsyncSubject<TElement>>(capacity, comparer);
  478. }
  479. var nullGroup = default(SequentialSimpleAsyncSubject<TElement>);
  480. async ValueTask OnErrorAsync(Exception ex)
  481. {
  482. if (nullGroup != null)
  483. {
  484. await nullGroup.OnErrorAsync(ex).ConfigureAwait(false);
  485. }
  486. foreach (var group in groups.Values)
  487. {
  488. await group.OnErrorAsync(ex).ConfigureAwait(false);
  489. }
  490. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  491. }
  492. return
  493. (
  494. Create<TSource>
  495. (
  496. async x =>
  497. {
  498. var key = default(TKey);
  499. try
  500. {
  501. key = await keySelector(x).ConfigureAwait(false);
  502. }
  503. catch (Exception ex)
  504. {
  505. await OnErrorAsync(ex).ConfigureAwait(false);
  506. return;
  507. }
  508. var shouldEmit = false;
  509. var group = default(SequentialSimpleAsyncSubject<TElement>);
  510. if (key == null)
  511. {
  512. if (nullGroup == null)
  513. {
  514. nullGroup = new SequentialSimpleAsyncSubject<TElement>();
  515. shouldEmit = true;
  516. }
  517. group = nullGroup;
  518. }
  519. else
  520. {
  521. try
  522. {
  523. if (!groups.TryGetValue(key, out group))
  524. {
  525. group = new SequentialSimpleAsyncSubject<TElement>();
  526. groups.Add(key, group);
  527. shouldEmit = true;
  528. }
  529. }
  530. catch (Exception ex)
  531. {
  532. await OnErrorAsync(ex).ConfigureAwait(false);
  533. return;
  534. }
  535. }
  536. if (shouldEmit)
  537. {
  538. var g = new GroupedAsyncObservable<TKey, TElement>(key, group, refCount);
  539. await observer.OnNextAsync(g).ConfigureAwait(false);
  540. }
  541. var element = default(TElement);
  542. try
  543. {
  544. element = await elementSelector(x).ConfigureAwait(false);
  545. }
  546. catch (Exception ex)
  547. {
  548. await OnErrorAsync(ex).ConfigureAwait(false);
  549. return;
  550. }
  551. await group.OnNextAsync(element).ConfigureAwait(false);
  552. },
  553. OnErrorAsync,
  554. async () =>
  555. {
  556. if (nullGroup != null)
  557. {
  558. await nullGroup.OnCompletedAsync().ConfigureAwait(false);
  559. }
  560. foreach (var group in groups.Values)
  561. {
  562. await group.OnCompletedAsync().ConfigureAwait(false);
  563. }
  564. await observer.OnCompletedAsync().ConfigureAwait(false);
  565. }
  566. ),
  567. refCount
  568. );
  569. }
  570. }
  571. }