Zip.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. #if HAS_VALUETUPLE
  /// <summary>
  /// Walks two async-enumerable sequences in lockstep and pairs the elements found at the same position into a tuple.
  /// </summary>
  /// <typeparam name="TFirst">Element type of the first source sequence.</typeparam>
  /// <typeparam name="TSecond">Element type of the second source sequence.</typeparam>
  /// <param name="first">Sequence supplying the first component of every tuple.</param>
  /// <param name="second">Sequence supplying the second component of every tuple.</param>
  /// <returns>
  /// A sequence of tuples. It completes as soon as either source has no further element; the second source is
  /// only advanced after the first one has produced an element.
  /// </returns>
  /// <remarks>
  /// When <paramref name="first"/> or <paramref name="second"/> is null, the exception created by
  /// <c>Error.ArgumentNull</c> is thrown before any iteration takes place.
  /// </remarks>
  public static IAsyncEnumerable<(TFirst First, TSecond Second)> Zip<TFirst, TSecond>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
  {
      if (first is null)
      {
          throw Error.ArgumentNull(nameof(first));
      }

      if (second is null)
      {
          throw Error.ArgumentNull(nameof(second));
      }

#if USE_ASYNC_ITERATOR
      return Create(ZipPairs);

      async IAsyncEnumerator<(TFirst, TSecond)> ZipPairs(CancellationToken cancellationToken)
      {
          // Declaration order matters: the right-hand enumerator is disposed first, then the left-hand one.
          await using var left = first.GetConfiguredAsyncEnumerator(cancellationToken, false);
          await using var right = second.GetConfiguredAsyncEnumerator(cancellationToken, false);

          while (await left.MoveNextAsync())
          {
              if (!await right.MoveNextAsync())
              {
                  yield break;
              }

              yield return (left.Current, right.Current);
          }
      }
#else
      return new ZipAsyncIterator<TFirst, TSecond, (TFirst, TSecond)>(first, second, (x, y) => (x, y));
#endif
  }
  38. #endif
  /// <summary>
  /// Walks two async-enumerable sequences in lockstep and projects each pair of same-position elements
  /// through a synchronous selector.
  /// </summary>
  /// <typeparam name="TFirst">Element type of the first source sequence.</typeparam>
  /// <typeparam name="TSecond">Element type of the second source sequence.</typeparam>
  /// <typeparam name="TResult">Element type produced by <paramref name="selector"/>.</typeparam>
  /// <param name="first">Sequence supplying the selector's first argument.</param>
  /// <param name="second">Sequence supplying the selector's second argument.</param>
  /// <param name="selector">Function invoked once per pair to produce the result element.</param>
  /// <returns>
  /// A sequence of selector results. It completes as soon as either source has no further element; the second
  /// source is only advanced after the first one has produced an element.
  /// </returns>
  /// <remarks>
  /// When any argument is null, the exception created by <c>Error.ArgumentNull</c> is thrown before any
  /// iteration takes place.
  /// </remarks>
  public static IAsyncEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> selector)
  {
      if (first is null)
      {
          throw Error.ArgumentNull(nameof(first));
      }

      if (second is null)
      {
          throw Error.ArgumentNull(nameof(second));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(ZipProjected);

      async IAsyncEnumerator<TResult> ZipProjected(CancellationToken cancellationToken)
      {
          // Declaration order matters: the right-hand enumerator is disposed first, then the left-hand one.
          await using var left = first.GetConfiguredAsyncEnumerator(cancellationToken, false);
          await using var right = second.GetConfiguredAsyncEnumerator(cancellationToken, false);

          while (await left.MoveNextAsync())
          {
              if (!await right.MoveNextAsync())
              {
                  yield break;
              }

              yield return selector(left.Current, right.Current);
          }
      }
#else
      return new ZipAsyncIterator<TFirst, TSecond, TResult>(first, second, selector);
#endif
  }
  /// <summary>
  /// Walks two async-enumerable sequences in lockstep and projects each pair of same-position elements
  /// through an asynchronous selector whose result is awaited before it is yielded.
  /// </summary>
  /// <typeparam name="TFirst">Element type of the first source sequence.</typeparam>
  /// <typeparam name="TSecond">Element type of the second source sequence.</typeparam>
  /// <typeparam name="TResult">Element type produced by awaiting <paramref name="selector"/>.</typeparam>
  /// <param name="first">Sequence supplying the selector's first argument.</param>
  /// <param name="second">Sequence supplying the selector's second argument.</param>
  /// <param name="selector">Asynchronous function invoked once per pair to produce the result element.</param>
  /// <returns>
  /// A sequence of awaited selector results. It completes as soon as either source has no further element; the
  /// second source is only advanced after the first one has produced an element.
  /// </returns>
  /// <remarks>
  /// When any argument is null, the exception created by <c>Error.ArgumentNull</c> is thrown before any
  /// iteration takes place.
  /// </remarks>
  internal static IAsyncEnumerable<TResult> ZipAwaitCore<TFirst, TSecond, TResult>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, ValueTask<TResult>> selector)
  {
      if (first is null)
      {
          throw Error.ArgumentNull(nameof(first));
      }

      if (second is null)
      {
          throw Error.ArgumentNull(nameof(second));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(ZipAwaited);

      async IAsyncEnumerator<TResult> ZipAwaited(CancellationToken cancellationToken)
      {
          // Declaration order matters: the right-hand enumerator is disposed first, then the left-hand one.
          await using var left = first.GetConfiguredAsyncEnumerator(cancellationToken, false);
          await using var right = second.GetConfiguredAsyncEnumerator(cancellationToken, false);

          while (await left.MoveNextAsync())
          {
              if (!await right.MoveNextAsync())
              {
                  yield break;
              }

              var projected = await selector(left.Current, right.Current).ConfigureAwait(false);
              yield return projected;
          }
      }
#else
      return new ZipAsyncIteratorWithTask<TFirst, TSecond, TResult>(first, second, selector);
#endif
  }
  93. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Walks two async-enumerable sequences in lockstep and projects each pair of same-position elements
  /// through an asynchronous, cancellation-aware selector whose result is awaited before it is yielded.
  /// </summary>
  /// <typeparam name="TFirst">Element type of the first source sequence.</typeparam>
  /// <typeparam name="TSecond">Element type of the second source sequence.</typeparam>
  /// <typeparam name="TResult">Element type produced by awaiting <paramref name="selector"/>.</typeparam>
  /// <param name="first">Sequence supplying the selector's first argument.</param>
  /// <param name="second">Sequence supplying the selector's second argument.</param>
  /// <param name="selector">
  /// Asynchronous function invoked once per pair; it receives the same cancellation token that is used to
  /// obtain the source enumerators.
  /// </param>
  /// <returns>
  /// A sequence of awaited selector results. It completes as soon as either source has no further element; the
  /// second source is only advanced after the first one has produced an element.
  /// </returns>
  /// <remarks>
  /// When any argument is null, the exception created by <c>Error.ArgumentNull</c> is thrown before any
  /// iteration takes place.
  /// </remarks>
  internal static IAsyncEnumerable<TResult> ZipAwaitWithCancellationCore<TFirst, TSecond, TResult>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, CancellationToken, ValueTask<TResult>> selector)
  {
      if (first is null)
      {
          throw Error.ArgumentNull(nameof(first));
      }

      if (second is null)
      {
          throw Error.ArgumentNull(nameof(second));
      }

      if (selector is null)
      {
          throw Error.ArgumentNull(nameof(selector));
      }

#if USE_ASYNC_ITERATOR
      return Create(ZipAwaitedWithToken);

      async IAsyncEnumerator<TResult> ZipAwaitedWithToken(CancellationToken cancellationToken)
      {
          // Declaration order matters: the right-hand enumerator is disposed first, then the left-hand one.
          await using var left = first.GetConfiguredAsyncEnumerator(cancellationToken, false);
          await using var right = second.GetConfiguredAsyncEnumerator(cancellationToken, false);

          while (await left.MoveNextAsync())
          {
              if (!await right.MoveNextAsync())
              {
                  yield break;
              }

              var projected = await selector(left.Current, right.Current, cancellationToken).ConfigureAwait(false);
              yield return projected;
          }
      }
#else
      return new ZipAsyncIteratorWithTaskAndCancellation<TFirst, TSecond, TResult>(first, second, selector);
#endif
  }
  121. #endif
  122. #if !USE_ASYNC_ITERATOR
  /// <summary>
  /// Hand-written async iterator that advances two source sequences in lockstep and exposes the result of a
  /// synchronous selector applied to each pair of same-position elements.
  /// </summary>
  /// <typeparam name="TFirst">Element type of the first source sequence.</typeparam>
  /// <typeparam name="TSecond">Element type of the second source sequence.</typeparam>
  /// <typeparam name="TResult">Element type produced by the selector.</typeparam>
  private sealed class ZipAsyncIterator<TFirst, TSecond, TResult> : AsyncIterator<TResult>
  {
      private readonly IAsyncEnumerable<TFirst> _first;
      private readonly IAsyncEnumerable<TSecond> _second;
      private readonly Func<TFirst, TSecond, TResult> _selector;

      // Created on the first MoveNextCore call and reset to null once disposed.
      private IAsyncEnumerator<TFirst> _firstEnumerator;
      private IAsyncEnumerator<TSecond> _secondEnumerator;

      /// <summary>
      /// Captures the two sources and the selector; no enumeration starts here.
      /// </summary>
      /// <param name="first">First source sequence; expected to be non-null.</param>
      /// <param name="second">Second source sequence; expected to be non-null.</param>
      /// <param name="selector">Projection applied to each pair; expected to be non-null.</param>
      public ZipAsyncIterator(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> selector)
      {
          Debug.Assert(first != null);
          Debug.Assert(second != null);
          Debug.Assert(selector != null);

          _first = first;
          _second = second;
          _selector = selector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same sources and selector, without any enumerator state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new ZipAsyncIterator<TFirst, TSecond, TResult>(_first, _second, _selector);
      }

      /// <summary>
      /// Disposes the second enumerator, then the first one, then runs the base-class disposal.
      /// </summary>
      /// <remarks>
      /// The nested try/finally blocks make sure that an exception thrown while disposing the second enumerator
      /// neither skips disposal of the first enumerator nor the base-class disposal. If more than one step
      /// throws, the exception from the last failing step is the one that propagates.
      /// </remarks>
      public override async ValueTask DisposeAsync()
      {
          try
          {
              try
              {
                  if (_secondEnumerator != null)
                  {
                      await _secondEnumerator.DisposeAsync().ConfigureAwait(false);
                      _secondEnumerator = null;
                  }
              }
              finally
              {
                  if (_firstEnumerator != null)
                  {
                      await _firstEnumerator.DisposeAsync().ConfigureAwait(false);
                      _firstEnumerator = null;
                  }
              }
          }
          finally
          {
              await base.DisposeAsync().ConfigureAwait(false);
          }
      }

      /// <summary>
      /// Advances both sources by one element and stores the selector's result as the current element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when both sources produced an element; <c>false</c> once either source is exhausted (after
      /// disposing this iterator) or when the state is neither allocated nor iterating.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // REVIEW: Earlier versions of this operator performed concurrent MoveNextAsync calls, which isn't a great default and
          //         results in an unexpected source of concurrency. However, a concurrent Zip may be a worthy addition to the
          //         API or System.Interactive.Async as a complementary implementation besides the conservative default.

          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);
                  _secondEnumerator = _second.GetAsyncEnumerator(_cancellationToken);

                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  // Short-circuit: the second source is only advanced when the first one yielded an element.
                  if (await _firstEnumerator.MoveNextAsync().ConfigureAwait(false) && await _secondEnumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      _current = _selector(_firstEnumerator.Current, _secondEnumerator.Current);
                      return true;
                  }

                  await DisposeAsync().ConfigureAwait(false);
                  break;
          }

          return false;
      }
  }
  /// <summary>
  /// Hand-written async iterator that advances two source sequences in lockstep and exposes the awaited result
  /// of an asynchronous selector applied to each pair of same-position elements.
  /// </summary>
  /// <typeparam name="TFirst">Element type of the first source sequence.</typeparam>
  /// <typeparam name="TSecond">Element type of the second source sequence.</typeparam>
  /// <typeparam name="TResult">Element type produced by awaiting the selector.</typeparam>
  private sealed class ZipAsyncIteratorWithTask<TFirst, TSecond, TResult> : AsyncIterator<TResult>
  {
      private readonly IAsyncEnumerable<TFirst> _first;
      private readonly IAsyncEnumerable<TSecond> _second;
      private readonly Func<TFirst, TSecond, ValueTask<TResult>> _selector;

      // Created on the first MoveNextCore call and reset to null once disposed.
      private IAsyncEnumerator<TFirst> _firstEnumerator;
      private IAsyncEnumerator<TSecond> _secondEnumerator;

      /// <summary>
      /// Captures the two sources and the selector; no enumeration starts here.
      /// </summary>
      /// <param name="first">First source sequence; expected to be non-null.</param>
      /// <param name="second">Second source sequence; expected to be non-null.</param>
      /// <param name="selector">Asynchronous projection applied to each pair; expected to be non-null.</param>
      public ZipAsyncIteratorWithTask(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, ValueTask<TResult>> selector)
      {
          Debug.Assert(first != null);
          Debug.Assert(second != null);
          Debug.Assert(selector != null);

          _first = first;
          _second = second;
          _selector = selector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same sources and selector, without any enumerator state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new ZipAsyncIteratorWithTask<TFirst, TSecond, TResult>(_first, _second, _selector);
      }

      /// <summary>
      /// Disposes the second enumerator, then the first one, then runs the base-class disposal.
      /// </summary>
      /// <remarks>
      /// The nested try/finally blocks make sure that an exception thrown while disposing the second enumerator
      /// neither skips disposal of the first enumerator nor the base-class disposal. If more than one step
      /// throws, the exception from the last failing step is the one that propagates.
      /// </remarks>
      public override async ValueTask DisposeAsync()
      {
          try
          {
              try
              {
                  if (_secondEnumerator != null)
                  {
                      await _secondEnumerator.DisposeAsync().ConfigureAwait(false);
                      _secondEnumerator = null;
                  }
              }
              finally
              {
                  if (_firstEnumerator != null)
                  {
                      await _firstEnumerator.DisposeAsync().ConfigureAwait(false);
                      _firstEnumerator = null;
                  }
              }
          }
          finally
          {
              await base.DisposeAsync().ConfigureAwait(false);
          }
      }

      /// <summary>
      /// Advances both sources by one element, awaits the selector, and stores its result as the current element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when both sources produced an element; <c>false</c> once either source is exhausted (after
      /// disposing this iterator) or when the state is neither allocated nor iterating.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // REVIEW: Earlier versions of this operator performed concurrent MoveNextAsync calls, which isn't a great default and
          //         results in an unexpected source of concurrency. However, a concurrent Zip may be a worthy addition to the
          //         API or System.Interactive.Async as a complementary implementation besides the conservative default.

          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);
                  _secondEnumerator = _second.GetAsyncEnumerator(_cancellationToken);

                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  // Short-circuit: the second source is only advanced when the first one yielded an element.
                  if (await _firstEnumerator.MoveNextAsync().ConfigureAwait(false) && await _secondEnumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      _current = await _selector(_firstEnumerator.Current, _secondEnumerator.Current).ConfigureAwait(false);
                      return true;
                  }

                  await DisposeAsync().ConfigureAwait(false);
                  break;
          }

          return false;
      }
  }
  239. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Hand-written async iterator that advances two source sequences in lockstep and exposes the awaited result
  /// of an asynchronous, cancellation-aware selector applied to each pair of same-position elements.
  /// </summary>
  /// <typeparam name="TFirst">Element type of the first source sequence.</typeparam>
  /// <typeparam name="TSecond">Element type of the second source sequence.</typeparam>
  /// <typeparam name="TResult">Element type produced by awaiting the selector.</typeparam>
  private sealed class ZipAsyncIteratorWithTaskAndCancellation<TFirst, TSecond, TResult> : AsyncIterator<TResult>
  {
      private readonly IAsyncEnumerable<TFirst> _first;
      private readonly IAsyncEnumerable<TSecond> _second;
      private readonly Func<TFirst, TSecond, CancellationToken, ValueTask<TResult>> _selector;

      // Created on the first MoveNextCore call and reset to null once disposed.
      private IAsyncEnumerator<TFirst> _firstEnumerator;
      private IAsyncEnumerator<TSecond> _secondEnumerator;

      /// <summary>
      /// Captures the two sources and the selector; no enumeration starts here.
      /// </summary>
      /// <param name="first">First source sequence; expected to be non-null.</param>
      /// <param name="second">Second source sequence; expected to be non-null.</param>
      /// <param name="selector">Asynchronous, cancellation-aware projection applied to each pair; expected to be non-null.</param>
      public ZipAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, CancellationToken, ValueTask<TResult>> selector)
      {
          Debug.Assert(first != null);
          Debug.Assert(second != null);
          Debug.Assert(selector != null);

          _first = first;
          _second = second;
          _selector = selector;
      }

      /// <summary>
      /// Creates a fresh iterator over the same sources and selector, without any enumerator state.
      /// </summary>
      public override AsyncIteratorBase<TResult> Clone()
      {
          return new ZipAsyncIteratorWithTaskAndCancellation<TFirst, TSecond, TResult>(_first, _second, _selector);
      }

      /// <summary>
      /// Disposes the second enumerator, then the first one, then runs the base-class disposal.
      /// </summary>
      /// <remarks>
      /// The nested try/finally blocks make sure that an exception thrown while disposing the second enumerator
      /// neither skips disposal of the first enumerator nor the base-class disposal. If more than one step
      /// throws, the exception from the last failing step is the one that propagates.
      /// </remarks>
      public override async ValueTask DisposeAsync()
      {
          try
          {
              try
              {
                  if (_secondEnumerator != null)
                  {
                      await _secondEnumerator.DisposeAsync().ConfigureAwait(false);
                      _secondEnumerator = null;
                  }
              }
              finally
              {
                  if (_firstEnumerator != null)
                  {
                      await _firstEnumerator.DisposeAsync().ConfigureAwait(false);
                      _firstEnumerator = null;
                  }
              }
          }
          finally
          {
              await base.DisposeAsync().ConfigureAwait(false);
          }
      }

      /// <summary>
      /// Advances both sources by one element, awaits the selector (passing it the iterator's cancellation
      /// token), and stores its result as the current element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when both sources produced an element; <c>false</c> once either source is exhausted (after
      /// disposing this iterator) or when the state is neither allocated nor iterating.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // REVIEW: Earlier versions of this operator performed concurrent MoveNextAsync calls, which isn't a great default and
          //         results in an unexpected source of concurrency. However, a concurrent Zip may be a worthy addition to the
          //         API or System.Interactive.Async as a complementary implementation besides the conservative default.

          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);
                  _secondEnumerator = _second.GetAsyncEnumerator(_cancellationToken);

                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  // Short-circuit: the second source is only advanced when the first one yielded an element.
                  if (await _firstEnumerator.MoveNextAsync().ConfigureAwait(false) && await _secondEnumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      _current = await _selector(_firstEnumerator.Current, _secondEnumerator.Current, _cancellationToken).ConfigureAwait(false);
                      return true;
                  }

                  await DisposeAsync().ConfigureAwait(false);
                  break;
          }

          return false;
      }
  }
  298. #endif
  299. #endif
  300. }
  301. }