GroupBy.cs 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. using System.Threading.Tasks;
  8. namespace System.Reactive.Linq
  9. {
  10. public partial class AsyncObservable
  11. {
  12. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector)
  13. {
  14. if (source == null)
  15. throw new ArgumentNullException(nameof(source));
  16. if (keySelector == null)
  17. throw new ArgumentNullException(nameof(keySelector));
  18. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
  19. source,
  20. keySelector,
  21. static (source, keySelector, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector)));
  22. }
  23. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  24. {
  25. if (source == null)
  26. throw new ArgumentNullException(nameof(source));
  27. if (keySelector == null)
  28. throw new ArgumentNullException(nameof(keySelector));
  29. if (comparer == null)
  30. throw new ArgumentNullException(nameof(comparer));
  31. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
  32. source,
  33. (keySelector, comparer),
  34. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.comparer)));
  35. }
  36. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
  37. {
  38. if (source == null)
  39. throw new ArgumentNullException(nameof(source));
  40. if (keySelector == null)
  41. throw new ArgumentNullException(nameof(keySelector));
  42. if (capacity < 0)
  43. throw new ArgumentOutOfRangeException(nameof(capacity));
  44. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
  45. source,
  46. (keySelector, capacity),
  47. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.capacity)));
  48. }
  49. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  50. {
  51. if (source == null)
  52. throw new ArgumentNullException(nameof(source));
  53. if (keySelector == null)
  54. throw new ArgumentNullException(nameof(keySelector));
  55. if (capacity < 0)
  56. throw new ArgumentOutOfRangeException(nameof(capacity));
  57. if (comparer == null)
  58. throw new ArgumentNullException(nameof(comparer));
  59. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
  60. source,
  61. (keySelector, capacity, comparer),
  62. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.capacity, state.comparer)));
  63. }
  64. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  65. {
  66. if (source == null)
  67. throw new ArgumentNullException(nameof(source));
  68. if (keySelector == null)
  69. throw new ArgumentNullException(nameof(keySelector));
  70. if (elementSelector == null)
  71. throw new ArgumentNullException(nameof(elementSelector));
  72. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
  73. source,
  74. (keySelector, elementSelector),
  75. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector)));
  76. }
  77. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  78. {
  79. if (source == null)
  80. throw new ArgumentNullException(nameof(source));
  81. if (keySelector == null)
  82. throw new ArgumentNullException(nameof(keySelector));
  83. if (elementSelector == null)
  84. throw new ArgumentNullException(nameof(elementSelector));
  85. if (comparer == null)
  86. throw new ArgumentNullException(nameof(comparer));
  87. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
  88. source,
  89. (keySelector, elementSelector, comparer),
  90. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.comparer)));
  91. }
  92. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
  93. {
  94. if (source == null)
  95. throw new ArgumentNullException(nameof(source));
  96. if (keySelector == null)
  97. throw new ArgumentNullException(nameof(keySelector));
  98. if (elementSelector == null)
  99. throw new ArgumentNullException(nameof(elementSelector));
  100. if (capacity < 0)
  101. throw new ArgumentOutOfRangeException(nameof(capacity));
  102. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
  103. source,
  104. (keySelector, elementSelector, capacity),
  105. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.capacity)));
  106. }
  107. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  108. {
  109. if (source == null)
  110. throw new ArgumentNullException(nameof(source));
  111. if (keySelector == null)
  112. throw new ArgumentNullException(nameof(keySelector));
  113. if (elementSelector == null)
  114. throw new ArgumentNullException(nameof(elementSelector));
  115. if (capacity < 0)
  116. throw new ArgumentOutOfRangeException(nameof(capacity));
  117. if (comparer == null)
  118. throw new ArgumentNullException(nameof(comparer));
  119. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
  120. source,
  121. (keySelector, elementSelector, capacity, comparer),
  122. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.capacity, state.comparer)));
  123. }
  124. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector)
  125. {
  126. if (source == null)
  127. throw new ArgumentNullException(nameof(source));
  128. if (keySelector == null)
  129. throw new ArgumentNullException(nameof(keySelector));
  130. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
  131. source,
  132. keySelector,
  133. static (source, keySelector, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, keySelector)));
  134. }
  135. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  136. {
  137. if (source == null)
  138. throw new ArgumentNullException(nameof(source));
  139. if (keySelector == null)
  140. throw new ArgumentNullException(nameof(keySelector));
  141. if (comparer == null)
  142. throw new ArgumentNullException(nameof(comparer));
  143. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
  144. source,
  145. (keySelector, comparer),
  146. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.comparer)));
  147. }
  148. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, int capacity)
  149. {
  150. if (source == null)
  151. throw new ArgumentNullException(nameof(source));
  152. if (keySelector == null)
  153. throw new ArgumentNullException(nameof(keySelector));
  154. if (capacity < 0)
  155. throw new ArgumentOutOfRangeException(nameof(capacity));
  156. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
  157. source,
  158. (keySelector, capacity),
  159. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.capacity)));
  160. }
  161. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  162. {
  163. if (source == null)
  164. throw new ArgumentNullException(nameof(source));
  165. if (keySelector == null)
  166. throw new ArgumentNullException(nameof(keySelector));
  167. if (capacity < 0)
  168. throw new ArgumentOutOfRangeException(nameof(capacity));
  169. if (comparer == null)
  170. throw new ArgumentNullException(nameof(comparer));
  171. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
  172. source,
  173. (keySelector, capacity, comparer),
  174. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.capacity, state.comparer)));
  175. }
  176. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector)
  177. {
  178. if (source == null)
  179. throw new ArgumentNullException(nameof(source));
  180. if (keySelector == null)
  181. throw new ArgumentNullException(nameof(keySelector));
  182. if (elementSelector == null)
  183. throw new ArgumentNullException(nameof(elementSelector));
  184. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
  185. source,
  186. (keySelector, elementSelector),
  187. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector)));
  188. }
  189. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, IEqualityComparer<TKey> comparer)
  190. {
  191. if (source == null)
  192. throw new ArgumentNullException(nameof(source));
  193. if (keySelector == null)
  194. throw new ArgumentNullException(nameof(keySelector));
  195. if (elementSelector == null)
  196. throw new ArgumentNullException(nameof(elementSelector));
  197. if (comparer == null)
  198. throw new ArgumentNullException(nameof(comparer));
  199. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
  200. source,
  201. (keySelector, elementSelector, comparer),
  202. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.comparer)));
  203. }
  204. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, int capacity)
  205. {
  206. if (source == null)
  207. throw new ArgumentNullException(nameof(source));
  208. if (keySelector == null)
  209. throw new ArgumentNullException(nameof(keySelector));
  210. if (elementSelector == null)
  211. throw new ArgumentNullException(nameof(elementSelector));
  212. if (capacity < 0)
  213. throw new ArgumentOutOfRangeException(nameof(capacity));
  214. return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
  215. source,
  216. (keySelector, elementSelector, capacity),
  217. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.capacity)));
  218. }
  219. public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  220. {
  221. if (source == null)
  222. throw new ArgumentNullException(nameof(source));
  223. if (keySelector == null)
  224. throw new ArgumentNullException(nameof(keySelector));
  225. if (elementSelector == null)
  226. throw new ArgumentNullException(nameof(elementSelector));
  227. if (capacity < 0)
  228. throw new ArgumentOutOfRangeException(nameof(capacity));
  229. if (comparer == null)
  230. throw new ArgumentNullException(nameof(comparer));
  231. return CreateAsyncObservable< IGroupedAsyncObservable<TKey, TElement>>.From(
  232. source,
  233. (keySelector, elementSelector, capacity, comparer),
  234. static (source, state, observer) => GroupByCore(source, observer, (o, d) => AsyncObserver.GroupBy(o, d, state.keySelector, state.elementSelector, state.capacity, state.comparer)));
  235. }
  236. private static async ValueTask<IAsyncDisposable> GroupByCore<TSource, TKey, TElement>(IAsyncObservable<TSource> source, IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, Func<IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>>, IAsyncDisposable, (IAsyncObserver<TSource>, IAsyncDisposable)> createObserver)
  237. {
  238. var d = new SingleAssignmentAsyncDisposable();
  239. var (sink, subscription) = createObserver(observer, d);
  240. var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  241. await d.AssignAsync(inner).ConfigureAwait(false);
  242. return subscription;
  243. }
  244. }
  245. public partial class AsyncObserver
  246. {
  247. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector)
  248. {
  249. if (observer == null)
  250. throw new ArgumentNullException(nameof(observer));
  251. if (subscription == null)
  252. throw new ArgumentNullException(nameof(subscription));
  253. if (keySelector == null)
  254. throw new ArgumentNullException(nameof(keySelector));
  255. return GroupBy(observer, subscription, keySelector, int.MaxValue, EqualityComparer<TKey>.Default);
  256. }
  257. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  258. {
  259. if (observer == null)
  260. throw new ArgumentNullException(nameof(observer));
  261. if (subscription == null)
  262. throw new ArgumentNullException(nameof(subscription));
  263. if (keySelector == null)
  264. throw new ArgumentNullException(nameof(keySelector));
  265. if (comparer == null)
  266. throw new ArgumentNullException(nameof(comparer));
  267. return GroupBy(observer, subscription, keySelector, int.MaxValue, comparer);
  268. }
  269. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, int capacity)
  270. {
  271. if (observer == null)
  272. throw new ArgumentNullException(nameof(observer));
  273. if (subscription == null)
  274. throw new ArgumentNullException(nameof(subscription));
  275. if (keySelector == null)
  276. throw new ArgumentNullException(nameof(keySelector));
  277. if (capacity < 0)
  278. throw new ArgumentOutOfRangeException(nameof(capacity));
  279. return GroupBy(observer, subscription, keySelector, capacity, EqualityComparer<TKey>.Default);
  280. }
  281. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  282. {
  283. if (observer == null)
  284. throw new ArgumentNullException(nameof(observer));
  285. if (subscription == null)
  286. throw new ArgumentNullException(nameof(subscription));
  287. if (keySelector == null)
  288. throw new ArgumentNullException(nameof(keySelector));
  289. if (capacity < 0)
  290. throw new ArgumentOutOfRangeException(nameof(capacity));
  291. if (comparer == null)
  292. throw new ArgumentNullException(nameof(comparer));
  293. return GroupBy(observer, subscription, x => new ValueTask<TKey>(keySelector(x)), capacity, comparer);
  294. }
  295. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  296. {
  297. if (observer == null)
  298. throw new ArgumentNullException(nameof(observer));
  299. if (subscription == null)
  300. throw new ArgumentNullException(nameof(subscription));
  301. if (keySelector == null)
  302. throw new ArgumentNullException(nameof(keySelector));
  303. if (elementSelector == null)
  304. throw new ArgumentNullException(nameof(elementSelector));
  305. return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, EqualityComparer<TKey>.Default);
  306. }
  307. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  308. {
  309. if (observer == null)
  310. throw new ArgumentNullException(nameof(observer));
  311. if (subscription == null)
  312. throw new ArgumentNullException(nameof(subscription));
  313. if (keySelector == null)
  314. throw new ArgumentNullException(nameof(keySelector));
  315. if (elementSelector == null)
  316. throw new ArgumentNullException(nameof(elementSelector));
  317. if (comparer == null)
  318. throw new ArgumentNullException(nameof(comparer));
  319. return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, comparer);
  320. }
  321. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
  322. {
  323. if (observer == null)
  324. throw new ArgumentNullException(nameof(observer));
  325. if (subscription == null)
  326. throw new ArgumentNullException(nameof(subscription));
  327. if (keySelector == null)
  328. throw new ArgumentNullException(nameof(keySelector));
  329. if (elementSelector == null)
  330. throw new ArgumentNullException(nameof(elementSelector));
  331. if (capacity < 0)
  332. throw new ArgumentOutOfRangeException(nameof(capacity));
  333. return GroupBy(observer, subscription, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);
  334. }
  335. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  336. {
  337. if (observer == null)
  338. throw new ArgumentNullException(nameof(observer));
  339. if (subscription == null)
  340. throw new ArgumentNullException(nameof(subscription));
  341. if (keySelector == null)
  342. throw new ArgumentNullException(nameof(keySelector));
  343. if (elementSelector == null)
  344. throw new ArgumentNullException(nameof(elementSelector));
  345. if (capacity < 0)
  346. throw new ArgumentOutOfRangeException(nameof(capacity));
  347. if (comparer == null)
  348. throw new ArgumentNullException(nameof(comparer));
  349. return GroupBy<TSource, TKey, TElement>(observer, subscription, x => new ValueTask<TKey>(keySelector(x)), x => new ValueTask<TElement>(elementSelector(x)), capacity, comparer);
  350. }
  351. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector)
  352. {
  353. if (observer == null)
  354. throw new ArgumentNullException(nameof(observer));
  355. if (subscription == null)
  356. throw new ArgumentNullException(nameof(subscription));
  357. if (keySelector == null)
  358. throw new ArgumentNullException(nameof(keySelector));
  359. return GroupBy(observer, subscription, keySelector, int.MaxValue, EqualityComparer<TKey>.Default);
  360. }
  361. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  362. {
  363. if (observer == null)
  364. throw new ArgumentNullException(nameof(observer));
  365. if (subscription == null)
  366. throw new ArgumentNullException(nameof(subscription));
  367. if (keySelector == null)
  368. throw new ArgumentNullException(nameof(keySelector));
  369. if (comparer == null)
  370. throw new ArgumentNullException(nameof(comparer));
  371. return GroupBy(observer, subscription, keySelector, int.MaxValue, comparer);
  372. }
  373. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, int capacity)
  374. {
  375. if (observer == null)
  376. throw new ArgumentNullException(nameof(observer));
  377. if (subscription == null)
  378. throw new ArgumentNullException(nameof(subscription));
  379. if (keySelector == null)
  380. throw new ArgumentNullException(nameof(keySelector));
  381. if (capacity < 0)
  382. throw new ArgumentOutOfRangeException(nameof(capacity));
  383. return GroupBy(observer, subscription, keySelector, capacity, EqualityComparer<TKey>.Default);
  384. }
  385. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey>(IAsyncObserver<IGroupedAsyncObservable<TKey, TSource>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  386. {
  387. if (observer == null)
  388. throw new ArgumentNullException(nameof(observer));
  389. if (subscription == null)
  390. throw new ArgumentNullException(nameof(subscription));
  391. if (keySelector == null)
  392. throw new ArgumentNullException(nameof(keySelector));
  393. if (capacity < 0)
  394. throw new ArgumentOutOfRangeException(nameof(capacity));
  395. if (comparer == null)
  396. throw new ArgumentNullException(nameof(comparer));
  397. return GroupBy(observer, subscription, keySelector, x => new ValueTask<TSource>(x), capacity, comparer);
  398. }
  399. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector)
  400. {
  401. if (observer == null)
  402. throw new ArgumentNullException(nameof(observer));
  403. if (subscription == null)
  404. throw new ArgumentNullException(nameof(subscription));
  405. if (keySelector == null)
  406. throw new ArgumentNullException(nameof(keySelector));
  407. if (elementSelector == null)
  408. throw new ArgumentNullException(nameof(elementSelector));
  409. return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, EqualityComparer<TKey>.Default);
  410. }
  411. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, IEqualityComparer<TKey> comparer)
  412. {
  413. if (observer == null)
  414. throw new ArgumentNullException(nameof(observer));
  415. if (subscription == null)
  416. throw new ArgumentNullException(nameof(subscription));
  417. if (keySelector == null)
  418. throw new ArgumentNullException(nameof(keySelector));
  419. if (elementSelector == null)
  420. throw new ArgumentNullException(nameof(elementSelector));
  421. if (comparer == null)
  422. throw new ArgumentNullException(nameof(comparer));
  423. return GroupBy(observer, subscription, keySelector, elementSelector, int.MaxValue, comparer);
  424. }
  425. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, int capacity)
  426. {
  427. if (observer == null)
  428. throw new ArgumentNullException(nameof(observer));
  429. if (subscription == null)
  430. throw new ArgumentNullException(nameof(subscription));
  431. if (keySelector == null)
  432. throw new ArgumentNullException(nameof(keySelector));
  433. if (elementSelector == null)
  434. throw new ArgumentNullException(nameof(elementSelector));
  435. if (capacity < 0)
  436. throw new ArgumentOutOfRangeException(nameof(capacity));
  437. return GroupBy(observer, subscription, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);
  438. }
  439. public static (IAsyncObserver<TSource>, IAsyncDisposable) GroupBy<TSource, TKey, TElement>(IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, IAsyncDisposable subscription, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  440. {
  441. if (observer == null)
  442. throw new ArgumentNullException(nameof(observer));
  443. if (subscription == null)
  444. throw new ArgumentNullException(nameof(subscription));
  445. if (keySelector == null)
  446. throw new ArgumentNullException(nameof(keySelector));
  447. if (elementSelector == null)
  448. throw new ArgumentNullException(nameof(elementSelector));
  449. if (capacity < 0)
  450. throw new ArgumentOutOfRangeException(nameof(capacity));
  451. if (comparer == null)
  452. throw new ArgumentNullException(nameof(comparer));
  453. var refCount = new RefCountAsyncDisposable(subscription);
  454. var groups = default(Dictionary<TKey, SequentialSimpleAsyncSubject<TElement>>);
  455. if (capacity == int.MaxValue)
  456. {
  457. groups = new Dictionary<TKey, SequentialSimpleAsyncSubject<TElement>>(comparer);
  458. }
  459. else
  460. {
  461. groups = new Dictionary<TKey, SequentialSimpleAsyncSubject<TElement>>(capacity, comparer);
  462. }
  463. var nullGroup = default(SequentialSimpleAsyncSubject<TElement>);
  464. async ValueTask OnErrorAsync(Exception ex)
  465. {
  466. if (nullGroup != null)
  467. {
  468. await nullGroup.OnErrorAsync(ex).ConfigureAwait(false);
  469. }
  470. foreach (var group in groups.Values)
  471. {
  472. await group.OnErrorAsync(ex).ConfigureAwait(false);
  473. }
  474. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  475. }
  476. return
  477. (
  478. Create<TSource>
  479. (
  480. async x =>
  481. {
  482. var key = default(TKey);
  483. try
  484. {
  485. key = await keySelector(x).ConfigureAwait(false);
  486. }
  487. catch (Exception ex)
  488. {
  489. await OnErrorAsync(ex).ConfigureAwait(false);
  490. return;
  491. }
  492. var shouldEmit = false;
  493. var group = default(SequentialSimpleAsyncSubject<TElement>);
  494. if (key == null)
  495. {
  496. if (nullGroup == null)
  497. {
  498. nullGroup = new SequentialSimpleAsyncSubject<TElement>();
  499. shouldEmit = true;
  500. }
  501. group = nullGroup;
  502. }
  503. else
  504. {
  505. try
  506. {
  507. if (!groups.TryGetValue(key, out group))
  508. {
  509. group = new SequentialSimpleAsyncSubject<TElement>();
  510. groups.Add(key, group);
  511. shouldEmit = true;
  512. }
  513. }
  514. catch (Exception ex)
  515. {
  516. await OnErrorAsync(ex).ConfigureAwait(false);
  517. return;
  518. }
  519. }
  520. if (shouldEmit)
  521. {
  522. var g = new GroupedAsyncObservable<TKey, TElement>(key, group, refCount);
  523. await observer.OnNextAsync(g).ConfigureAwait(false);
  524. }
  525. var element = default(TElement);
  526. try
  527. {
  528. element = await elementSelector(x).ConfigureAwait(false);
  529. }
  530. catch (Exception ex)
  531. {
  532. await OnErrorAsync(ex).ConfigureAwait(false);
  533. return;
  534. }
  535. await group.OnNextAsync(element).ConfigureAwait(false);
  536. },
  537. OnErrorAsync,
  538. async () =>
  539. {
  540. if (nullGroup != null)
  541. {
  542. await nullGroup.OnCompletedAsync().ConfigureAwait(false);
  543. }
  544. foreach (var group in groups.Values)
  545. {
  546. await group.OnCompletedAsync().ConfigureAwait(false);
  547. }
  548. await observer.OnCompletedAsync().ConfigureAwait(false);
  549. }
  550. ),
  551. refCount
  552. );
  553. }
  554. }
  555. }