GroupJoin.cs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  /// <summary>
  /// Group-joins <paramref name="outer"/> with <paramref name="inner"/> using synchronous key and result selectors.
  /// </summary>
  /// <param name="outer">The outer async-enumerable sequence.</param>
  /// <param name="inner">The inner async-enumerable sequence.</param>
  /// <param name="outerKeySelector">Extracts the join key from an outer element.</param>
  /// <param name="innerKeySelector">Extracts the join key from an inner element.</param>
  /// <param name="resultSelector">Builds a result from an outer element and its group of inner elements.</param>
  /// <returns>The sequence produced by the comparer-accepting overload.</returns>
  /// <remarks>
  /// Pure convenience overload: it forwards every argument unchanged and passes <c>null</c> as the comparer.
  /// Argument validation is performed by the overload it forwards to.
  /// </remarks>
  public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector)
  {
      return GroupJoin(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  }
  /// <summary>
  /// Group-joins <paramref name="outer"/> with <paramref name="inner"/>: for every outer element, the key computed by
  /// <paramref name="outerKeySelector"/> is used to index a lookup built from the inner sequence, and the outer element
  /// plus the inner elements stored under that key are handed to <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="outer">The outer async-enumerable sequence. Must not be null.</param>
  /// <param name="inner">The inner async-enumerable sequence that is indexed into a lookup. Must not be null.</param>
  /// <param name="outerKeySelector">Extracts the join key from an outer element. Must not be null.</param>
  /// <param name="innerKeySelector">Extracts the join key from an inner element. Must not be null.</param>
  /// <param name="resultSelector">Builds a result from an outer element and its group of inner elements. Must not be null.</param>
  /// <param name="comparer">Forwarded as-is to the lookup construction; it is not null-checked here.</param>
  /// <returns>An async-enumerable sequence with one result per outer element.</returns>
  /// <remarks>
  /// The null checks run eagerly when this method is called and throw whatever <c>Error.ArgumentNull</c> creates.
  /// The lookup over <paramref name="inner"/> is only built after the outer sequence has produced its first element.
  /// </remarks>
  public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  {
      if (outer == null)
      {
          throw Error.ArgumentNull(nameof(outer));
      }

      if (inner == null)
      {
          throw Error.ArgumentNull(nameof(inner));
      }

      if (outerKeySelector == null)
      {
          throw Error.ArgumentNull(nameof(outerKeySelector));
      }

      if (innerKeySelector == null)
      {
          throw Error.ArgumentNull(nameof(innerKeySelector));
      }

      if (resultSelector == null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          var outerEnumerator = outer.GetConfiguredAsyncEnumerator(cancellationToken, false);

          try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
          {
              // An empty outer sequence yields nothing and never touches the inner sequence.
              if (!await outerEnumerator.MoveNextAsync())
              {
                  yield break;
              }

              var innerLookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);

              do
              {
                  var outerItem = outerEnumerator.Current;
                  var key = outerKeySelector(outerItem);
                  var group = innerLookup[key].ToAsyncEnumerable();

                  yield return resultSelector(outerItem, group);
              }
              while (await outerEnumerator.MoveNextAsync());
          }
          finally
          {
              await outerEnumerator.DisposeAsync();
          }
      }
#else
      return new GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
#endif
  }
  /// <summary>
  /// Group-joins <paramref name="outer"/> with <paramref name="inner"/> using asynchronous
  /// (<see cref="ValueTask{TResult}"/>-returning) key and result selectors.
  /// </summary>
  /// <param name="outer">The outer async-enumerable sequence.</param>
  /// <param name="inner">The inner async-enumerable sequence.</param>
  /// <param name="outerKeySelector">Asynchronously extracts the join key from an outer element.</param>
  /// <param name="innerKeySelector">Asynchronously extracts the join key from an inner element.</param>
  /// <param name="resultSelector">Asynchronously builds a result from an outer element and its group of inner elements.</param>
  /// <returns>The sequence produced by the comparer-accepting overload.</returns>
  /// <remarks>
  /// Pure convenience overload: it forwards every argument unchanged and passes <c>null</c> as the comparer.
  /// Argument validation is performed by the overload it forwards to.
  /// </remarks>
  public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector)
  {
      return GroupJoin<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  }
  /// <summary>
  /// Group-joins <paramref name="outer"/> with <paramref name="inner"/> using asynchronous selectors: for every outer
  /// element, the awaited key from <paramref name="outerKeySelector"/> indexes a lookup built from the inner sequence,
  /// and the outer element plus the inner elements stored under that key are handed to <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="outer">The outer async-enumerable sequence. Must not be null.</param>
  /// <param name="inner">The inner async-enumerable sequence that is indexed into a lookup. Must not be null.</param>
  /// <param name="outerKeySelector">Asynchronously extracts the join key from an outer element. Must not be null.</param>
  /// <param name="innerKeySelector">Asynchronously extracts the join key from an inner element. Must not be null.</param>
  /// <param name="resultSelector">Asynchronously builds a result from an outer element and its group of inner elements. Must not be null.</param>
  /// <param name="comparer">Forwarded as-is to the lookup construction; it is not null-checked here.</param>
  /// <returns>An async-enumerable sequence with one result per outer element.</returns>
  /// <remarks>
  /// The null checks run eagerly when this method is called and throw whatever <c>Error.ArgumentNull</c> creates.
  /// The lookup over <paramref name="inner"/> is only built after the outer sequence has produced its first element.
  /// </remarks>
  public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  {
      if (outer == null)
      {
          throw Error.ArgumentNull(nameof(outer));
      }

      if (inner == null)
      {
          throw Error.ArgumentNull(nameof(inner));
      }

      if (outerKeySelector == null)
      {
          throw Error.ArgumentNull(nameof(outerKeySelector));
      }

      if (innerKeySelector == null)
      {
          throw Error.ArgumentNull(nameof(innerKeySelector));
      }

      if (resultSelector == null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          var outerEnumerator = outer.GetConfiguredAsyncEnumerator(cancellationToken, false);

          try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
          {
              // An empty outer sequence yields nothing and never touches the inner sequence.
              if (!await outerEnumerator.MoveNextAsync())
              {
                  yield break;
              }

              var innerLookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);

              do
              {
                  var outerItem = outerEnumerator.Current;
                  var key = await outerKeySelector(outerItem).ConfigureAwait(false);
                  var group = innerLookup[key].ToAsyncEnumerable();

                  yield return await resultSelector(outerItem, group).ConfigureAwait(false);
              }
              while (await outerEnumerator.MoveNextAsync());
          }
          finally
          {
              await outerEnumerator.DisposeAsync();
          }
      }
