Join.cs 26 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector) =>
  13. Join(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  14. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  15. {
  16. if (outer == null)
  17. throw Error.ArgumentNull(nameof(outer));
  18. if (inner == null)
  19. throw Error.ArgumentNull(nameof(inner));
  20. if (outerKeySelector == null)
  21. throw Error.ArgumentNull(nameof(outerKeySelector));
  22. if (innerKeySelector == null)
  23. throw Error.ArgumentNull(nameof(innerKeySelector));
  24. if (resultSelector == null)
  25. throw Error.ArgumentNull(nameof(resultSelector));
  26. #if USE_ASYNC_ITERATOR
  27. return Create(Core);
  28. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  29. {
  30. var e = outer.GetConfiguredAsyncEnumerator(cancellationToken, false);
  31. try // REVIEW: Can use `await using` if we get pattern bind (HAS_AWAIT_USING_PATTERN_BIND)
  32. {
  33. if (await e.MoveNextAsync())
  34. {
  35. var lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
  36. if (lookup.Count != 0)
  37. {
  38. do
  39. {
  40. var item = e.Current;
  41. var outerKey = outerKeySelector(item);
  42. var g = lookup.GetGrouping(outerKey, create: false);
  43. if (g != null)
  44. {
  45. var count = g._count;
  46. var elements = g._elements;
  47. for (var i = 0; i != count; ++i)
  48. {
  49. yield return resultSelector(item, elements[i]);
  50. }
  51. }
  52. }
  53. while (await e.MoveNextAsync());
  54. }
  55. }
  56. }
  57. finally
  58. {
  59. await e.DisposeAsync();
  60. }
  61. }
  62. #else
  63. return new JoinAsyncIterator<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  64. #endif
  65. }
  66. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector) =>
  67. Join<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  68. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  69. {
  70. if (outer == null)
  71. throw Error.ArgumentNull(nameof(outer));
  72. if (inner == null)
  73. throw Error.ArgumentNull(nameof(inner));
  74. if (outerKeySelector == null)
  75. throw Error.ArgumentNull(nameof(outerKeySelector));
  76. if (innerKeySelector == null)
  77. throw Error.ArgumentNull(nameof(innerKeySelector));
  78. if (resultSelector == null)
  79. throw Error.ArgumentNull(nameof(resultSelector));
  80. #if USE_ASYNC_ITERATOR
  81. return Create(Core);
  82. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  83. {
  84. var e = outer.GetConfiguredAsyncEnumerator(cancellationToken, false);
  85. try // REVIEW: Can use `await using` if we get pattern bind (HAS_AWAIT_USING_PATTERN_BIND)
  86. {
  87. if (await e.MoveNextAsync())
  88. {
  89. var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
  90. if (lookup.Count != 0)
  91. {
  92. do
  93. {
  94. var item = e.Current;
  95. var outerKey = await outerKeySelector(item).ConfigureAwait(false);
  96. var g = lookup.GetGrouping(outerKey, create: false);
  97. if (g != null)
  98. {
  99. var count = g._count;
  100. var elements = g._elements;
  101. for (var i = 0; i != count; ++i)
  102. {
  103. yield return await resultSelector(item, elements[i]).ConfigureAwait(false);
  104. }
  105. }
  106. }
  107. while (await e.MoveNextAsync());
  108. }
  109. }
  110. }
  111. finally
  112. {
  113. await e.DisposeAsync();
  114. }
  115. }
  116. #else
  117. return new JoinAsyncIteratorWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  118. #endif
  119. }
  120. #if !NO_DEEP_CANCELLATION
  121. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> resultSelector) =>
  122. Join<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  123. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  124. {
  125. if (outer == null)
  126. throw Error.ArgumentNull(nameof(outer));
  127. if (inner == null)
  128. throw Error.ArgumentNull(nameof(inner));
  129. if (outerKeySelector == null)
  130. throw Error.ArgumentNull(nameof(outerKeySelector));
  131. if (innerKeySelector == null)
  132. throw Error.ArgumentNull(nameof(innerKeySelector));
  133. if (resultSelector == null)
  134. throw Error.ArgumentNull(nameof(resultSelector));
  135. #if USE_ASYNC_ITERATOR
  136. return Create(Core);
  137. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  138. {
  139. var e = outer.GetConfiguredAsyncEnumerator(cancellationToken, false);
  140. try // REVIEW: Can use `await using` if we get pattern bind (HAS_AWAIT_USING_PATTERN_BIND)
  141. {
  142. if (await e.MoveNextAsync())
  143. {
  144. var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
  145. if (lookup.Count != 0)
  146. {
  147. do
  148. {
  149. var item = e.Current;
  150. var outerKey = await outerKeySelector(item, cancellationToken).ConfigureAwait(false);
  151. var g = lookup.GetGrouping(outerKey, create: false);
  152. if (g != null)
  153. {
  154. var count = g._count;
  155. var elements = g._elements;
  156. for (var i = 0; i != count; ++i)
  157. {
  158. yield return await resultSelector(item, elements[i], cancellationToken).ConfigureAwait(false);
  159. }
  160. }
  161. }
  162. while (await e.MoveNextAsync());
  163. }
  164. }
  165. }
  166. finally
  167. {
  168. await e.DisposeAsync();
  169. }
  170. }
  171. #else
  172. return new JoinAsyncIteratorWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  173. #endif
  174. }
  175. #endif
  176. #if !USE_ASYNC_ITERATOR
  177. private sealed class JoinAsyncIterator<TOuter, TInner, TKey, TResult> : AsyncIterator<TResult>
  178. {
  179. private readonly IAsyncEnumerable<TOuter> _outer;
  180. private readonly IAsyncEnumerable<TInner> _inner;
  181. private readonly Func<TOuter, TKey> _outerKeySelector;
  182. private readonly Func<TInner, TKey> _innerKeySelector;
  183. private readonly Func<TOuter, TInner, TResult> _resultSelector;
  184. private readonly IEqualityComparer<TKey> _comparer;
  185. private IAsyncEnumerator<TOuter> _outerEnumerator;
  186. public JoinAsyncIterator(IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  187. {
  188. Debug.Assert(outer != null);
  189. Debug.Assert(inner != null);
  190. Debug.Assert(outerKeySelector != null);
  191. Debug.Assert(innerKeySelector != null);
  192. Debug.Assert(resultSelector != null);
  193. _outer = outer;
  194. _inner = inner;
  195. _outerKeySelector = outerKeySelector;
  196. _innerKeySelector = innerKeySelector;
  197. _resultSelector = resultSelector;
  198. _comparer = comparer;
  199. }
  200. public override AsyncIteratorBase<TResult> Clone()
  201. {
  202. return new JoinAsyncIterator<TOuter, TInner, TKey, TResult>(_outer, _inner, _outerKeySelector, _innerKeySelector, _resultSelector, _comparer);
  203. }
  204. public override async ValueTask DisposeAsync()
  205. {
  206. if (_outerEnumerator != null)
  207. {
  208. await _outerEnumerator.DisposeAsync().ConfigureAwait(false);
  209. _outerEnumerator = null;
  210. }
  211. await base.DisposeAsync().ConfigureAwait(false);
  212. }
  213. // State machine vars
  214. private Internal.Lookup<TKey, TInner> _lookup;
  215. private int _count;
  216. private TInner[] _elements;
  217. private int _index;
  218. private TOuter _item;
  219. private int _mode;
  220. private const int State_If = 1;
  221. private const int State_DoLoop = 2;
  222. private const int State_For = 3;
  223. private const int State_While = 4;
  224. protected override async ValueTask<bool> MoveNextCore()
  225. {
  226. switch (_state)
  227. {
  228. case AsyncIteratorState.Allocated:
  229. _outerEnumerator = _outer.GetAsyncEnumerator(_cancellationToken);
  230. _mode = State_If;
  231. _state = AsyncIteratorState.Iterating;
  232. goto case AsyncIteratorState.Iterating;
  233. case AsyncIteratorState.Iterating:
  234. switch (_mode)
  235. {
  236. case State_If:
  237. if (await _outerEnumerator.MoveNextAsync().ConfigureAwait(false))
  238. {
  239. _lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  240. if (_lookup.Count != 0)
  241. {
  242. _mode = State_DoLoop;
  243. goto case State_DoLoop;
  244. }
  245. }
  246. break;
  247. case State_DoLoop:
  248. _item = _outerEnumerator.Current;
  249. var g = _lookup.GetGrouping(_outerKeySelector(_item), create: false);
  250. if (g != null)
  251. {
  252. _count = g._count;
  253. _elements = g._elements;
  254. _index = 0;
  255. _mode = State_For;
  256. goto case State_For;
  257. }
  258. // advance to while
  259. _mode = State_While;
  260. goto case State_While;
  261. case State_For:
  262. _current = _resultSelector(_item, _elements[_index]);
  263. _index++;
  264. if (_index == _count)
  265. {
  266. _mode = State_While;
  267. }
  268. return true;
  269. case State_While:
  270. var hasNext = await _outerEnumerator.MoveNextAsync().ConfigureAwait(false);
  271. if (hasNext)
  272. {
  273. goto case State_DoLoop;
  274. }
  275. break;
  276. }
  277. await DisposeAsync().ConfigureAwait(false);
  278. break;
  279. }
  280. return false;
  281. }
  282. }
  283. private sealed class JoinAsyncIteratorWithTask<TOuter, TInner, TKey, TResult> : AsyncIterator<TResult>
  284. {
  285. private readonly IAsyncEnumerable<TOuter> _outer;
  286. private readonly IAsyncEnumerable<TInner> _inner;
  287. private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
  288. private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
  289. private readonly Func<TOuter, TInner, ValueTask<TResult>> _resultSelector;
  290. private readonly IEqualityComparer<TKey> _comparer;
  291. private IAsyncEnumerator<TOuter> _outerEnumerator;
  292. public JoinAsyncIteratorWithTask(IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  293. {
  294. Debug.Assert(outer != null);
  295. Debug.Assert(inner != null);
  296. Debug.Assert(outerKeySelector != null);
  297. Debug.Assert(innerKeySelector != null);
  298. Debug.Assert(resultSelector != null);
  299. _outer = outer;
  300. _inner = inner;
  301. _outerKeySelector = outerKeySelector;
  302. _innerKeySelector = innerKeySelector;
  303. _resultSelector = resultSelector;
  304. _comparer = comparer;
  305. }
  306. public override AsyncIteratorBase<TResult> Clone()
  307. {
  308. return new JoinAsyncIteratorWithTask<TOuter, TInner, TKey, TResult>(_outer, _inner, _outerKeySelector, _innerKeySelector, _resultSelector, _comparer);
  309. }
  310. public override async ValueTask DisposeAsync()
  311. {
  312. if (_outerEnumerator != null)
  313. {
  314. await _outerEnumerator.DisposeAsync().ConfigureAwait(false);
  315. _outerEnumerator = null;
  316. }
  317. await base.DisposeAsync().ConfigureAwait(false);
  318. }
  319. // State machine vars
  320. private Internal.LookupWithTask<TKey, TInner> _lookup;
  321. private int _count;
  322. private TInner[] _elements;
  323. private int _index;
  324. private TOuter _item;
  325. private int _mode;
  326. private const int State_If = 1;
  327. private const int State_DoLoop = 2;
  328. private const int State_For = 3;
  329. private const int State_While = 4;
  330. protected override async ValueTask<bool> MoveNextCore()
  331. {
  332. switch (_state)
  333. {
  334. case AsyncIteratorState.Allocated:
  335. _outerEnumerator = _outer.GetAsyncEnumerator(_cancellationToken);
  336. _mode = State_If;
  337. _state = AsyncIteratorState.Iterating;
  338. goto case AsyncIteratorState.Iterating;
  339. case AsyncIteratorState.Iterating:
  340. switch (_mode)
  341. {
  342. case State_If:
  343. if (await _outerEnumerator.MoveNextAsync().ConfigureAwait(false))
  344. {
  345. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  346. if (_lookup.Count != 0)
  347. {
  348. _mode = State_DoLoop;
  349. goto case State_DoLoop;
  350. }
  351. }
  352. break;
  353. case State_DoLoop:
  354. _item = _outerEnumerator.Current;
  355. var g = _lookup.GetGrouping(await _outerKeySelector(_item).ConfigureAwait(false), create: false);
  356. if (g != null)
  357. {
  358. _count = g._count;
  359. _elements = g._elements;
  360. _index = 0;
  361. _mode = State_For;
  362. goto case State_For;
  363. }
  364. // advance to while
  365. _mode = State_While;
  366. goto case State_While;
  367. case State_For:
  368. _current = await _resultSelector(_item, _elements[_index]).ConfigureAwait(false);
  369. _index++;
  370. if (_index == _count)
  371. {
  372. _mode = State_While;
  373. }
  374. return true;
  375. case State_While:
  376. var hasNext = await _outerEnumerator.MoveNextAsync().ConfigureAwait(false);
  377. if (hasNext)
  378. {
  379. goto case State_DoLoop;
  380. }
  381. break;
  382. }
  383. await DisposeAsync().ConfigureAwait(false);
  384. break;
  385. }
  386. return false;
  387. }
  388. }
  389. #if !NO_DEEP_CANCELLATION
  390. private sealed class JoinAsyncIteratorWithTaskAndCancellation<TOuter, TInner, TKey, TResult> : AsyncIterator<TResult>
  391. {
  392. private readonly IAsyncEnumerable<TOuter> _outer;
  393. private readonly IAsyncEnumerable<TInner> _inner;
  394. private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
  395. private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
  396. private readonly Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> _resultSelector;
  397. private readonly IEqualityComparer<TKey> _comparer;
  398. private IAsyncEnumerator<TOuter> _outerEnumerator;
  399. public JoinAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  400. {
  401. Debug.Assert(outer != null);
  402. Debug.Assert(inner != null);
  403. Debug.Assert(outerKeySelector != null);
  404. Debug.Assert(innerKeySelector != null);
  405. Debug.Assert(resultSelector != null);
  406. _outer = outer;
  407. _inner = inner;
  408. _outerKeySelector = outerKeySelector;
  409. _innerKeySelector = innerKeySelector;
  410. _resultSelector = resultSelector;
  411. _comparer = comparer;
  412. }
  413. public override AsyncIteratorBase<TResult> Clone()
  414. {
  415. return new JoinAsyncIteratorWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(_outer, _inner, _outerKeySelector, _innerKeySelector, _resultSelector, _comparer);
  416. }
  417. public override async ValueTask DisposeAsync()
  418. {
  419. if (_outerEnumerator != null)
  420. {
  421. await _outerEnumerator.DisposeAsync().ConfigureAwait(false);
  422. _outerEnumerator = null;
  423. }
  424. await base.DisposeAsync().ConfigureAwait(false);
  425. }
  426. // State machine vars
  427. private Internal.LookupWithTask<TKey, TInner> _lookup;
  428. private int _count;
  429. private TInner[] _elements;
  430. private int _index;
  431. private TOuter _item;
  432. private int _mode;
  433. private const int State_If = 1;
  434. private const int State_DoLoop = 2;
  435. private const int State_For = 3;
  436. private const int State_While = 4;
  437. protected override async ValueTask<bool> MoveNextCore()
  438. {
  439. switch (_state)
  440. {
  441. case AsyncIteratorState.Allocated:
  442. _outerEnumerator = _outer.GetAsyncEnumerator(_cancellationToken);
  443. _mode = State_If;
  444. _state = AsyncIteratorState.Iterating;
  445. goto case AsyncIteratorState.Iterating;
  446. case AsyncIteratorState.Iterating:
  447. switch (_mode)
  448. {
  449. case State_If:
  450. if (await _outerEnumerator.MoveNextAsync().ConfigureAwait(false))
  451. {
  452. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  453. if (_lookup.Count != 0)
  454. {
  455. _mode = State_DoLoop;
  456. goto case State_DoLoop;
  457. }
  458. }
  459. break;
  460. case State_DoLoop:
  461. _item = _outerEnumerator.Current;
  462. var g = _lookup.GetGrouping(await _outerKeySelector(_item, _cancellationToken).ConfigureAwait(false), create: false);
  463. if (g != null)
  464. {
  465. _count = g._count;
  466. _elements = g._elements;
  467. _index = 0;
  468. _mode = State_For;
  469. goto case State_For;
  470. }
  471. // advance to while
  472. _mode = State_While;
  473. goto case State_While;
  474. case State_For:
  475. _current = await _resultSelector(_item, _elements[_index], _cancellationToken).ConfigureAwait(false);
  476. _index++;
  477. if (_index == _count)
  478. {
  479. _mode = State_While;
  480. }
  481. return true;
  482. case State_While:
  483. var hasNext = await _outerEnumerator.MoveNextAsync().ConfigureAwait(false);
  484. if (hasNext)
  485. {
  486. goto case State_DoLoop;
  487. }
  488. break;
  489. }
  490. await DisposeAsync().ConfigureAwait(false);
  491. break;
  492. }
  493. return false;
  494. }
  495. }
  496. #endif
  497. #endif
  498. }
  499. }