GroupJoin.cs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector) =>
  12. GroupJoin(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  13. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  14. {
  15. if (outer == null)
  16. throw Error.ArgumentNull(nameof(outer));
  17. if (inner == null)
  18. throw Error.ArgumentNull(nameof(inner));
  19. if (outerKeySelector == null)
  20. throw Error.ArgumentNull(nameof(outerKeySelector));
  21. if (innerKeySelector == null)
  22. throw Error.ArgumentNull(nameof(innerKeySelector));
  23. if (resultSelector == null)
  24. throw Error.ArgumentNull(nameof(resultSelector));
  25. #if USE_ASYNC_ITERATOR
  26. return Create(Core);
  27. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  28. {
  29. await using (var e = outer.GetConfiguredAsyncEnumerator(cancellationToken, false))
  30. {
  31. if (await e.MoveNextAsync())
  32. {
  33. var lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
  34. do
  35. {
  36. var item = e.Current;
  37. var outerKey = outerKeySelector(item);
  38. yield return resultSelector(item, lookup[outerKey].ToAsyncEnumerable());
  39. }
  40. while (await e.MoveNextAsync());
  41. }
  42. }
  43. }
  44. #else
  45. return new GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  46. #endif
  47. }
  48. public static IAsyncEnumerable<TResult> GroupJoinAwait<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector) =>
  49. GroupJoinAwait<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  50. public static IAsyncEnumerable<TResult> GroupJoinAwait<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  51. {
  52. if (outer == null)
  53. throw Error.ArgumentNull(nameof(outer));
  54. if (inner == null)
  55. throw Error.ArgumentNull(nameof(inner));
  56. if (outerKeySelector == null)
  57. throw Error.ArgumentNull(nameof(outerKeySelector));
  58. if (innerKeySelector == null)
  59. throw Error.ArgumentNull(nameof(innerKeySelector));
  60. if (resultSelector == null)
  61. throw Error.ArgumentNull(nameof(resultSelector));
  62. #if USE_ASYNC_ITERATOR
  63. return Create(Core);
  64. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  65. {
  66. await using (var e = outer.GetConfiguredAsyncEnumerator(cancellationToken, false))
  67. {
  68. if (await e.MoveNextAsync())
  69. {
  70. var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
  71. do
  72. {
  73. var item = e.Current;
  74. var outerKey = await outerKeySelector(item).ConfigureAwait(false);
  75. yield return await resultSelector(item, lookup[outerKey].ToAsyncEnumerable()).ConfigureAwait(false);
  76. }
  77. while (await e.MoveNextAsync());
  78. }
  79. }
  80. }
  81. #else
  82. return new GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  83. #endif
  84. }
  85. #if !NO_DEEP_CANCELLATION
  86. public static IAsyncEnumerable<TResult> GroupJoinAwaitWithCancellation<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector) =>
  87. GroupJoinAwaitWithCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  88. public static IAsyncEnumerable<TResult> GroupJoinAwaitWithCancellation<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  89. {
  90. if (outer == null)
  91. throw Error.ArgumentNull(nameof(outer));
  92. if (inner == null)
  93. throw Error.ArgumentNull(nameof(inner));
  94. if (outerKeySelector == null)
  95. throw Error.ArgumentNull(nameof(outerKeySelector));
  96. if (innerKeySelector == null)
  97. throw Error.ArgumentNull(nameof(innerKeySelector));
  98. if (resultSelector == null)
  99. throw Error.ArgumentNull(nameof(resultSelector));
  100. #if USE_ASYNC_ITERATOR
  101. return Create(Core);
  102. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  103. {
  104. await using (var e = outer.GetConfiguredAsyncEnumerator(cancellationToken, false))
  105. {
  106. if (await e.MoveNextAsync())
  107. {
  108. var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
  109. do
  110. {
  111. var item = e.Current;
  112. var outerKey = await outerKeySelector(item, cancellationToken).ConfigureAwait(false);
  113. yield return await resultSelector(item, lookup[outerKey].ToAsyncEnumerable(), cancellationToken).ConfigureAwait(false);
  114. }
  115. while (await e.MoveNextAsync());
  116. }
  117. }
  118. }
  119. #else
  120. return new GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  121. #endif
  122. }
  123. #endif
  124. #if !USE_ASYNC_ITERATOR
  125. private sealed class GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  126. {
  127. private readonly IEqualityComparer<TKey> _comparer;
  128. private readonly IAsyncEnumerable<TInner> _inner;
  129. private readonly Func<TInner, TKey> _innerKeySelector;
  130. private readonly IAsyncEnumerable<TOuter> _outer;
  131. private readonly Func<TOuter, TKey> _outerKeySelector;
  132. private readonly Func<TOuter, IAsyncEnumerable<TInner>, TResult> _resultSelector;
  133. public GroupJoinAsyncEnumerable(
  134. IAsyncEnumerable<TOuter> outer,
  135. IAsyncEnumerable<TInner> inner,
  136. Func<TOuter, TKey> outerKeySelector,
  137. Func<TInner, TKey> innerKeySelector,
  138. Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector,
  139. IEqualityComparer<TKey> comparer)
  140. {
  141. _outer = outer;
  142. _inner = inner;
  143. _outerKeySelector = outerKeySelector;
  144. _innerKeySelector = innerKeySelector;
  145. _resultSelector = resultSelector;
  146. _comparer = comparer;
  147. }
  148. public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
  149. {
  150. cancellationToken.ThrowIfCancellationRequested(); // NB: [LDM-2018-11-28] Equivalent to async iterator behavior.
  151. return new GroupJoinAsyncEnumerator(
  152. _outer.GetAsyncEnumerator(cancellationToken),
  153. _inner,
  154. _outerKeySelector,
  155. _innerKeySelector,
  156. _resultSelector,
  157. _comparer,
  158. cancellationToken);
  159. }
  160. private sealed class GroupJoinAsyncEnumerator : IAsyncEnumerator<TResult>
  161. {
  162. private readonly IEqualityComparer<TKey> _comparer;
  163. private readonly IAsyncEnumerable<TInner> _inner;
  164. private readonly Func<TInner, TKey> _innerKeySelector;
  165. private readonly IAsyncEnumerator<TOuter> _outer;
  166. private readonly Func<TOuter, TKey> _outerKeySelector;
  167. private readonly Func<TOuter, IAsyncEnumerable<TInner>, TResult> _resultSelector;
  168. private readonly CancellationToken _cancellationToken;
  169. private Internal.Lookup<TKey, TInner> _lookup;
  170. public GroupJoinAsyncEnumerator(
  171. IAsyncEnumerator<TOuter> outer,
  172. IAsyncEnumerable<TInner> inner,
  173. Func<TOuter, TKey> outerKeySelector,
  174. Func<TInner, TKey> innerKeySelector,
  175. Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector,
  176. IEqualityComparer<TKey> comparer,
  177. CancellationToken cancellationToken)
  178. {
  179. _outer = outer;
  180. _inner = inner;
  181. _outerKeySelector = outerKeySelector;
  182. _innerKeySelector = innerKeySelector;
  183. _resultSelector = resultSelector;
  184. _comparer = comparer;
  185. _cancellationToken = cancellationToken;
  186. }
  187. public async ValueTask<bool> MoveNextAsync()
  188. {
  189. // nothing to do
  190. if (!await _outer.MoveNextAsync().ConfigureAwait(false))
  191. {
  192. return false;
  193. }
  194. if (_lookup == null)
  195. {
  196. _lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  197. }
  198. var item = _outer.Current;
  199. var outerKey = _outerKeySelector(item);
  200. var inner = _lookup[outerKey].ToAsyncEnumerable();
  201. Current = _resultSelector(item, inner);
  202. return true;
  203. }
  204. public TResult Current { get; private set; }
  205. public ValueTask DisposeAsync() => _outer.DisposeAsync();
  206. }
  207. }
  208. private sealed class GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  209. {
  210. private readonly IEqualityComparer<TKey> _comparer;
  211. private readonly IAsyncEnumerable<TInner> _inner;
  212. private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
  213. private readonly IAsyncEnumerable<TOuter> _outer;
  214. private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
  215. private readonly Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> _resultSelector;
  216. public GroupJoinAsyncEnumerableWithTask(
  217. IAsyncEnumerable<TOuter> outer,
  218. IAsyncEnumerable<TInner> inner,
  219. Func<TOuter, ValueTask<TKey>> outerKeySelector,
  220. Func<TInner, ValueTask<TKey>> innerKeySelector,
  221. Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector,
  222. IEqualityComparer<TKey> comparer)
  223. {
  224. _outer = outer;
  225. _inner = inner;
  226. _outerKeySelector = outerKeySelector;
  227. _innerKeySelector = innerKeySelector;
  228. _resultSelector = resultSelector;
  229. _comparer = comparer;
  230. }
  231. public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
  232. {
  233. cancellationToken.ThrowIfCancellationRequested(); // NB: [LDM-2018-11-28] Equivalent to async iterator behavior.
  234. return new GroupJoinAsyncEnumeratorWithTask(
  235. _outer.GetAsyncEnumerator(cancellationToken),
  236. _inner,
  237. _outerKeySelector,
  238. _innerKeySelector,
  239. _resultSelector,
  240. _comparer,
  241. cancellationToken);
  242. }
  243. private sealed class GroupJoinAsyncEnumeratorWithTask : IAsyncEnumerator<TResult>
  244. {
  245. private readonly IEqualityComparer<TKey> _comparer;
  246. private readonly IAsyncEnumerable<TInner> _inner;
  247. private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
  248. private readonly IAsyncEnumerator<TOuter> _outer;
  249. private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
  250. private readonly Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> _resultSelector;
  251. private readonly CancellationToken _cancellationToken;
  252. private Internal.LookupWithTask<TKey, TInner> _lookup;
  253. public GroupJoinAsyncEnumeratorWithTask(
  254. IAsyncEnumerator<TOuter> outer,
  255. IAsyncEnumerable<TInner> inner,
  256. Func<TOuter, ValueTask<TKey>> outerKeySelector,
  257. Func<TInner, ValueTask<TKey>> innerKeySelector,
  258. Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector,
  259. IEqualityComparer<TKey> comparer,
  260. CancellationToken cancellationToken)
  261. {
  262. _outer = outer;
  263. _inner = inner;
  264. _outerKeySelector = outerKeySelector;
  265. _innerKeySelector = innerKeySelector;
  266. _resultSelector = resultSelector;
  267. _comparer = comparer;
  268. _cancellationToken = cancellationToken;
  269. }
  270. public async ValueTask<bool> MoveNextAsync()
  271. {
  272. // nothing to do
  273. if (!await _outer.MoveNextAsync().ConfigureAwait(false))
  274. {
  275. return false;
  276. }
  277. if (_lookup == null)
  278. {
  279. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  280. }
  281. var item = _outer.Current;
  282. var outerKey = await _outerKeySelector(item).ConfigureAwait(false);
  283. var inner = _lookup[outerKey].ToAsyncEnumerable();
  284. Current = await _resultSelector(item, inner).ConfigureAwait(false);
  285. return true;
  286. }
  287. public TResult Current { get; private set; }
  288. public ValueTask DisposeAsync() => _outer.DisposeAsync();
  289. }
  290. }
  291. #if !NO_DEEP_CANCELLATION
  292. private sealed class GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  293. {
  294. private readonly IEqualityComparer<TKey> _comparer;
  295. private readonly IAsyncEnumerable<TInner> _inner;
  296. private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
  297. private readonly IAsyncEnumerable<TOuter> _outer;
  298. private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
  299. private readonly Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> _resultSelector;
  300. public GroupJoinAsyncEnumerableWithTaskAndCancellation(
  301. IAsyncEnumerable<TOuter> outer,
  302. IAsyncEnumerable<TInner> inner,
  303. Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector,
  304. Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector,
  305. Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector,
  306. IEqualityComparer<TKey> comparer)
  307. {
  308. _outer = outer;
  309. _inner = inner;
  310. _outerKeySelector = outerKeySelector;
  311. _innerKeySelector = innerKeySelector;
  312. _resultSelector = resultSelector;
  313. _comparer = comparer;
  314. }
  315. public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
  316. {
  317. cancellationToken.ThrowIfCancellationRequested(); // NB: [LDM-2018-11-28] Equivalent to async iterator behavior.
  318. return new GroupJoinAsyncEnumeratorWithTask(
  319. _outer.GetAsyncEnumerator(cancellationToken),
  320. _inner,
  321. _outerKeySelector,
  322. _innerKeySelector,
  323. _resultSelector,
  324. _comparer,
  325. cancellationToken);
  326. }
  327. private sealed class GroupJoinAsyncEnumeratorWithTask : IAsyncEnumerator<TResult>
  328. {
  329. private readonly IEqualityComparer<TKey> _comparer;
  330. private readonly IAsyncEnumerable<TInner> _inner;
  331. private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
  332. private readonly IAsyncEnumerator<TOuter> _outer;
  333. private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
  334. private readonly Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> _resultSelector;
  335. private readonly CancellationToken _cancellationToken;
  336. private Internal.LookupWithTask<TKey, TInner> _lookup;
  337. public GroupJoinAsyncEnumeratorWithTask(
  338. IAsyncEnumerator<TOuter> outer,
  339. IAsyncEnumerable<TInner> inner,
  340. Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector,
  341. Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector,
  342. Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector,
  343. IEqualityComparer<TKey> comparer,
  344. CancellationToken cancellationToken)
  345. {
  346. _outer = outer;
  347. _inner = inner;
  348. _outerKeySelector = outerKeySelector;
  349. _innerKeySelector = innerKeySelector;
  350. _resultSelector = resultSelector;
  351. _comparer = comparer;
  352. _cancellationToken = cancellationToken;
  353. }
  354. public async ValueTask<bool> MoveNextAsync()
  355. {
  356. // nothing to do
  357. if (!await _outer.MoveNextAsync().ConfigureAwait(false))
  358. {
  359. return false;
  360. }
  361. if (_lookup == null)
  362. {
  363. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  364. }
  365. var item = _outer.Current;
  366. var outerKey = await _outerKeySelector(item, _cancellationToken).ConfigureAwait(false);
  367. var inner = _lookup[outerKey].ToAsyncEnumerable();
  368. Current = await _resultSelector(item, inner, _cancellationToken).ConfigureAwait(false);
  369. return true;
  370. }
  371. public TResult Current { get; private set; }
  372. public ValueTask DisposeAsync() => _outer.DisposeAsync();
  373. }
  374. }
  375. #endif
  376. #endif
  377. }
  378. }