GroupJoin.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector)
  12. {
  13. if (outer == null)
  14. throw Error.ArgumentNull(nameof(outer));
  15. if (inner == null)
  16. throw Error.ArgumentNull(nameof(inner));
  17. if (outerKeySelector == null)
  18. throw Error.ArgumentNull(nameof(outerKeySelector));
  19. if (innerKeySelector == null)
  20. throw Error.ArgumentNull(nameof(innerKeySelector));
  21. if (resultSelector == null)
  22. throw Error.ArgumentNull(nameof(resultSelector));
  23. return new GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  24. }
  25. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  26. {
  27. if (outer == null)
  28. throw Error.ArgumentNull(nameof(outer));
  29. if (inner == null)
  30. throw Error.ArgumentNull(nameof(inner));
  31. if (outerKeySelector == null)
  32. throw Error.ArgumentNull(nameof(outerKeySelector));
  33. if (innerKeySelector == null)
  34. throw Error.ArgumentNull(nameof(innerKeySelector));
  35. if (resultSelector == null)
  36. throw Error.ArgumentNull(nameof(resultSelector));
  37. return new GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  38. }
  39. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector)
  40. {
  41. if (outer == null)
  42. throw Error.ArgumentNull(nameof(outer));
  43. if (inner == null)
  44. throw Error.ArgumentNull(nameof(inner));
  45. if (outerKeySelector == null)
  46. throw Error.ArgumentNull(nameof(outerKeySelector));
  47. if (innerKeySelector == null)
  48. throw Error.ArgumentNull(nameof(innerKeySelector));
  49. if (resultSelector == null)
  50. throw Error.ArgumentNull(nameof(resultSelector));
  51. return new GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  52. }
  53. #if !NO_DEEP_CANCELLATION
  54. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector)
  55. {
  56. if (outer == null)
  57. throw Error.ArgumentNull(nameof(outer));
  58. if (inner == null)
  59. throw Error.ArgumentNull(nameof(inner));
  60. if (outerKeySelector == null)
  61. throw Error.ArgumentNull(nameof(outerKeySelector));
  62. if (innerKeySelector == null)
  63. throw Error.ArgumentNull(nameof(innerKeySelector));
  64. if (resultSelector == null)
  65. throw Error.ArgumentNull(nameof(resultSelector));
  66. return new GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  67. }
  68. #endif
  69. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  70. {
  71. if (outer == null)
  72. throw Error.ArgumentNull(nameof(outer));
  73. if (inner == null)
  74. throw Error.ArgumentNull(nameof(inner));
  75. if (outerKeySelector == null)
  76. throw Error.ArgumentNull(nameof(outerKeySelector));
  77. if (innerKeySelector == null)
  78. throw Error.ArgumentNull(nameof(innerKeySelector));
  79. if (resultSelector == null)
  80. throw Error.ArgumentNull(nameof(resultSelector));
  81. return new GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  82. }
  83. #if !NO_DEEP_CANCELLATION
  84. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey> comparer)
  85. {
  86. if (outer == null)
  87. throw Error.ArgumentNull(nameof(outer));
  88. if (inner == null)
  89. throw Error.ArgumentNull(nameof(inner));
  90. if (outerKeySelector == null)
  91. throw Error.ArgumentNull(nameof(outerKeySelector));
  92. if (innerKeySelector == null)
  93. throw Error.ArgumentNull(nameof(innerKeySelector));
  94. if (resultSelector == null)
  95. throw Error.ArgumentNull(nameof(resultSelector));
  96. return new GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
  97. }
  98. #endif
  99. private sealed class GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  100. {
  101. private readonly IEqualityComparer<TKey> _comparer;
  102. private readonly IAsyncEnumerable<TInner> _inner;
  103. private readonly Func<TInner, TKey> _innerKeySelector;
  104. private readonly IAsyncEnumerable<TOuter> _outer;
  105. private readonly Func<TOuter, TKey> _outerKeySelector;
  106. private readonly Func<TOuter, IAsyncEnumerable<TInner>, TResult> _resultSelector;
  107. public GroupJoinAsyncEnumerable(
  108. IAsyncEnumerable<TOuter> outer,
  109. IAsyncEnumerable<TInner> inner,
  110. Func<TOuter, TKey> outerKeySelector,
  111. Func<TInner, TKey> innerKeySelector,
  112. Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector,
  113. IEqualityComparer<TKey> comparer)
  114. {
  115. _outer = outer;
  116. _inner = inner;
  117. _outerKeySelector = outerKeySelector;
  118. _innerKeySelector = innerKeySelector;
  119. _resultSelector = resultSelector;
  120. _comparer = comparer;
  121. }
  122. public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
  123. => new GroupJoinAsyncEnumerator(
  124. _outer.GetAsyncEnumerator(cancellationToken),
  125. _inner,
  126. _outerKeySelector,
  127. _innerKeySelector,
  128. _resultSelector,
  129. _comparer,
  130. cancellationToken);
  131. private sealed class GroupJoinAsyncEnumerator : IAsyncEnumerator<TResult>
  132. {
  133. private readonly IEqualityComparer<TKey> _comparer;
  134. private readonly IAsyncEnumerable<TInner> _inner;
  135. private readonly Func<TInner, TKey> _innerKeySelector;
  136. private readonly IAsyncEnumerator<TOuter> _outer;
  137. private readonly Func<TOuter, TKey> _outerKeySelector;
  138. private readonly Func<TOuter, IAsyncEnumerable<TInner>, TResult> _resultSelector;
  139. private readonly CancellationToken _cancellationToken;
  140. private Internal.Lookup<TKey, TInner> _lookup;
  141. public GroupJoinAsyncEnumerator(
  142. IAsyncEnumerator<TOuter> outer,
  143. IAsyncEnumerable<TInner> inner,
  144. Func<TOuter, TKey> outerKeySelector,
  145. Func<TInner, TKey> innerKeySelector,
  146. Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector,
  147. IEqualityComparer<TKey> comparer,
  148. CancellationToken cancellationToken)
  149. {
  150. _outer = outer;
  151. _inner = inner;
  152. _outerKeySelector = outerKeySelector;
  153. _innerKeySelector = innerKeySelector;
  154. _resultSelector = resultSelector;
  155. _comparer = comparer;
  156. _cancellationToken = cancellationToken;
  157. }
  158. public async ValueTask<bool> MoveNextAsync()
  159. {
  160. // nothing to do
  161. if (!await _outer.MoveNextAsync().ConfigureAwait(false))
  162. {
  163. return false;
  164. }
  165. if (_lookup == null)
  166. {
  167. _lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  168. }
  169. var item = _outer.Current;
  170. var outerKey = _outerKeySelector(item);
  171. var inner = _lookup[outerKey].ToAsyncEnumerable();
  172. Current = _resultSelector(item, inner);
  173. return true;
  174. }
  175. public TResult Current { get; private set; }
  176. public ValueTask DisposeAsync() => _outer.DisposeAsync();
  177. }
  178. }
  179. private sealed class GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  180. {
  181. private readonly IEqualityComparer<TKey> _comparer;
  182. private readonly IAsyncEnumerable<TInner> _inner;
  183. private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
  184. private readonly IAsyncEnumerable<TOuter> _outer;
  185. private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
  186. private readonly Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> _resultSelector;
  187. public GroupJoinAsyncEnumerableWithTask(
  188. IAsyncEnumerable<TOuter> outer,
  189. IAsyncEnumerable<TInner> inner,
  190. Func<TOuter, ValueTask<TKey>> outerKeySelector,
  191. Func<TInner, ValueTask<TKey>> innerKeySelector,
  192. Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector,
  193. IEqualityComparer<TKey> comparer)
  194. {
  195. _outer = outer;
  196. _inner = inner;
  197. _outerKeySelector = outerKeySelector;
  198. _innerKeySelector = innerKeySelector;
  199. _resultSelector = resultSelector;
  200. _comparer = comparer;
  201. }
  202. public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
  203. => new GroupJoinAsyncEnumeratorWithTask(
  204. _outer.GetAsyncEnumerator(cancellationToken),
  205. _inner,
  206. _outerKeySelector,
  207. _innerKeySelector,
  208. _resultSelector,
  209. _comparer,
  210. cancellationToken);
  211. private sealed class GroupJoinAsyncEnumeratorWithTask : IAsyncEnumerator<TResult>
  212. {
  213. private readonly IEqualityComparer<TKey> _comparer;
  214. private readonly IAsyncEnumerable<TInner> _inner;
  215. private readonly Func<TInner, ValueTask<TKey>> _innerKeySelector;
  216. private readonly IAsyncEnumerator<TOuter> _outer;
  217. private readonly Func<TOuter, ValueTask<TKey>> _outerKeySelector;
  218. private readonly Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> _resultSelector;
  219. private readonly CancellationToken _cancellationToken;
  220. private Internal.LookupWithTask<TKey, TInner> _lookup;
  221. public GroupJoinAsyncEnumeratorWithTask(
  222. IAsyncEnumerator<TOuter> outer,
  223. IAsyncEnumerable<TInner> inner,
  224. Func<TOuter, ValueTask<TKey>> outerKeySelector,
  225. Func<TInner, ValueTask<TKey>> innerKeySelector,
  226. Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector,
  227. IEqualityComparer<TKey> comparer,
  228. CancellationToken cancellationToken)
  229. {
  230. _outer = outer;
  231. _inner = inner;
  232. _outerKeySelector = outerKeySelector;
  233. _innerKeySelector = innerKeySelector;
  234. _resultSelector = resultSelector;
  235. _comparer = comparer;
  236. _cancellationToken = cancellationToken;
  237. }
  238. public async ValueTask<bool> MoveNextAsync()
  239. {
  240. // nothing to do
  241. if (!await _outer.MoveNextAsync().ConfigureAwait(false))
  242. {
  243. return false;
  244. }
  245. if (_lookup == null)
  246. {
  247. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  248. }
  249. var item = _outer.Current;
  250. var outerKey = await _outerKeySelector(item).ConfigureAwait(false);
  251. var inner = _lookup[outerKey].ToAsyncEnumerable();
  252. Current = await _resultSelector(item, inner).ConfigureAwait(false);
  253. return true;
  254. }
  255. public TResult Current { get; private set; }
  256. public ValueTask DisposeAsync() => _outer.DisposeAsync();
  257. }
  258. }
  259. #if !NO_DEEP_CANCELLATION
  260. private sealed class GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
  261. {
  262. private readonly IEqualityComparer<TKey> _comparer;
  263. private readonly IAsyncEnumerable<TInner> _inner;
  264. private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
  265. private readonly IAsyncEnumerable<TOuter> _outer;
  266. private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
  267. private readonly Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> _resultSelector;
  268. public GroupJoinAsyncEnumerableWithTaskAndCancellation(
  269. IAsyncEnumerable<TOuter> outer,
  270. IAsyncEnumerable<TInner> inner,
  271. Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector,
  272. Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector,
  273. Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector,
  274. IEqualityComparer<TKey> comparer)
  275. {
  276. _outer = outer;
  277. _inner = inner;
  278. _outerKeySelector = outerKeySelector;
  279. _innerKeySelector = innerKeySelector;
  280. _resultSelector = resultSelector;
  281. _comparer = comparer;
  282. }
  283. public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
  284. => new GroupJoinAsyncEnumeratorWithTask(
  285. _outer.GetAsyncEnumerator(cancellationToken),
  286. _inner,
  287. _outerKeySelector,
  288. _innerKeySelector,
  289. _resultSelector,
  290. _comparer,
  291. cancellationToken);
  292. private sealed class GroupJoinAsyncEnumeratorWithTask : IAsyncEnumerator<TResult>
  293. {
  294. private readonly IEqualityComparer<TKey> _comparer;
  295. private readonly IAsyncEnumerable<TInner> _inner;
  296. private readonly Func<TInner, CancellationToken, ValueTask<TKey>> _innerKeySelector;
  297. private readonly IAsyncEnumerator<TOuter> _outer;
  298. private readonly Func<TOuter, CancellationToken, ValueTask<TKey>> _outerKeySelector;
  299. private readonly Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> _resultSelector;
  300. private readonly CancellationToken _cancellationToken;
  301. private Internal.LookupWithTask<TKey, TInner> _lookup;
  302. public GroupJoinAsyncEnumeratorWithTask(
  303. IAsyncEnumerator<TOuter> outer,
  304. IAsyncEnumerable<TInner> inner,
  305. Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector,
  306. Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector,
  307. Func<TOuter, IAsyncEnumerable<TInner>, CancellationToken, ValueTask<TResult>> resultSelector,
  308. IEqualityComparer<TKey> comparer,
  309. CancellationToken cancellationToken)
  310. {
  311. _outer = outer;
  312. _inner = inner;
  313. _outerKeySelector = outerKeySelector;
  314. _innerKeySelector = innerKeySelector;
  315. _resultSelector = resultSelector;
  316. _comparer = comparer;
  317. _cancellationToken = cancellationToken;
  318. }
  319. public async ValueTask<bool> MoveNextAsync()
  320. {
  321. // nothing to do
  322. if (!await _outer.MoveNextAsync().ConfigureAwait(false))
  323. {
  324. return false;
  325. }
  326. if (_lookup == null)
  327. {
  328. _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
  329. }
  330. var item = _outer.Current;
  331. var outerKey = await _outerKeySelector(item, _cancellationToken).ConfigureAwait(false);
  332. var inner = _lookup[outerKey].ToAsyncEnumerable();
  333. Current = await _resultSelector(item, inner, _cancellationToken).ConfigureAwait(false);
  334. return true;
  335. }
  336. public TResult Current { get; private set; }
  337. public ValueTask DisposeAsync() => _outer.DisposeAsync();
  338. }
  339. }
  340. #endif
  341. }
  342. }