SelectMany.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. public static IAsyncEnumerable<TOther> SelectMany<TSource, TOther>(this IAsyncEnumerable<TSource> source, IAsyncEnumerable<TOther> other)
  13. {
  14. if (source == null)
  15. throw new ArgumentNullException(nameof(source));
  16. if (other == null)
  17. throw new ArgumentNullException(nameof(other));
  18. return source.SelectMany(_ => other);
  19. }
  20. public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
  21. {
  22. if (source == null)
  23. throw new ArgumentNullException(nameof(source));
  24. if (selector == null)
  25. throw new ArgumentNullException(nameof(selector));
  26. return new SelectManyAsyncIterator<TSource, TResult>(source, selector);
  27. }
  28. public static IAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
  29. {
  30. if (source == null)
  31. throw new ArgumentNullException(nameof(source));
  32. if (selector == null)
  33. throw new ArgumentNullException(nameof(selector));
  34. return new SelectManyWithIndexAsyncIterator<TSource, TResult>(source, selector);
  35. }
  36. public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
  37. {
  38. if (source == null)
  39. throw new ArgumentNullException(nameof(source));
  40. if (selector == null)
  41. throw new ArgumentNullException(nameof(selector));
  42. if (resultSelector == null)
  43. throw new ArgumentNullException(nameof(resultSelector));
  44. return new SelectManyAsyncIterator<TSource, TCollection, TResult>(source, selector, resultSelector);
  45. }
  46. public static IAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
  47. {
  48. if (source == null)
  49. throw new ArgumentNullException(nameof(source));
  50. if (selector == null)
  51. throw new ArgumentNullException(nameof(selector));
  52. if (resultSelector == null)
  53. throw new ArgumentNullException(nameof(resultSelector));
  54. return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(source, selector, resultSelector);
  55. }
  56. private sealed class SelectManyAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
  57. {
  58. private const int State_Source = 1;
  59. private const int State_Result = 2;
  60. private readonly Func<TSource, IAsyncEnumerable<TResult>> selector;
  61. private readonly IAsyncEnumerable<TSource> source;
  62. private int mode;
  63. private IAsyncEnumerator<TResult> resultEnumerator;
  64. private IAsyncEnumerator<TSource> sourceEnumerator;
  65. public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TResult>> selector)
  66. {
  67. this.source = source;
  68. this.selector = selector;
  69. }
  70. public override AsyncIterator<TResult> Clone()
  71. {
  72. return new SelectManyAsyncIterator<TSource, TResult>(source, selector);
  73. }
  74. public override void Dispose()
  75. {
  76. if (sourceEnumerator != null)
  77. {
  78. sourceEnumerator.Dispose();
  79. sourceEnumerator = null;
  80. }
  81. if (resultEnumerator != null)
  82. {
  83. resultEnumerator.Dispose();
  84. resultEnumerator = null;
  85. }
  86. base.Dispose();
  87. }
  88. protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
  89. {
  90. switch (state)
  91. {
  92. case AsyncIteratorState.Allocated:
  93. sourceEnumerator = source.GetEnumerator();
  94. mode = State_Source;
  95. state = AsyncIteratorState.Iterating;
  96. goto case AsyncIteratorState.Iterating;
  97. case AsyncIteratorState.Iterating:
  98. switch (mode)
  99. {
  100. case State_Source:
  101. if (await sourceEnumerator.MoveNext(cancellationToken)
  102. .ConfigureAwait(false))
  103. {
  104. resultEnumerator?.Dispose();
  105. resultEnumerator = selector(sourceEnumerator.Current)
  106. .GetEnumerator();
  107. mode = State_Result;
  108. goto case State_Result;
  109. }
  110. break;
  111. case State_Result:
  112. if (await resultEnumerator.MoveNext(cancellationToken)
  113. .ConfigureAwait(false))
  114. {
  115. current = resultEnumerator.Current;
  116. return true;
  117. }
  118. mode = State_Source;
  119. goto case State_Source; // loop
  120. }
  121. break;
  122. }
  123. Dispose();
  124. return false;
  125. }
  126. }
  127. private sealed class SelectManyAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
  128. {
  129. private const int State_Source = 1;
  130. private const int State_Result = 2;
  131. private readonly Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector;
  132. private readonly Func<TSource, TCollection, TResult> resultSelector;
  133. private readonly IAsyncEnumerable<TSource> source;
  134. private TSource currentSource;
  135. private int mode;
  136. private IAsyncEnumerator<TCollection> resultEnumerator;
  137. private IAsyncEnumerator<TSource> sourceEnumerator;
  138. public SelectManyAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  139. {
  140. this.source = source;
  141. this.collectionSelector = collectionSelector;
  142. this.resultSelector = resultSelector;
  143. }
  144. public override AsyncIterator<TResult> Clone()
  145. {
  146. return new SelectManyAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  147. }
  148. public override void Dispose()
  149. {
  150. if (sourceEnumerator != null)
  151. {
  152. sourceEnumerator.Dispose();
  153. sourceEnumerator = null;
  154. }
  155. if (resultEnumerator != null)
  156. {
  157. resultEnumerator.Dispose();
  158. resultEnumerator = null;
  159. }
  160. currentSource = default(TSource);
  161. base.Dispose();
  162. }
  163. protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
  164. {
  165. switch (state)
  166. {
  167. case AsyncIteratorState.Allocated:
  168. sourceEnumerator = source.GetEnumerator();
  169. mode = State_Source;
  170. state = AsyncIteratorState.Iterating;
  171. goto case AsyncIteratorState.Iterating;
  172. case AsyncIteratorState.Iterating:
  173. switch (mode)
  174. {
  175. case State_Source:
  176. if (await sourceEnumerator.MoveNext(cancellationToken)
  177. .ConfigureAwait(false))
  178. {
  179. resultEnumerator?.Dispose();
  180. currentSource = sourceEnumerator.Current;
  181. resultEnumerator = collectionSelector(currentSource)
  182. .GetEnumerator();
  183. mode = State_Result;
  184. goto case State_Result;
  185. }
  186. break;
  187. case State_Result:
  188. if (await resultEnumerator.MoveNext(cancellationToken)
  189. .ConfigureAwait(false))
  190. {
  191. current = resultSelector(currentSource, resultEnumerator.Current);
  192. return true;
  193. }
  194. mode = State_Source;
  195. goto case State_Source; // loop
  196. }
  197. break;
  198. }
  199. Dispose();
  200. return false;
  201. }
  202. }
  203. private sealed class SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult> : AsyncIterator<TResult>
  204. {
  205. private const int State_Source = 1;
  206. private const int State_Result = 2;
  207. private readonly Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector;
  208. private readonly Func<TSource, TCollection, TResult> resultSelector;
  209. private readonly IAsyncEnumerable<TSource> source;
  210. private TSource currentSource;
  211. private int index;
  212. private int mode;
  213. private IAsyncEnumerator<TCollection> resultEnumerator;
  214. private IAsyncEnumerator<TSource> sourceEnumerator;
  215. public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  216. {
  217. Debug.Assert(source != null);
  218. Debug.Assert(collectionSelector != null);
  219. Debug.Assert(resultSelector != null);
  220. this.source = source;
  221. this.collectionSelector = collectionSelector;
  222. this.resultSelector = resultSelector;
  223. }
  224. public override AsyncIterator<TResult> Clone()
  225. {
  226. return new SelectManyWithIndexAsyncIterator<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  227. }
  228. public override void Dispose()
  229. {
  230. if (sourceEnumerator != null)
  231. {
  232. sourceEnumerator.Dispose();
  233. sourceEnumerator = null;
  234. }
  235. if (resultEnumerator != null)
  236. {
  237. resultEnumerator.Dispose();
  238. resultEnumerator = null;
  239. }
  240. currentSource = default(TSource);
  241. base.Dispose();
  242. }
  243. protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
  244. {
  245. switch (state)
  246. {
  247. case AsyncIteratorState.Allocated:
  248. sourceEnumerator = source.GetEnumerator();
  249. index = -1;
  250. mode = State_Source;
  251. state = AsyncIteratorState.Iterating;
  252. goto case AsyncIteratorState.Iterating;
  253. case AsyncIteratorState.Iterating:
  254. switch (mode)
  255. {
  256. case State_Source:
  257. if (await sourceEnumerator.MoveNext(cancellationToken)
  258. .ConfigureAwait(false))
  259. {
  260. resultEnumerator?.Dispose();
  261. currentSource = sourceEnumerator.Current;
  262. checked
  263. {
  264. index++;
  265. }
  266. resultEnumerator = collectionSelector(currentSource, index)
  267. .GetEnumerator();
  268. mode = State_Result;
  269. goto case State_Result;
  270. }
  271. break;
  272. case State_Result:
  273. if (await resultEnumerator.MoveNext(cancellationToken)
  274. .ConfigureAwait(false))
  275. {
  276. current = resultSelector(currentSource, resultEnumerator.Current);
  277. return true;
  278. }
  279. mode = State_Source;
  280. goto case State_Source; // loop
  281. }
  282. break;
  283. }
  284. Dispose();
  285. return false;
  286. }
  287. }
  288. private sealed class SelectManyWithIndexAsyncIterator<TSource, TResult> : AsyncIterator<TResult>
  289. {
  290. private const int State_Source = 1;
  291. private const int State_Result = 2;
  292. private readonly Func<TSource, int, IAsyncEnumerable<TResult>> selector;
  293. private readonly IAsyncEnumerable<TSource> source;
  294. private int index;
  295. private int mode;
  296. private IAsyncEnumerator<TResult> resultEnumerator;
  297. private IAsyncEnumerator<TSource> sourceEnumerator;
  298. public SelectManyWithIndexAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, int, IAsyncEnumerable<TResult>> selector)
  299. {
  300. Debug.Assert(source != null);
  301. Debug.Assert(selector != null);
  302. this.source = source;
  303. this.selector = selector;
  304. }
  305. public override AsyncIterator<TResult> Clone()
  306. {
  307. return new SelectManyWithIndexAsyncIterator<TSource, TResult>(source, selector);
  308. }
  309. public override void Dispose()
  310. {
  311. if (sourceEnumerator != null)
  312. {
  313. sourceEnumerator.Dispose();
  314. sourceEnumerator = null;
  315. }
  316. if (resultEnumerator != null)
  317. {
  318. resultEnumerator.Dispose();
  319. resultEnumerator = null;
  320. }
  321. base.Dispose();
  322. }
  323. protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
  324. {
  325. switch (state)
  326. {
  327. case AsyncIteratorState.Allocated:
  328. sourceEnumerator = source.GetEnumerator();
  329. index = -1;
  330. mode = State_Source;
  331. state = AsyncIteratorState.Iterating;
  332. goto case AsyncIteratorState.Iterating;
  333. case AsyncIteratorState.Iterating:
  334. switch (mode)
  335. {
  336. case State_Source:
  337. if (await sourceEnumerator.MoveNext(cancellationToken)
  338. .ConfigureAwait(false))
  339. {
  340. resultEnumerator?.Dispose();
  341. checked
  342. {
  343. index++;
  344. }
  345. resultEnumerator = selector(sourceEnumerator.Current, index)
  346. .GetEnumerator();
  347. mode = State_Result;
  348. goto case State_Result;
  349. }
  350. break;
  351. case State_Result:
  352. if (await resultEnumerator.MoveNext(cancellationToken)
  353. .ConfigureAwait(false))
  354. {
  355. current = resultEnumerator.Current;
  356. return true;
  357. }
  358. mode = State_Source;
  359. goto case State_Source; // loop
  360. }
  361. break;
  362. }
  363. Dispose();
  364. return false;
  365. }
  366. }
  367. }
  368. }