Join.cs 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector) =>
  13. Join(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  14. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector, IEqualityComparer<TKey>? comparer)
  15. {
  16. if (outer == null)
  17. throw Error.ArgumentNull(nameof(outer));
  18. if (inner == null)
  19. throw Error.ArgumentNull(nameof(inner));
  20. if (outerKeySelector == null)
  21. throw Error.ArgumentNull(nameof(outerKeySelector));
  22. if (innerKeySelector == null)
  23. throw Error.ArgumentNull(nameof(innerKeySelector));
  24. if (resultSelector == null)
  25. throw Error.ArgumentNull(nameof(resultSelector));
  26. return Create(Core);
  27. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  28. {
  29. await using (var e = outer.GetConfiguredAsyncEnumerator(cancellationToken, false))
  30. {
  31. if (await e.MoveNextAsync())
  32. {
  33. var lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
  34. if (lookup.Count != 0)
  35. {
  36. do
  37. {
  38. var item = e.Current;
  39. var outerKey = outerKeySelector(item);
  40. var g = lookup.GetGrouping(outerKey);
  41. if (g != null)
  42. {
  43. var count = g._count;
  44. var elements = g._elements;
  45. for (var i = 0; i != count; ++i)
  46. {
  47. yield return resultSelector(item, elements[i]);
  48. }
  49. }
  50. }
  51. while (await e.MoveNextAsync());
  52. }
  53. }
  54. }
  55. }
  56. }
  57. internal static IAsyncEnumerable<TResult> JoinAwaitCore<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector) =>
  58. JoinAwaitCore<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  59. internal static IAsyncEnumerable<TResult> JoinAwaitCore<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer)
  60. {
  61. if (outer == null)
  62. throw Error.ArgumentNull(nameof(outer));
  63. if (inner == null)
  64. throw Error.ArgumentNull(nameof(inner));
  65. if (outerKeySelector == null)
  66. throw Error.ArgumentNull(nameof(outerKeySelector));
  67. if (innerKeySelector == null)
  68. throw Error.ArgumentNull(nameof(innerKeySelector));
  69. if (resultSelector == null)
  70. throw Error.ArgumentNull(nameof(resultSelector));
  71. return Create(Core);
  72. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  73. {
  74. await using (var e = outer.GetConfiguredAsyncEnumerator(cancellationToken, false))
  75. {
  76. if (await e.MoveNextAsync())
  77. {
  78. var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
  79. if (lookup.Count != 0)
  80. {
  81. do
  82. {
  83. var item = e.Current;
  84. var outerKey = await outerKeySelector(item).ConfigureAwait(false);
  85. var g = lookup.GetGrouping(outerKey);
  86. if (g != null)
  87. {
  88. var count = g._count;
  89. var elements = g._elements;
  90. for (var i = 0; i != count; ++i)
  91. {
  92. yield return await resultSelector(item, elements[i]).ConfigureAwait(false);
  93. }
  94. }
  95. }
  96. while (await e.MoveNextAsync());
  97. }
  98. }
  99. }
  100. }
  101. }
  102. #if !NO_DEEP_CANCELLATION
  103. internal static IAsyncEnumerable<TResult> JoinAwaitWithCancellationCore<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> resultSelector) =>
  104. JoinAwaitWithCancellationCore<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer: null);
  105. internal static IAsyncEnumerable<TResult> JoinAwaitWithCancellationCore<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, CancellationToken, ValueTask<TKey>> outerKeySelector, Func<TInner, CancellationToken, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, CancellationToken, ValueTask<TResult>> resultSelector, IEqualityComparer<TKey>? comparer)
  106. {
  107. if (outer == null)
  108. throw Error.ArgumentNull(nameof(outer));
  109. if (inner == null)
  110. throw Error.ArgumentNull(nameof(inner));
  111. if (outerKeySelector == null)
  112. throw Error.ArgumentNull(nameof(outerKeySelector));
  113. if (innerKeySelector == null)
  114. throw Error.ArgumentNull(nameof(innerKeySelector));
  115. if (resultSelector == null)
  116. throw Error.ArgumentNull(nameof(resultSelector));
  117. return Create(Core);
  118. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  119. {
  120. await using (var e = outer.GetConfiguredAsyncEnumerator(cancellationToken, false))
  121. {
  122. if (await e.MoveNextAsync())
  123. {
  124. var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
  125. if (lookup.Count != 0)
  126. {
  127. do
  128. {
  129. var item = e.Current;
  130. var outerKey = await outerKeySelector(item, cancellationToken).ConfigureAwait(false);
  131. var g = lookup.GetGrouping(outerKey);
  132. if (g != null)
  133. {
  134. var count = g._count;
  135. var elements = g._elements;
  136. for (var i = 0; i != count; ++i)
  137. {
  138. yield return await resultSelector(item, elements[i], cancellationToken).ConfigureAwait(false);
  139. }
  140. }
  141. }
  142. while (await e.MoveNextAsync());
  143. }
  144. }
  145. }
  146. }
  147. }
  148. #endif
  149. }
  150. }