#else
      return new GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
#endif
  }
  95. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Group-joins <paramref name="outer"/> with <paramref name="inner"/> using asynchronous key and result selectors
  /// that also receive a <see cref="CancellationToken"/>.
  /// </summary>
  /// <param name="outer">The outer async-enumerable sequence.</param>
  /// <param name="inner">The inner async-enumerable sequence.</param>
  /// <param name="outerKeySelector">Asynchronously extracts the join key from an outer element.</param>
  /// <param name="innerKeySelector">Asynchronously extracts the join key from an inner element.</param>
  /// <param name="resultSelector">Asynchronously builds a result from an outer element and its group of inner elements.</param>
  /// <returns>The sequence produced by the comparer-accepting overload.</returns>
  /// <remarks>
  /// Pure convenience overload: it forwards every argument unchanged and passes <c>null</c> as the comparer.
  /// Argument validation is performed by the overload it forwards to.
  /// </remarks>
  public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector)
  {
      return GroupJoin<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  }
  /// <summary>
  /// Group-joins <paramref name="outer"/> with <paramref name="inner"/> using cancellable asynchronous selectors: for
  /// every outer element, the awaited key from <paramref name="outerKeySelector"/> indexes a lookup built from the inner
  /// sequence, and the outer element plus the inner elements stored under that key are handed to
  /// <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="outer">The outer async-enumerable sequence. Must not be null.</param>
  /// <param name="inner">The inner async-enumerable sequence that is indexed into a lookup. Must not be null.</param>
  /// <param name="outerKeySelector">Asynchronously extracts the join key from an outer element. Must not be null.</param>
  /// <param name="innerKeySelector">Asynchronously extracts the join key from an inner element. Must not be null.</param>
  /// <param name="resultSelector">Asynchronously builds a result from an outer element and its group of inner elements. Must not be null.</param>
  /// <param name="comparer">Forwarded as-is to the lookup construction; it is not null-checked here.</param>
  /// <returns>An async-enumerable sequence with one result per outer element.</returns>
  /// <remarks>
  /// The null checks run eagerly when this method is called and throw whatever <c>Error.ArgumentNull</c> creates.
  /// The enumeration's cancellation token is passed to the lookup construction and to the outer key and result selectors.
  /// The lookup over <paramref name="inner"/> is only built after the outer sequence has produced its first element.
  /// </remarks>
  public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  {
      if (outer == null)
      {
          throw Error.ArgumentNull(nameof(outer));
      }

      if (inner == null)
      {
          throw Error.ArgumentNull(nameof(inner));
      }

      if (outerKeySelector == null)
      {
          throw Error.ArgumentNull(nameof(outerKeySelector));
      }

      if (innerKeySelector == null)
      {
          throw Error.ArgumentNull(nameof(innerKeySelector));
      }

      if (resultSelector == null)
      {
          throw Error.ArgumentNull(nameof(resultSelector));
      }

#if USE_ASYNC_ITERATOR
      return Create(Core);

      async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
      {
          var outerEnumerator = outer.GetConfiguredAsyncEnumerator(cancellationToken, false);

          try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
          {
              // An empty outer sequence yields nothing and never touches the inner sequence.
              if (!await outerEnumerator.MoveNextAsync())
              {
                  yield break;
              }

              var innerLookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);

              do
              {
                  var outerItem = outerEnumerator.Current;
                  var key = await outerKeySelector(outerItem, cancellationToken).ConfigureAwait(false);
                  var group = innerLookup[key].ToAsyncEnumerable();

                  yield return await resultSelector(outerItem, group, cancellationToken).ConfigureAwait(false);
              }
              while (await outerEnumerator.MoveNextAsync());
          }
          finally
          {
              await outerEnumerator.DisposeAsync();
          }
      }
