Join.cs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector) =>
  13. Join(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  14. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  15. {
  16. if (outer == null)
  17. throw Error.ArgumentNull(nameof(outer));
  18. if (inner == null)
  19. throw Error.ArgumentNull(nameof(inner));
  20. if (outerKeySelector == null)
  21. throw Error.ArgumentNull(nameof(outerKeySelector));
  22. if (innerKeySelector == null)
  23. throw Error.ArgumentNull(nameof(innerKeySelector));
  24. if (resultSelector == null)
  25. throw Error.ArgumentNull(nameof(resultSelector));
  26. return new JoinAsyncIterator<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  27. }
  28. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector) =>
  29. Join<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  30. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  31. {
  32. if (outer == null)
  33. throw Error.ArgumentNull(nameof(outer));
  34. if (inner == null)
  35. throw Error.ArgumentNull(nameof(inner));
  36. if (outerKeySelector == null)
  37. throw Error.ArgumentNull(nameof(outerKeySelector));
  38. if (innerKeySelector == null)
  39. throw Error.ArgumentNull(nameof(innerKeySelector));
  40. if (resultSelector == null)
  41. throw Error.ArgumentNull(nameof(resultSelector));
  42. return new JoinAsyncIteratorWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  43. }
  44. #if !NO_DEEP_CANCELLATION
  45. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> resultSelector) =>
  46. Join<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  47. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  48. {
  49. if (outer == null)
  50. throw Error.ArgumentNull(nameof(outer));
  51. if (inner == null)
  52. throw Error.ArgumentNull(nameof(inner));
  53. if (outerKeySelector == null)
  54. throw Error.ArgumentNull(nameof(outerKeySelector));
  55. if (innerKeySelector == null)
  56. throw Error.ArgumentNull(nameof(innerKeySelector));
  57. if (resultSelector == null)
  58. throw Error.ArgumentNull(nameof(resultSelector));
  59. return new JoinAsyncIteratorWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  60. }
  61. #endif
  62. private sealed class JoinAsyncIterator<TOuter, TInner, TKey, TResult> : AsyncIterator<TResult>
  63. {
  64. private readonly IAsyncEnumerable<TOuter> _outer;
  65. private readonly IAsyncEnumerable<TInner> _inner;
  66. private readonly Func<TOuter, TKey> _outerKeySelector;
  67. private readonly Func<TInner, TKey> _innerKeySelector;
  68. private readonly Func<TOuter, TInner, TResult> _resultSelector;
  69. private readonly IEqualityComparer<TKey> _comparer;
  70. private IAsyncEnumerator<TOuter> _outerEnumerator;
  71. public JoinAsyncIterator(IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  72. {
  73. Debug.Assert(outer != null);
  74. Debug.Assert(inner != null);
  75. Debug.Assert(outerKeySelector != null);
  76. Debug.Assert(innerKeySelector != null);
  77. Debug.Assert(resultSelector != null);
  78. _outer = outer;
  79. _inner = inner;
  80. _outerKeySelector = outerKeySelector;
  81. _innerKeySelector = innerKeySelector;
  82. _resultSelector = resultSelector;
  83. _comparer = comparer;
  84. }
  85. public override AsyncIteratorBase<TResult> Clone()
  86. {
  87. return new JoinAsyncIterator<TOuter, TInner, TKey, TResult>(_outer, _inner, _outerKeySelector, _innerKeySelector, _resultSelector, _comparer);
  88. }
  89. public override async ValueTask DisposeAsync()
  90. {
  91. if (_outerEnumerator != null)
  92. {
  93. await _outerEnumerator.DisposeAsync().ConfigureAwait(false);
  94. _outerEnumerator = null;
  95. }
  96. await base.DisposeAsync().ConfigureAwait(false);
  97. }
  98. // State machine vars
  99. private Internal.Lookup<TKey, TInner> _lookup;
  100. private int _count;
  101. private TInner[] _elements;
  102. private int _index;
  103. private TOuter _item;
  104. private int _mode;
  105. private const int State_If = 1;
  106. private const int State_DoLoop = 2;
  107. private const int State_For = 3;
  108. private const int State_While = 4;
  109. protected override async ValueTask<bool> MoveNextCore()
  110. {
  111. switch (_state)
  112. {
  113. case AsyncIteratorState.Allocated:
  114. _outerEnumerator = _outer.GetAsyncEnumerator(_cancellationToken);
  115. _mode = State_If;
  116. _state = AsyncIteratorState.Iterating;
  117. goto case AsyncIteratorState.Iterating;
  118. case AsyncIteratorState.Iterating:
  119. switch (_mode)
  120. {
  121. case State_If:
  122. if (await _outerEnumerator.MoveNextAsync().ConfigureAwait(false))
  123. {
  124. _lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  125. if (_lookup.Count != 0)
  126. {
  127. _mode = State_DoLoop;
  128. goto case State_DoLoop;
  129. }
  130. }
  131. break;
  132. case State_DoLoop:
  133. _item = _outerEnumerator.Current;
  134. var g = _lookup.GetGrouping(_outerKeySelector(_item), create: false);
  135. if (g != null)
  136. {
  137. _count = g._count;
  138. _elements = g._elements;
  139. _index = 0;
  140. _mode = State_For;
  141. goto case State_For;
  142. }
  143. // advance to while
  144. _mode = State_While;
  145. goto case State_While;
  146. case State_For:
  147. _current = _resultSelector(_item, _elements[_index]);
  148. _index++;
  149. if (_index == _count)
  150. {
  151. _mode = State_While;
  152. }
  153. return true;
  154. case State_While:
  155. var hasNext = await _outerEnumerator.MoveNextAsync().ConfigureAwait(false);
  156. if (hasNext)
  157. {
  158. goto case State_DoLoop;
  159. }
  160. break;
  161. }
  162. await DisposeAsync().ConfigureAwait(false);
  163. break;
  164. }
  165. return false;
  166. }
  167. }
  168. private sealed class JoinAsyncIteratorWithTask<TOuter, TInner, TKey, TResult> : AsyncIterator<TResult>
  169. {
  170. private readonly IAsyncEnumerable<TOuter> _outer;
  171. private readonly IAsyncEnumerable<TInner> _inner;
  172. private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
  173. private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
  174. private readonly Func<TOuter, TInner, ValueTask<TResult>> _resultSelector;
  175. private readonly IEqualityComparer<TKey> _comparer;
  176. private IAsyncEnumerator<TOuter> _outerEnumerator;
  177. public JoinAsyncIteratorWithTask(IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  178. {
  179. Debug.Assert(outer != null);
  180. Debug.Assert(inner != null);
  181. Debug.Assert(outerKeySelector != null);
  182. Debug.Assert(innerKeySelector != null);
  183. Debug.Assert(resultSelector != null);
  184. _outer = outer;
  185. _inner = inner;
  186. _outerKeySelector = outerKeySelector;
  187. _innerKeySelector = innerKeySelector;
  188. _resultSelector = resultSelector;
  189. _comparer = comparer;
  190. }
  191. public override AsyncIteratorBase<TResult> Clone()
  192. {
  193. return new JoinAsyncIteratorWithTask<TOuter, TInner, TKey, TResult>(_outer, _inner, _outerKeySelector, _innerKeySelector, _resultSelector, _comparer);
  194. }
  195. public override async ValueTask DisposeAsync()
  196. {
  197. if (_outerEnumerator != null)
  198. {
  199. await _outerEnumerator.DisposeAsync().ConfigureAwait(false);
  200. _outerEnumerator = null;
  201. }
  202. await base.DisposeAsync().ConfigureAwait(false);
  203. }
  204. // State machine vars
  205. private Internal.LookupWithTask<TKey, TInner> _lookup;
  206. private int _count;
  207. private TInner[] _elements;
  208. private int _index;
  209. private TOuter _item;
  210. private int _mode;
  211. private const int State_If = 1;
  212. private const int State_DoLoop = 2;
  213. private const int State_For = 3;
  214. private const int State_While = 4;
  215. protected override async ValueTask<bool> MoveNextCore()
  216. {
  217. switch (_state)
  218. {
  219. case AsyncIteratorState.Allocated:
  220. _outerEnumerator = _outer.GetAsyncEnumerator(_cancellationToken);
  221. _mode = State_If;
  222. _state = AsyncIteratorState.Iterating;
  223. goto case AsyncIteratorState.Iterating;
  224. case AsyncIteratorState.Iterating:
  225. switch (_mode)
  226. {
  227. case State_If:
  228. if (await _outerEnumerator.MoveNextAsync().ConfigureAwait(false))
  229. {
  230. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  231. if (_lookup.Count != 0)
  232. {
  233. _mode = State_DoLoop;
  234. goto case State_DoLoop;
  235. }
  236. }
  237. break;
  238. case State_DoLoop:
  239. _item = _outerEnumerator.Current;
  240. var g = _lookup.GetGrouping(await _outerKeySelector(_item).ConfigureAwait(false), create: false);
  241. if (g != null)
  242. {
  243. _count = g._count;
  244. _elements = g._elements;
  245. _index = 0;
  246. _mode = State_For;
  247. goto case State_For;
  248. }
  249. // advance to while
  250. _mode = State_While;
  251. goto case State_While;
  252. case State_For:
  253. _current = await _resultSelector(_item, _elements[_index]).ConfigureAwait(false);
  254. _index++;
  255. if (_index == _count)
  256. {
  257. _mode = State_While;
  258. }
  259. return true;
  260. case State_While:
  261. var hasNext = await _outerEnumerator.MoveNextAsync().ConfigureAwait(false);
  262. if (hasNext)
  263. {
  264. goto case State_DoLoop;
  265. }
  266. break;
  267. }
  268. await DisposeAsync().ConfigureAwait(false);
  269. break;
  270. }
  271. return false;
  272. }
  273. }
  274. #if !NO_DEEP_CANCELLATION
  275. private sealed class JoinAsyncIteratorWithTaskAndCancellation<TOuter, TInner, TKey, TResult> : AsyncIterator<TResult>
  276. {
  277. private readonly IAsyncEnumerable<TOuter> _outer;
  278. private readonly IAsyncEnumerable<TInner> _inner;
  279. private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
  280. private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
  281. private readonly Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> _resultSelector;
  282. private readonly IEqualityComparer<TKey> _comparer;
  283. private IAsyncEnumerator<TOuter> _outerEnumerator;
  284. public JoinAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  285. {
  286. Debug.Assert(outer != null);
  287. Debug.Assert(inner != null);
  288. Debug.Assert(outerKeySelector != null);
  289. Debug.Assert(innerKeySelector != null);
  290. Debug.Assert(resultSelector != null);
  291. _outer = outer;
  292. _inner = inner;
  293. _outerKeySelector = outerKeySelector;
  294. _innerKeySelector = innerKeySelector;
  295. _resultSelector = resultSelector;
  296. _comparer = comparer;
  297. }
  298. public override AsyncIteratorBase<TResult> Clone()
  299. {
  300. return new JoinAsyncIteratorWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(_outer, _inner, _outerKeySelector, _innerKeySelector, _resultSelector, _comparer);
  301. }
  302. public override async ValueTask DisposeAsync()
  303. {
  304. if (_outerEnumerator != null)
  305. {
  306. await _outerEnumerator.DisposeAsync().ConfigureAwait(false);
  307. _outerEnumerator = null;
  308. }
  309. await base.DisposeAsync().ConfigureAwait(false);
  310. }
  311. // State machine vars
  312. private Internal.LookupWithTask<TKey, TInner> _lookup;
  313. private int _count;
  314. private TInner[] _elements;
  315. private int _index;
  316. private TOuter _item;
  317. private int _mode;
  318. private const int State_If = 1;
  319. private const int State_DoLoop = 2;
  320. private const int State_For = 3;
  321. private const int State_While = 4;
  322. protected override async ValueTask<bool> MoveNextCore()
  323. {
  324. switch (_state)
  325. {
  326. case AsyncIteratorState.Allocated:
  327. _outerEnumerator = _outer.GetAsyncEnumerator(_cancellationToken);
  328. _mode = State_If;
  329. _state = AsyncIteratorState.Iterating;
  330. goto case AsyncIteratorState.Iterating;
  331. case AsyncIteratorState.Iterating:
  332. switch (_mode)
  333. {
  334. case State_If:
  335. if (await _outerEnumerator.MoveNextAsync().ConfigureAwait(false))
  336. {
  337. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  338. if (_lookup.Count != 0)
  339. {
  340. _mode = State_DoLoop;
  341. goto case State_DoLoop;
  342. }
  343. }
  344. break;
  345. case State_DoLoop:
  346. _item = _outerEnumerator.Current;
  347. var g = _lookup.GetGrouping(await _outerKeySelector(_item, _cancellationToken).ConfigureAwait(false), create: false);
  348. if (g != null)
  349. {
  350. _count = g._count;
  351. _elements = g._elements;
  352. _index = 0;
  353. _mode = State_For;
  354. goto case State_For;
  355. }
  356. // advance to while
  357. _mode = State_While;
  358. goto case State_While;
  359. case State_For:
  360. _current = await _resultSelector(_item, _elements[_index], _cancellationToken).ConfigureAwait(false);
  361. _index++;
  362. if (_index == _count)
  363. {
  364. _mode = State_While;
  365. }
  366. return true;
  367. case State_While:
  368. var hasNext = await _outerEnumerator.MoveNextAsync().ConfigureAwait(false);
  369. if (hasNext)
  370. {
  371. goto case State_DoLoop;
  372. }
  373. break;
  374. }
  375. await DisposeAsync().ConfigureAwait(false);
  376. break;
  377. }
  378. return false;
  379. }
  380. }
  381. #endif
  382. }
  383. }