GroupJoin.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector)
  12. {
  13. if (outer == null)
  14. throw Error.ArgumentNull(nameof(outer));
  15. if (inner == null)
  16. throw Error.ArgumentNull(nameof(inner));
  17. if (outerKeySelector == null)
  18. throw Error.ArgumentNull(nameof(outerKeySelector));
  19. if (innerKeySelector == null)
  20. throw Error.ArgumentNull(nameof(innerKeySelector));
  21. if (resultSelector == null)
  22. throw Error.ArgumentNull(nameof(resultSelector));
  23. return new GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  24. }
  25. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  26. {
  27. if (outer == null)
  28. throw Error.ArgumentNull(nameof(outer));
  29. if (inner == null)
  30. throw Error.ArgumentNull(nameof(inner));
  31. if (outerKeySelector == null)
  32. throw Error.ArgumentNull(nameof(outerKeySelector));
  33. if (innerKeySelector == null)
  34. throw Error.ArgumentNull(nameof(innerKeySelector));
  35. if (resultSelector == null)
  36. throw Error.ArgumentNull(nameof(resultSelector));
  37. return new GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  38. }
  39. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector)
  40. {
  41. if (outer == null)
  42. throw Error.ArgumentNull(nameof(outer));
  43. if (inner == null)
  44. throw Error.ArgumentNull(nameof(inner));
  45. if (outerKeySelector == null)
  46. throw Error.ArgumentNull(nameof(outerKeySelector));
  47. if (innerKeySelector == null)
  48. throw Error.ArgumentNull(nameof(innerKeySelector));
  49. if (resultSelector == null)
  50. throw Error.ArgumentNull(nameof(resultSelector));
  51. return new GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  52. }
  53. #if !NO_DEEP_CANCELLATION
  54. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector)
  55. {
  56. if (outer == null)
  57. throw Error.ArgumentNull(nameof(outer));
  58. if (inner == null)
  59. throw Error.ArgumentNull(nameof(inner));
  60. if (outerKeySelector == null)
  61. throw Error.ArgumentNull(nameof(outerKeySelector));
  62. if (innerKeySelector == null)
  63. throw Error.ArgumentNull(nameof(innerKeySelector));
  64. if (resultSelector == null)
  65. throw Error.ArgumentNull(nameof(resultSelector));
  66. return new GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  67. }
  68. #endif
  69. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  70. {
  71. if (outer == null)
  72. throw Error.ArgumentNull(nameof(outer));
  73. if (inner == null)
  74. throw Error.ArgumentNull(nameof(inner));
  75. if (outerKeySelector == null)
  76. throw Error.ArgumentNull(nameof(outerKeySelector));
  77. if (innerKeySelector == null)
  78. throw Error.ArgumentNull(nameof(innerKeySelector));
  79. if (resultSelector == null)
  80. throw Error.ArgumentNull(nameof(resultSelector));
  81. return new GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  82. }
  83. #if !NO_DEEP_CANCELLATION
  84. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  85. {
  86. if (outer == null)
  87. throw Error.ArgumentNull(nameof(outer));
  88. if (inner == null)
  89. throw Error.ArgumentNull(nameof(inner));
  90. if (outerKeySelector == null)
  91. throw Error.ArgumentNull(nameof(outerKeySelector));
  92. if (innerKeySelector == null)
  93. throw Error.ArgumentNull(nameof(innerKeySelector));
  94. if (resultSelector == null)
  95. throw Error.ArgumentNull(nameof(resultSelector));
  96. return new GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  97. }
  98. #endif
  99. private sealed class GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  100. {
  101. private readonly IEqualityComparer<TKey> _comparer;
  102. private readonly IAsyncEnumerable<TInner> _inner;
  103. private readonly Func<TInner, TKey> _innerKeySelector;
  104. private readonly IAsyncEnumerable<TOuter> _outer;
  105. private readonly Func<TOuter, TKey> _outerKeySelector;
  106. private readonly Func<TOuter, IAsyncEnumerable<TInner>, TResult> _resultSelector;
  107. public GroupJoinAsyncEnumerable(
  108. IAsyncEnumerable<TOuter> outer,
  109. IAsyncEnumerable<TInner> inner,
  110. Func<TOuter, TKey> outerKeySelector,
  111. Func<TInner, TKey> innerKeySelector,
  112. Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector,
  113. IEqualityComparer<TKey> comparer)
  114. {
  115. _outer = outer;
  116. _inner = inner;
  117. _outerKeySelector = outerKeySelector;
  118. _innerKeySelector = innerKeySelector;
  119. _resultSelector = resultSelector;
  120. _comparer = comparer;
  121. }
  122. public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
  123. {
  124. cancellationToken.ThrowIfCancellationRequested(); // NB: [LDM-2018-11-28] Equivalent to async iterator behavior.
  125. return new GroupJoinAsyncEnumerator(
  126. _outer.GetAsyncEnumerator(cancellationToken),
  127. _inner,
  128. _outerKeySelector,
  129. _innerKeySelector,
  130. _resultSelector,
  131. _comparer,
  132. cancellationToken);
  133. }
  134. private sealed class GroupJoinAsyncEnumerator : IAsyncEnumerator<TResult>
  135. {
  136. private readonly IEqualityComparer<TKey> _comparer;
  137. private readonly IAsyncEnumerable<TInner> _inner;
  138. private readonly Func<TInner, TKey> _innerKeySelector;
  139. private readonly IAsyncEnumerator<TOuter> _outer;
  140. private readonly Func<TOuter, TKey> _outerKeySelector;
  141. private readonly Func<TOuter, IAsyncEnumerable<TInner>, TResult> _resultSelector;
  142. private readonly CancellationToken _cancellationToken;
  143. private Internal.Lookup<TKey, TInner> _lookup;
  144. public GroupJoinAsyncEnumerator(
  145. IAsyncEnumerator<TOuter> outer,
  146. IAsyncEnumerable<TInner> inner,
  147. Func<TOuter, TKey> outerKeySelector,
  148. Func<TInner, TKey> innerKeySelector,
  149. Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector,
  150. IEqualityComparer<TKey> comparer,
  151. CancellationToken cancellationToken)
  152. {
  153. _outer = outer;
  154. _inner = inner;
  155. _outerKeySelector = outerKeySelector;
  156. _innerKeySelector = innerKeySelector;
  157. _resultSelector = resultSelector;
  158. _comparer = comparer;
  159. _cancellationToken = cancellationToken;
  160. }
  161. public async ValueTask<bool> MoveNextAsync()
  162. {
  163. // nothing to do
  164. if (!await _outer.MoveNextAsync().ConfigureAwait(false))
  165. {
  166. return false;
  167. }
  168. if (_lookup == null)
  169. {
  170. _lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  171. }
  172. var item = _outer.Current;
  173. var outerKey = _outerKeySelector(item);
  174. var inner = _lookup[outerKey].ToAsyncEnumerable();
  175. Current = _resultSelector(item, inner);
  176. return true;
  177. }
  178. public TResult Current { get; private set; }
  179. public ValueTask DisposeAsync() => _outer.DisposeAsync();
  180. }
  181. }
  182. private sealed class GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  183. {
  184. private readonly IEqualityComparer<TKey> _comparer;
  185. private readonly IAsyncEnumerable<TInner> _inner;
  186. private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
  187. private readonly IAsyncEnumerable<TOuter> _outer;
  188. private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
  189. private readonly Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> _resultSelector;
  190. public GroupJoinAsyncEnumerableWithTask(
  191. IAsyncEnumerable<TOuter> outer,
  192. IAsyncEnumerable<TInner> inner,
  193. Func<TOuter, ValueTask<TKey>> outerKeySelector,
  194. Func<TInner, ValueTask<TKey>> innerKeySelector,
  195. Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector,
  196. IEqualityComparer<TKey> comparer)
  197. {
  198. _outer = outer;
  199. _inner = inner;
  200. _outerKeySelector = outerKeySelector;
  201. _innerKeySelector = innerKeySelector;
  202. _resultSelector = resultSelector;
  203. _comparer = comparer;
  204. }
  205. public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
  206. {
  207. cancellationToken.ThrowIfCancellationRequested(); // NB: [LDM-2018-11-28] Equivalent to async iterator behavior.
  208. return new GroupJoinAsyncEnumeratorWithTask(
  209. _outer.GetAsyncEnumerator(cancellationToken),
  210. _inner,
  211. _outerKeySelector,
  212. _innerKeySelector,
  213. _resultSelector,
  214. _comparer,
  215. cancellationToken);
  216. }
  217. private sealed class GroupJoinAsyncEnumeratorWithTask : IAsyncEnumerator<TResult>
  218. {
  219. private readonly IEqualityComparer<TKey> _comparer;
  220. private readonly IAsyncEnumerable<TInner> _inner;
  221. private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
  222. private readonly IAsyncEnumerator<TOuter> _outer;
  223. private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
  224. private readonly Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> _resultSelector;
  225. private readonly CancellationToken _cancellationToken;
  226. private Internal.LookupWithTask<TKey, TInner> _lookup;
  227. public GroupJoinAsyncEnumeratorWithTask(
  228. IAsyncEnumerator<TOuter> outer,
  229. IAsyncEnumerable<TInner> inner,
  230. Func<TOuter, ValueTask<TKey>> outerKeySelector,
  231. Func<TInner, ValueTask<TKey>> innerKeySelector,
  232. Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector,
  233. IEqualityComparer<TKey> comparer,
  234. CancellationToken cancellationToken)
  235. {
  236. _outer = outer;
  237. _inner = inner;
  238. _outerKeySelector = outerKeySelector;
  239. _innerKeySelector = innerKeySelector;
  240. _resultSelector = resultSelector;
  241. _comparer = comparer;
  242. _cancellationToken = cancellationToken;
  243. }
  244. public async ValueTask<bool> MoveNextAsync()
  245. {
  246. // nothing to do
  247. if (!await _outer.MoveNextAsync().ConfigureAwait(false))
  248. {
  249. return false;
  250. }
  251. if (_lookup == null)
  252. {
  253. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  254. }
  255. var item = _outer.Current;
  256. var outerKey = await _outerKeySelector(item).ConfigureAwait(false);
  257. var inner = _lookup[outerKey].ToAsyncEnumerable();
  258. Current = await _resultSelector(item, inner).ConfigureAwait(false);
  259. return true;
  260. }
  261. public TResult Current { get; private set; }
  262. public ValueTask DisposeAsync() => _outer.DisposeAsync();
  263. }
  264. }
  265. #if !NO_DEEP_CANCELLATION
  266. private sealed class GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  267. {
  268. private readonly IEqualityComparer<TKey> _comparer;
  269. private readonly IAsyncEnumerable<TInner> _inner;
  270. private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
  271. private readonly IAsyncEnumerable<TOuter> _outer;
  272. private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
  273. private readonly Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> _resultSelector;
  274. public GroupJoinAsyncEnumerableWithTaskAndCancellation(
  275. IAsyncEnumerable<TOuter> outer,
  276. IAsyncEnumerable<TInner> inner,
  277. Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector,
  278. Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector,
  279. Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector,
  280. IEqualityComparer<TKey> comparer)
  281. {
  282. _outer = outer;
  283. _inner = inner;
  284. _outerKeySelector = outerKeySelector;
  285. _innerKeySelector = innerKeySelector;
  286. _resultSelector = resultSelector;
  287. _comparer = comparer;
  288. }
  289. public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
  290. {
  291. cancellationToken.ThrowIfCancellationRequested(); // NB: [LDM-2018-11-28] Equivalent to async iterator behavior.
  292. return new GroupJoinAsyncEnumeratorWithTask(
  293. _outer.GetAsyncEnumerator(cancellationToken),
  294. _inner,
  295. _outerKeySelector,
  296. _innerKeySelector,
  297. _resultSelector,
  298. _comparer,
  299. cancellationToken);
  300. }
  301. private sealed class GroupJoinAsyncEnumeratorWithTask : IAsyncEnumerator<TResult>
  302. {
  303. private readonly IEqualityComparer<TKey> _comparer;
  304. private readonly IAsyncEnumerable<TInner> _inner;
  305. private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
  306. private readonly IAsyncEnumerator<TOuter> _outer;
  307. private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
  308. private readonly Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> _resultSelector;
  309. private readonly CancellationToken _cancellationToken;
  310. private Internal.LookupWithTask<TKey, TInner> _lookup;
  311. public GroupJoinAsyncEnumeratorWithTask(
  312. IAsyncEnumerator<TOuter> outer,
  313. IAsyncEnumerable<TInner> inner,
  314. Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector,
  315. Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector,
  316. Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector,
  317. IEqualityComparer<TKey> comparer,
  318. CancellationToken cancellationToken)
  319. {
  320. _outer = outer;
  321. _inner = inner;
  322. _outerKeySelector = outerKeySelector;
  323. _innerKeySelector = innerKeySelector;
  324. _resultSelector = resultSelector;
  325. _comparer = comparer;
  326. _cancellationToken = cancellationToken;
  327. }
  328. public async ValueTask<bool> MoveNextAsync()
  329. {
  330. // nothing to do
  331. if (!await _outer.MoveNextAsync().ConfigureAwait(false))
  332. {
  333. return false;
  334. }
  335. if (_lookup == null)
  336. {
  337. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  338. }
  339. var item = _outer.Current;
  340. var outerKey = await _outerKeySelector(item, _cancellationToken).ConfigureAwait(false);
  341. var inner = _lookup[outerKey].ToAsyncEnumerable();
  342. Current = await _resultSelector(item, inner, _cancellationToken).ConfigureAwait(false);
  343. return true;
  344. }
  345. public TResult Current { get; private set; }
  346. public ValueTask DisposeAsync() => _outer.DisposeAsync();
  347. }
  348. }
  349. #endif
  350. }
  351. }