Concat.cs 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerableEx
  11. {
  /// <summary>
  /// Concatenates the inner sequences delivered by an asynchronous sequence of asynchronous sequences.
  /// </summary>
  /// <typeparam name="TSource">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="sources">Asynchronous outer sequence whose elements are the sequences to concatenate.</param>
  /// <returns>
  /// A new <c>ConcatAsyncEnumerableAsyncIterator</c> wrapping <paramref name="sources"/>. This call only
  /// stores the reference; it does not start enumerating <paramref name="sources"/>.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Concat<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources) =>
      new ConcatAsyncEnumerableAsyncIterator<TSource>(sources ?? throw new ArgumentNullException(nameof(sources)));
  /// <summary>
  /// Concatenates the asynchronous sequences contained in a synchronous enumerable.
  /// </summary>
  /// <typeparam name="TSource">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="sources">Enumerable whose elements are the asynchronous sequences to concatenate.</param>
  /// <returns>The sequence built by <c>ConcatCore</c> from <paramref name="sources"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Concat<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources) =>
      ConcatCore(sources ?? throw new ArgumentNullException(nameof(sources)));
  /// <summary>
  /// Concatenates the asynchronous sequences passed as an argument list or array.
  /// </summary>
  /// <typeparam name="TSource">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="sources">The asynchronous sequences to concatenate, in array order.</param>
  /// <returns>The sequence built by <c>ConcatCore</c> from <paramref name="sources"/>.</returns>
  /// <exception cref="ArgumentNullException">The <paramref name="sources"/> array itself is null.</exception>
  public static IAsyncEnumerable<TSource> Concat<TSource>(params IAsyncEnumerable<TSource>[] sources)
  {
      // Only the array reference is validated here; its elements are not inspected.
      if (sources is null)
      {
          throw new ArgumentNullException(nameof(sources));
      }

      // The array is handed over as-is (no defensive copy is made).
      return ConcatCore(sources);
  }
  // Shared tail of the Concat overloads that take a synchronous outer collection: wraps the collection
  // in a ConcatEnumerableAsyncIterator. No null check is performed here; the public overloads validate
  // their argument before calling in.
  private static IAsyncEnumerable<TSource> ConcatCore<TSource>(IEnumerable<IAsyncEnumerable<TSource>> sources) =>
      new ConcatEnumerableAsyncIterator<TSource>(sources);
  /// <summary>
  /// Iterator that yields every element of each inner asynchronous sequence in turn, taking the inner
  /// sequences from a synchronous outer <see cref="IEnumerable{T}"/>.
  /// </summary>
  /// <remarks>
  /// <c>state</c> and <c>current</c> are not declared in this class; presumably they are inherited from
  /// <c>AsyncIterator&lt;TSource&gt;</c> (not visible here).
  /// </remarks>
  private sealed class ConcatEnumerableAsyncIterator<TSource> : AsyncIterator<TSource>
  {
      private readonly IEnumerable<IAsyncEnumerable<TSource>> source;

      /// <summary>Stores the outer collection; nothing is enumerated at construction time.</summary>
      /// <param name="source">Outer collection of inner sequences. Must not be null (debug-asserted only).</param>
      public ConcatEnumerableAsyncIterator(IEnumerable<IAsyncEnumerable<TSource>> source)
      {
          Debug.Assert(source != null);

          this.source = source;
      }

      /// <summary>Creates a fresh iterator over the same outer collection; no enumeration state is copied.</summary>
      public override AsyncIterator<TSource> Clone()
      {
          return new ConcatEnumerableAsyncIterator<TSource>(source);
      }

      /// <summary>
      /// Disposes the outer enumerator, then the current inner enumerator, then the base iterator.
      /// </summary>
      /// <remarks>
      /// Every step runs even when an earlier one throws, so a failing outer <c>Dispose</c> can no longer
      /// leak the inner enumerator or skip the base disposal. If several steps throw, the exception from
      /// the last failing step is the one that propagates.
      /// </remarks>
      public override async ValueTask DisposeAsync()
      {
          // Detach both enumerators before disposing anything, so that a throwing disposal cannot leave
          // a reference behind that a later DisposeAsync call would dispose a second time.
          var outer = outerEnumerator;
          var inner = currentEnumerator;
          outerEnumerator = null;
          currentEnumerator = null;

          try
          {
              outer?.Dispose();
          }
          finally
          {
              try
              {
                  if (inner != null)
                  {
                      await inner.DisposeAsync().ConfigureAwait(false);
                  }
              }
              finally
              {
                  await base.DisposeAsync().ConfigureAwait(false);
              }
          }
      }

      // State machine vars
      private IEnumerator<IAsyncEnumerable<TSource>> outerEnumerator;
      private IAsyncEnumerator<TSource> currentEnumerator;
      private int mode;

      // Values of 'mode' while state == Iterating.
      private const int State_OuterNext = 1; // advance the outer enumerator to the next inner sequence
      private const int State_While = 4;     // pull the next element from the current inner enumerator

      /// <summary>
      /// Advances to the next element of the concatenation.
      /// </summary>
      /// <param name="cancellationToken">Token passed to each inner sequence's <c>GetAsyncEnumerator</c>.</param>
      /// <returns>
      /// <c>true</c> after storing the next element in <c>current</c>; <c>false</c> once the outer
      /// collection is exhausted (this iterator is disposed first) or when <c>state</c> is neither
      /// <c>Allocated</c> nor <c>Iterating</c>.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
      {
          switch (state)
          {
              case AsyncIteratorState.Allocated:
                  outerEnumerator = source.GetEnumerator();
                  mode = State_OuterNext;
                  state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (mode)
                  {
                      case State_OuterNext:
                          if (outerEnumerator.MoveNext())
                          {
                              // Dispose the finished inner enumerator before replacing it. The field is
                              // cleared first: if this disposal or the GetAsyncEnumerator call below
                              // throws, no already-disposed enumerator is left for DisposeAsync to
                              // dispose again.
                              var finished = currentEnumerator;
                              if (finished != null)
                              {
                                  currentEnumerator = null;
                                  await finished.DisposeAsync().ConfigureAwait(false);
                              }

                              currentEnumerator = outerEnumerator.Current.GetAsyncEnumerator(cancellationToken);
                              mode = State_While;
                              goto case State_While;
                          }

                          // Outer collection exhausted: fall out to the disposal below.
                          break;

                      case State_While:
                          if (await currentEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              current = currentEnumerator.Current;
                              return true;
                          }

                          // No more on the inner enumerator, move to the next outer. The mode is switched
                          // first so that State_While always implies a live currentEnumerator, even if
                          // the transition throws part-way through.
                          mode = State_OuterNext;
                          goto case State_OuterNext;
                  }

                  await DisposeAsync().ConfigureAwait(false);
                  break;
          }

          return false;
      }
  }
  /// <summary>
  /// Iterator that yields every element of each inner asynchronous sequence in turn, taking the inner
  /// sequences from an asynchronous outer <see cref="IAsyncEnumerable{T}"/>.
  /// </summary>
  /// <remarks>
  /// <c>state</c> and <c>current</c> are not declared in this class; presumably they are inherited from
  /// <c>AsyncIterator&lt;TSource&gt;</c> (not visible here).
  /// </remarks>
  private sealed class ConcatAsyncEnumerableAsyncIterator<TSource> : AsyncIterator<TSource>
  {
      private readonly IAsyncEnumerable<IAsyncEnumerable<TSource>> source;

      /// <summary>Stores the outer sequence; nothing is enumerated at construction time.</summary>
      /// <param name="source">Outer sequence of inner sequences. Must not be null (debug-asserted only).</param>
      public ConcatAsyncEnumerableAsyncIterator(IAsyncEnumerable<IAsyncEnumerable<TSource>> source)
      {
          Debug.Assert(source != null);

          this.source = source;
      }

      /// <summary>Creates a fresh iterator over the same outer sequence; no enumeration state is copied.</summary>
      public override AsyncIterator<TSource> Clone()
      {
          return new ConcatAsyncEnumerableAsyncIterator<TSource>(source);
      }

      /// <summary>
      /// Disposes the outer enumerator, then the current inner enumerator, then the base iterator.
      /// </summary>
      /// <remarks>
      /// Every step runs even when an earlier one throws, so a failing outer disposal can no longer leak
      /// the inner enumerator or skip the base disposal. If several steps throw, the exception from the
      /// last failing step is the one that propagates.
      /// </remarks>
      public override async ValueTask DisposeAsync()
      {
          // Detach both enumerators before disposing anything, so that a throwing disposal cannot leave
          // a reference behind that a later DisposeAsync call would dispose a second time.
          var outer = outerEnumerator;
          var inner = currentEnumerator;
          outerEnumerator = null;
          currentEnumerator = null;

          try
          {
              if (outer != null)
              {
                  await outer.DisposeAsync().ConfigureAwait(false);
              }
          }
          finally
          {
              try
              {
                  if (inner != null)
                  {
                      await inner.DisposeAsync().ConfigureAwait(false);
                  }
              }
              finally
              {
                  await base.DisposeAsync().ConfigureAwait(false);
              }
          }
      }

      // State machine vars
      private IAsyncEnumerator<IAsyncEnumerable<TSource>> outerEnumerator;
      private IAsyncEnumerator<TSource> currentEnumerator;
      private int mode;

      // Values of 'mode' while state == Iterating.
      private const int State_OuterNext = 1; // advance the outer enumerator to the next inner sequence
      private const int State_While = 4;     // pull the next element from the current inner enumerator

      /// <summary>
      /// Advances to the next element of the concatenation.
      /// </summary>
      /// <param name="cancellationToken">
      /// Token passed to the outer sequence's and each inner sequence's <c>GetAsyncEnumerator</c>.
      /// </param>
      /// <returns>
      /// <c>true</c> after storing the next element in <c>current</c>; <c>false</c> once the outer
      /// sequence is exhausted (this iterator is disposed first) or when <c>state</c> is neither
      /// <c>Allocated</c> nor <c>Iterating</c>.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
      {
          switch (state)
          {
              case AsyncIteratorState.Allocated:
                  outerEnumerator = source.GetAsyncEnumerator(cancellationToken);
                  mode = State_OuterNext;
                  state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  switch (mode)
                  {
                      case State_OuterNext:
                          if (await outerEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              // Dispose the finished inner enumerator before replacing it. The field is
                              // cleared first: if this disposal or the GetAsyncEnumerator call below
                              // throws, no already-disposed enumerator is left for DisposeAsync to
                              // dispose again.
                              var finished = currentEnumerator;
                              if (finished != null)
                              {
                                  currentEnumerator = null;
                                  await finished.DisposeAsync().ConfigureAwait(false);
                              }

                              currentEnumerator = outerEnumerator.Current.GetAsyncEnumerator(cancellationToken);
                              mode = State_While;
                              goto case State_While;
                          }

                          // Outer sequence exhausted: fall out to the disposal below.
                          break;

                      case State_While:
                          if (await currentEnumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              current = currentEnumerator.Current;
                              return true;
                          }

                          // No more on the inner enumerator, move to the next outer. The mode is switched
                          // first so that State_While always implies a live currentEnumerator, even if
                          // the transition throws part-way through.
                          mode = State_OuterNext;
                          goto case State_OuterNext;
                  }

                  await DisposeAsync().ConfigureAwait(false);
                  break;
          }

          return false;
      }
  }
  178. }
  179. }