SelectMany.cs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Linq
  10. {
  public static partial class AsyncEnumerable
  {
      /// <summary>
      /// Projects every element of <paramref name="source"/> to the same sequence,
      /// <paramref name="other"/>, and flattens the results into a single sequence.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TOther">Element type of the sequence produced for every source element.</typeparam>
      /// <param name="source">Sequence whose elements drive the projection; the element values themselves are ignored.</param>
      /// <param name="other">Sequence that is enumerated once per element of <paramref name="source"/>.</param>
      /// <returns>The elements of <paramref name="other"/>, repeated for each element of <paramref name="source"/>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is null.</exception>
      public static IAsyncEnumerable<TOther> SelectMany<TSource, TOther>(this IAsyncEnumerable<TSource> source, IAsyncEnumerable<TOther> other)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (other == null)
          {
              throw new ArgumentNullException(nameof(other));
          }

          return source.SelectMany(_ => other);
      }

      /// <summary>
      /// Projects each element of <paramref name="source"/> to an inner sequence and flattens
      /// the inner sequences, in order, into a single sequence.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
      /// <param name="source">Sequence of elements to project.</param>
      /// <param name="selector">Function producing the inner sequence for a source element.</param>
      /// <returns>A sequence that yields the elements of every inner sequence, one inner sequence after another.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
      public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (selector == null)
          {
              throw new ArgumentNullException(nameof(selector));
          }

          // No per-enumeration state is required here, so every enumeration reuses the selector as-is.
          return SelectManyCore<TSource, TResult>(source, () => selector);
      }

      /// <summary>
      /// Projects each element of <paramref name="source"/>, together with its zero-based index,
      /// to an inner sequence and flattens the inner sequences, in order, into a single sequence.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
      /// <param name="source">Sequence of elements to project.</param>
      /// <param name="selector">
      /// Function producing the inner sequence for a source element; the second argument is the
      /// element's index, which starts at 0 for every enumeration.
      /// </param>
      /// <returns>A sequence that yields the elements of every inner sequence, one inner sequence after another.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
      public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (selector == null)
          {
              throw new ArgumentNullException(nameof(selector));
          }

          return SelectManyCore<TSource, TResult>(
              source,
              () =>
              {
                  // The counter lives inside the factory so that each enumeration counts from zero.
                  // The increment is checked: running past int.MaxValue raises OverflowException
                  // instead of silently wrapping to a negative index.
                  var index = 0;
                  return element => selector(element, checked(index++));
              });
      }

      /// <summary>
      /// Projects each element of <paramref name="source"/> to an inner sequence, then maps every
      /// (source element, inner element) pair through <paramref name="resultSelector"/> and
      /// flattens the mapped values into a single sequence.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
      /// <typeparam name="TResult">Element type of the result.</typeparam>
      /// <param name="source">Sequence of elements to project.</param>
      /// <param name="selector">Function producing the inner sequence for a source element.</param>
      /// <param name="resultSelector">Function combining a source element with one element of its inner sequence.</param>
      /// <returns>The combined values for every inner element of every source element.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="selector"/> or <paramref name="resultSelector"/> is null.
      /// </exception>
      public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (selector == null)
          {
              throw new ArgumentNullException(nameof(selector));
          }

          if (resultSelector == null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          return source.SelectMany(x => selector(x)
                                           .Select(y => resultSelector(x, y)));
      }

      /// <summary>
      /// Projects each element of <paramref name="source"/>, together with its zero-based index,
      /// to an inner sequence, then maps every (source element, inner element) pair through
      /// <paramref name="resultSelector"/> and flattens the mapped values into a single sequence.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TCollection">Element type of the inner sequences.</typeparam>
      /// <typeparam name="TResult">Element type of the result.</typeparam>
      /// <param name="source">Sequence of elements to project.</param>
      /// <param name="selector">Function producing the inner sequence for a source element and its index.</param>
      /// <param name="resultSelector">Function combining a source element with one element of its inner sequence.</param>
      /// <returns>The combined values for every inner element of every source element.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="selector"/> or <paramref name="resultSelector"/> is null.
      /// </exception>
      public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (selector == null)
          {
              throw new ArgumentNullException(nameof(selector));
          }

          if (resultSelector == null)
          {
              throw new ArgumentNullException(nameof(resultSelector));
          }

          return source.SelectMany((x, i) => selector(x, i)
                                                .Select(y => resultSelector(x, y)));
      }

      /// <summary>
      /// Shared flattening engine behind the two core <c>SelectMany</c> overloads.
      /// </summary>
      /// <remarks>
      /// The enumerator advances with a single loop rather than with two delegates that call each
      /// other. With mutual recursion, every empty inner sequence whose <c>MoveNext</c> completed
      /// synchronously added stack frames, so a long run of empty inner sequences could overflow
      /// the stack. The loop performs the same calls in the same order with constant stack depth.
      /// </remarks>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
      /// <param name="source">Sequence of elements to project; callers have already null-checked it.</param>
      /// <param name="createProjection">
      /// Invoked once per enumeration to obtain the projection used for that enumeration, which
      /// lets a caller attach per-enumeration state such as an element counter.
      /// </param>
      /// <returns>A sequence that yields the elements of every inner sequence, one inner sequence after another.</returns>
      private static IAsyncEnumerable<TResult> SelectManyCore<TSource, TResult>(IAsyncEnumerable<TSource> source, Func<Func<TSource, IAsyncEnumerable<TResult>>> createProjection)
      {
          return CreateEnumerable(
              () =>
              {
                  var e = source.GetEnumerator();
                  var ie = default(IAsyncEnumerator<TResult>);
                  var project = createProjection();

                  var innerDisposable = new AssignableDisposable();
                  var cts = new CancellationTokenDisposable();
                  var d = Disposable.Create(cts, innerDisposable, e);

                  Func<CancellationToken, Task<bool>> advance = async ct =>
                  {
                      while (true)
                      {
                          // ie stays null only until the first inner sequence has been opened.
                          if (ie != null)
                          {
                              if (await ie.MoveNext(ct)
                                          .ConfigureAwait(false))
                              {
                                  return true;
                              }

                              // The current inner sequence is exhausted; release it from the slot.
                              // NOTE(review): presumably AssignableDisposable disposes the value it
                              // replaces - confirm against its implementation.
                              innerDisposable.Disposable = null;
                          }

                          if (!await e.MoveNext(ct)
                                      .ConfigureAwait(false))
                          {
                              return false;
                          }

                          var enumerable = project(e.Current);
                          ie = enumerable.GetEnumerator();
                          innerDisposable.Disposable = ie;
                      }
                  };

                  // The token supplied by the enumerator is intentionally not forwarded; all work
                  // runs against cts.Token, exactly as before.
                  // NOTE(review): presumably this CreateEnumerator overload ties the caller's
                  // token to d.Dispose, which would cancel cts - confirm.
                  return CreateEnumerator(ct => advance(cts.Token),
                                          () => ie.Current,
                                          d.Dispose,
                                          e
                  );
              });
      }
  }
  139. }