#else
      return new GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
#endif
  }
  138. #endif
  139. #if !USE_ASYNC_ITERATOR
  /// <summary>
  /// Hand-written async-enumerable used by the synchronous-selector <c>GroupJoin</c> overload when async iterators
  /// are not in use. It only stores its arguments; all work happens in the enumerator it hands out.
  /// </summary>
  private sealed class GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  {
      private readonly IAsyncEnumerable<TOuter> _outer;
      private readonly IAsyncEnumerable<TInner> _inner;
      private readonly Func<TOuter, TKey> _outerKeySelector;
      private readonly Func<TInner, TKey> _innerKeySelector;
      private readonly Func<TOuter, IAsyncEnumerable<TInner>, TResult> _resultSelector;
      private readonly IEqualityComparer<TKey> _comparer;

      public GroupJoinAsyncEnumerable(
          IAsyncEnumerable<TOuter> outer,
          IAsyncEnumerable<TInner> inner,
          Func<TOuter, TKey> outerKeySelector,
          Func<TInner, TKey> innerKeySelector,
          Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector,
          IEqualityComparer<TKey> comparer)
      {
          _outer = outer;
          _inner = inner;
          _outerKeySelector = outerKeySelector;
          _innerKeySelector = innerKeySelector;
          _resultSelector = resultSelector;
          _comparer = comparer;
      }

      /// <summary>
      /// Starts enumerating the outer sequence and wraps it in a group-join enumerator.
      /// Throws immediately if <paramref name="cancellationToken"/> is already cancelled.
      /// </summary>
      public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested(); // NB: [LDM-2018-11-28] Equivalent to async iterator behavior.

          var outerEnumerator = _outer.GetAsyncEnumerator(cancellationToken);

          return new GroupJoinAsyncEnumerator(
              outerEnumerator,
              _inner,
              _outerKeySelector,
              _innerKeySelector,
              _resultSelector,
              _comparer,
              cancellationToken);
      }

      /// <summary>
      /// Enumerator that produces one result per outer element, pairing it with the inner elements
      /// the lookup holds under the outer element's key.
      /// </summary>
      private sealed class GroupJoinAsyncEnumerator : IAsyncEnumerator<TResult>
      {
          private readonly IAsyncEnumerator<TOuter> _outer;
          private readonly IAsyncEnumerable<TInner> _inner;
          private readonly Func<TOuter, TKey> _outerKeySelector;
          private readonly Func<TInner, TKey> _innerKeySelector;
          private readonly Func<TOuter, IAsyncEnumerable<TInner>, TResult> _resultSelector;
          private readonly IEqualityComparer<TKey> _comparer;
          private readonly CancellationToken _cancellationToken;

          // Built on demand by the first MoveNextAsync call that sees an outer element; null until then.
          private Internal.Lookup<TKey, TInner> _lookup;

          public GroupJoinAsyncEnumerator(
              IAsyncEnumerator<TOuter> outer,
              IAsyncEnumerable<TInner> inner,
              Func<TOuter, TKey> outerKeySelector,
              Func<TInner, TKey> innerKeySelector,
              Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector,
              IEqualityComparer<TKey> comparer,
              CancellationToken cancellationToken)
          {
              _outer = outer;
              _inner = inner;
              _outerKeySelector = outerKeySelector;
              _innerKeySelector = innerKeySelector;
              _resultSelector = resultSelector;
              _comparer = comparer;
              _cancellationToken = cancellationToken;
          }

          /// <summary>The result computed by the most recent successful <see cref="MoveNextAsync"/> call.</summary>
          public TResult Current { get; private set; }

          /// <summary>
          /// Advances the outer enumerator and, if it produced an element, computes the next result.
          /// </summary>
          /// <returns><c>true</c> when a new <see cref="Current"/> is available; <c>false</c> once the outer sequence is exhausted.</returns>
          public async ValueTask<bool> MoveNextAsync()
          {
              var hasOuterItem = await _outer.MoveNextAsync().ConfigureAwait(false);

              if (hasOuterItem)
              {
                  // The inner sequence is only indexed once there is at least one outer element to join against.
                  if (_lookup == null)
                  {
                      _lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
                  }

                  var outerItem = _outer.Current;
                  var key = _outerKeySelector(outerItem);
                  var group = _lookup[key].ToAsyncEnumerable();

                  Current = _resultSelector(outerItem, group);
                  return true;
              }

              // Outer sequence is exhausted: nothing to do.
              return false;
          }

          /// <summary>Disposes the underlying outer enumerator.</summary>
          public ValueTask DisposeAsync()
          {
              return _outer.DisposeAsync();
          }
      }
  }
  /// <summary>
  /// Hand-written async-enumerable used by the <see cref="ValueTask{TResult}"/>-selector <c>GroupJoin</c> overload when
  /// async iterators are not in use. It only stores its arguments; all work happens in the enumerator it hands out.
  /// </summary>
  private sealed class GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  {
      private readonly IAsyncEnumerable<TOuter> _outer;
      private readonly IAsyncEnumerable<TInner> _inner;
      private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
      private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
      private readonly Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> _resultSelector;
      private readonly IEqualityComparer<TKey> _comparer;

      public GroupJoinAsyncEnumerableWithTask(
          IAsyncEnumerable<TOuter> outer,
          IAsyncEnumerable<TInner> inner,
          Func<TOuter, ValueTask<TKey>> outerKeySelector,
          Func<TInner, ValueTask<TKey>> innerKeySelector,
          Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector,
          IEqualityComparer<TKey> comparer)
      {
          _outer = outer;
          _inner = inner;
          _outerKeySelector = outerKeySelector;
          _innerKeySelector = innerKeySelector;
          _resultSelector = resultSelector;
          _comparer = comparer;
      }

      /// <summary>
      /// Starts enumerating the outer sequence and wraps it in a group-join enumerator.
      /// Throws immediately if <paramref name="cancellationToken"/> is already cancelled.
      /// </summary>
      public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested(); // NB: [LDM-2018-11-28] Equivalent to async iterator behavior.

          var outerEnumerator = _outer.GetAsyncEnumerator(cancellationToken);

          return new GroupJoinAsyncEnumeratorWithTask(
              outerEnumerator,
              _inner,
              _outerKeySelector,
              _innerKeySelector,
              _resultSelector,
              _comparer,
              cancellationToken);
      }

      /// <summary>
      /// Enumerator that produces one result per outer element, awaiting the key and result selectors and pairing
      /// the outer element with the inner elements the lookup holds under its key.
      /// </summary>
      private sealed class GroupJoinAsyncEnumeratorWithTask : IAsyncEnumerator<TResult>
      {
          private readonly IAsyncEnumerator<TOuter> _outer;
          private readonly IAsyncEnumerable<TInner> _inner;
          private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
          private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
          private readonly Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> _resultSelector;
          private readonly IEqualityComparer<TKey> _comparer;
          private readonly CancellationToken _cancellationToken;

          // Built on demand by the first MoveNextAsync call that sees an outer element; null until then.
          private Internal.LookupWithTask<TKey, TInner> _lookup;

          public GroupJoinAsyncEnumeratorWithTask(
              IAsyncEnumerator<TOuter> outer,
              IAsyncEnumerable<TInner> inner,
              Func<TOuter, ValueTask<TKey>> outerKeySelector,
              Func<TInner, ValueTask<TKey>> innerKeySelector,
              Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector,
              IEqualityComparer<TKey> comparer,
              CancellationToken cancellationToken)
          {
              _outer = outer;
              _inner = inner;
              _outerKeySelector = outerKeySelector;
              _innerKeySelector = innerKeySelector;
              _resultSelector = resultSelector;
              _comparer = comparer;
              _cancellationToken = cancellationToken;
          }

          /// <summary>The result computed by the most recent successful <see cref="MoveNextAsync"/> call.</summary>
          public TResult Current { get; private set; }

          /// <summary>
          /// Advances the outer enumerator and, if it produced an element, computes the next result.
          /// </summary>
          /// <returns><c>true</c> when a new <see cref="Current"/> is available; <c>false</c> once the outer sequence is exhausted.</returns>
          public async ValueTask<bool> MoveNextAsync()
          {
              var hasOuterItem = await _outer.MoveNextAsync().ConfigureAwait(false);

              if (hasOuterItem)
              {
                  // The inner sequence is only indexed once there is at least one outer element to join against.
                  if (_lookup == null)
                  {
                      _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
                  }

                  var outerItem = _outer.Current;
                  var key = await _outerKeySelector(outerItem).ConfigureAwait(false);
                  var group = _lookup[key].ToAsyncEnumerable();

                  Current = await _resultSelector(outerItem, group).ConfigureAwait(false);
                  return true;
              }

              // Outer sequence is exhausted: nothing to do.
              return false;
          }

          /// <summary>Disposes the underlying outer enumerator.</summary>
          public ValueTask DisposeAsync()
          {
              return _outer.DisposeAsync();
          }
      }
  }
  306. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Hand-written async-enumerable used by the cancellable-selector <c>GroupJoin</c> overload when async iterators
  /// are not in use. It only stores its arguments; all work happens in the enumerator it hands out.
  /// </summary>
  private sealed class GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  {
      private readonly IAsyncEnumerable<TOuter> _outer;
      private readonly IAsyncEnumerable<TInner> _inner;
      private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
      private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
      private readonly Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> _resultSelector;
      private readonly IEqualityComparer<TKey> _comparer;

      public GroupJoinAsyncEnumerableWithTaskAndCancellation(
          IAsyncEnumerable<TOuter> outer,
          IAsyncEnumerable<TInner> inner,
          Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector,
          Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector,
          Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector,
          IEqualityComparer<TKey> comparer)
      {
          _outer = outer;
          _inner = inner;
          _outerKeySelector = outerKeySelector;
          _innerKeySelector = innerKeySelector;
          _resultSelector = resultSelector;
          _comparer = comparer;
      }

      /// <summary>
      /// Starts enumerating the outer sequence and wraps it in a group-join enumerator.
      /// Throws immediately if <paramref name="cancellationToken"/> is already cancelled.
      /// </summary>
      public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested(); // NB: [LDM-2018-11-28] Equivalent to async iterator behavior.

          var outerEnumerator = _outer.GetAsyncEnumerator(cancellationToken);

          return new GroupJoinAsyncEnumeratorWithTaskAndCancellation(
              outerEnumerator,
              _inner,
              _outerKeySelector,
              _innerKeySelector,
              _resultSelector,
              _comparer,
              cancellationToken);
      }

      /// <summary>
      /// Enumerator that produces one result per outer element, awaiting the key and result selectors (each of which
      /// receives the enumeration's cancellation token) and pairing the outer element with the inner elements the
      /// lookup holds under its key.
      /// </summary>
      private sealed class GroupJoinAsyncEnumeratorWithTaskAndCancellation : IAsyncEnumerator<TResult>
      {
          private readonly IAsyncEnumerator<TOuter> _outer;
          private readonly IAsyncEnumerable<TInner> _inner;
          private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
          private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
          private readonly Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> _resultSelector;
          private readonly IEqualityComparer<TKey> _comparer;
          private readonly CancellationToken _cancellationToken;

          // Built on demand by the first MoveNextAsync call that sees an outer element; null until then.
          private Internal.LookupWithTask<TKey, TInner> _lookup;

          public GroupJoinAsyncEnumeratorWithTaskAndCancellation(
              IAsyncEnumerator<TOuter> outer,
              IAsyncEnumerable<TInner> inner,
              Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector,
              Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector,
              Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector,
              IEqualityComparer<TKey> comparer,
              CancellationToken cancellationToken)
          {
              _outer = outer;
              _inner = inner;
              _outerKeySelector = outerKeySelector;
              _innerKeySelector = innerKeySelector;
              _resultSelector = resultSelector;
              _comparer = comparer;
              _cancellationToken = cancellationToken;
          }

          /// <summary>The result computed by the most recent successful <see cref="MoveNextAsync"/> call.</summary>
          public TResult Current { get; private set; }

          /// <summary>
          /// Advances the outer enumerator and, if it produced an element, computes the next result.
          /// </summary>
          /// <returns><c>true</c> when a new <see cref="Current"/> is available; <c>false</c> once the outer sequence is exhausted.</returns>
          public async ValueTask<bool> MoveNextAsync()
          {
              var hasOuterItem = await _outer.MoveNextAsync().ConfigureAwait(false);

              if (hasOuterItem)
              {
                  // The inner sequence is only indexed once there is at least one outer element to join against.
                  if (_lookup == null)
                  {
                      _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
                  }

                  var outerItem = _outer.Current;
                  var key = await _outerKeySelector(outerItem, _cancellationToken).ConfigureAwait(false);
                  var group = _lookup[key].ToAsyncEnumerable();

                  Current = await _resultSelector(outerItem, group, _cancellationToken).ConfigureAwait(false);
                  return true;
              }

              // Outer sequence is exhausted: nothing to do.
              return false;
          }

          /// <summary>Disposes the underlying outer enumerator.</summary>
          public ValueTask DisposeAsync()
          {
              return _outer.DisposeAsync();
          }
      }
  }
  390. #endif
  391. #endif
  392. }
  393. }