Zip.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerable
  11. {
  12. public static IAsyncEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> selector)
  13. {
  14. if (first == null)
  15. throw Error.ArgumentNull(nameof(first));
  16. if (second == null)
  17. throw Error.ArgumentNull(nameof(second));
  18. if (selector == null)
  19. throw Error.ArgumentNull(nameof(selector));
  20. #if USE_ASYNC_ITERATOR
  21. return Create(Core);
  22. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  23. {
  24. var e1 = first.GetConfiguredAsyncEnumerator(cancellationToken, false);
  25. try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
  26. {
  27. var e2 = second.GetConfiguredAsyncEnumerator(cancellationToken, false);
  28. try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
  29. {
  30. while (await e1.MoveNextAsync() && await e2.MoveNextAsync())
  31. {
  32. yield return selector(e1.Current, e2.Current);
  33. }
  34. }
  35. finally
  36. {
  37. await e2.DisposeAsync();
  38. }
  39. }
  40. finally
  41. {
  42. await e1.DisposeAsync();
  43. }
  44. }
  45. #else
  46. return new ZipAsyncIterator<TFirst, TSecond, TResult>(first, second, selector);
  47. #endif
  48. }
  49. public static IAsyncEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, ValueTask<TResult>> selector)
  50. {
  51. if (first == null)
  52. throw Error.ArgumentNull(nameof(first));
  53. if (second == null)
  54. throw Error.ArgumentNull(nameof(second));
  55. if (selector == null)
  56. throw Error.ArgumentNull(nameof(selector));
  57. #if USE_ASYNC_ITERATOR
  58. return Create(Core);
  59. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  60. {
  61. var e1 = first.GetConfiguredAsyncEnumerator(cancellationToken, false);
  62. try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
  63. {
  64. var e2 = second.GetConfiguredAsyncEnumerator(cancellationToken, false);
  65. try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
  66. {
  67. while (await e1.MoveNextAsync() && await e2.MoveNextAsync())
  68. {
  69. yield return await selector(e1.Current, e2.Current).ConfigureAwait(false);
  70. }
  71. }
  72. finally
  73. {
  74. await e2.DisposeAsync();
  75. }
  76. }
  77. finally
  78. {
  79. await e1.DisposeAsync();
  80. }
  81. }
  82. #else
  83. return new ZipAsyncIteratorWithTask<TFirst, TSecond, TResult>(first, second, selector);
  84. #endif
  85. }
  86. #if !NO_DEEP_CANCELLATION
  87. public static IAsyncEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, CancellationToken, ValueTask<TResult>> selector)
  88. {
  89. if (first == null)
  90. throw Error.ArgumentNull(nameof(first));
  91. if (second == null)
  92. throw Error.ArgumentNull(nameof(second));
  93. if (selector == null)
  94. throw Error.ArgumentNull(nameof(selector));
  95. #if USE_ASYNC_ITERATOR
  96. return Create(Core);
  97. async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
  98. {
  99. var e1 = first.GetConfiguredAsyncEnumerator(cancellationToken, false);
  100. try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
  101. {
  102. var e2 = second.GetConfiguredAsyncEnumerator(cancellationToken, false);
  103. try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
  104. {
  105. while (await e1.MoveNextAsync() && await e2.MoveNextAsync())
  106. {
  107. yield return await selector(e1.Current, e2.Current, cancellationToken).ConfigureAwait(false);
  108. }
  109. }
  110. finally
  111. {
  112. await e2.DisposeAsync();
  113. }
  114. }
  115. finally
  116. {
  117. await e1.DisposeAsync();
  118. }
  119. }
  120. #else
  121. return new ZipAsyncIteratorWithTaskAndCancellation<TFirst, TSecond, TResult>(first, second, selector);
  122. #endif
  123. }
  124. #endif
  125. #if !USE_ASYNC_ITERATOR
  126. private sealed class ZipAsyncIterator<TFirst, TSecond, TResult> : AsyncIterator<TResult>
  127. {
  128. private readonly IAsyncEnumerable<TFirst> _first;
  129. private readonly IAsyncEnumerable<TSecond> _second;
  130. private readonly Func<TFirst, TSecond, TResult> _selector;
  131. private IAsyncEnumerator<TFirst> _firstEnumerator;
  132. private IAsyncEnumerator<TSecond> _secondEnumerator;
  133. public ZipAsyncIterator(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> selector)
  134. {
  135. Debug.Assert(first != null);
  136. Debug.Assert(second != null);
  137. Debug.Assert(selector != null);
  138. _first = first;
  139. _second = second;
  140. _selector = selector;
  141. }
  142. public override AsyncIteratorBase<TResult> Clone()
  143. {
  144. return new ZipAsyncIterator<TFirst, TSecond, TResult>(_first, _second, _selector);
  145. }
  146. public override async ValueTask DisposeAsync()
  147. {
  148. if (_secondEnumerator != null)
  149. {
  150. await _secondEnumerator.DisposeAsync().ConfigureAwait(false);
  151. _secondEnumerator = null;
  152. }
  153. if (_firstEnumerator != null)
  154. {
  155. await _firstEnumerator.DisposeAsync().ConfigureAwait(false);
  156. _firstEnumerator = null;
  157. }
  158. await base.DisposeAsync().ConfigureAwait(false);
  159. }
  160. protected override async ValueTask<bool> MoveNextCore()
  161. {
  162. // REVIEW: Earlier versions of this operator performed concurrent MoveNextAsync calls, which isn't a great default and
  163. // results in an unexpected source of concurrency. However, a concurrent Zip may be a worthy addition to the
  164. // API or System.Interactive.Async as a complementary implementation besides the conservative default.
  165. switch (_state)
  166. {
  167. case AsyncIteratorState.Allocated:
  168. _firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);
  169. _secondEnumerator = _second.GetAsyncEnumerator(_cancellationToken);
  170. _state = AsyncIteratorState.Iterating;
  171. goto case AsyncIteratorState.Iterating;
  172. case AsyncIteratorState.Iterating:
  173. if (await _firstEnumerator.MoveNextAsync().ConfigureAwait(false) && await _secondEnumerator.MoveNextAsync().ConfigureAwait(false))
  174. {
  175. _current = _selector(_firstEnumerator.Current, _secondEnumerator.Current);
  176. return true;
  177. }
  178. await DisposeAsync().ConfigureAwait(false);
  179. break;
  180. }
  181. return false;
  182. }
  183. }
  184. private sealed class ZipAsyncIteratorWithTask<TFirst, TSecond, TResult> : AsyncIterator<TResult>
  185. {
  186. private readonly IAsyncEnumerable<TFirst> _first;
  187. private readonly IAsyncEnumerable<TSecond> _second;
  188. private readonly Func<TFirst, TSecond, ValueTask<TResult>> _selector;
  189. private IAsyncEnumerator<TFirst> _firstEnumerator;
  190. private IAsyncEnumerator<TSecond> _secondEnumerator;
  191. public ZipAsyncIteratorWithTask(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, ValueTask<TResult>> selector)
  192. {
  193. Debug.Assert(first != null);
  194. Debug.Assert(second != null);
  195. Debug.Assert(selector != null);
  196. _first = first;
  197. _second = second;
  198. _selector = selector;
  199. }
  200. public override AsyncIteratorBase<TResult> Clone()
  201. {
  202. return new ZipAsyncIteratorWithTask<TFirst, TSecond, TResult>(_first, _second, _selector);
  203. }
  204. public override async ValueTask DisposeAsync()
  205. {
  206. if (_secondEnumerator != null)
  207. {
  208. await _secondEnumerator.DisposeAsync().ConfigureAwait(false);
  209. _secondEnumerator = null;
  210. }
  211. if (_firstEnumerator != null)
  212. {
  213. await _firstEnumerator.DisposeAsync().ConfigureAwait(false);
  214. _firstEnumerator = null;
  215. }
  216. await base.DisposeAsync().ConfigureAwait(false);
  217. }
  218. protected override async ValueTask<bool> MoveNextCore()
  219. {
  220. // REVIEW: Earlier versions of this operator performed concurrent MoveNextAsync calls, which isn't a great default and
  221. // results in an unexpected source of concurrency. However, a concurrent Zip may be a worthy addition to the
  222. // API or System.Interactive.Async as a complementary implementation besides the conservative default.
  223. switch (_state)
  224. {
  225. case AsyncIteratorState.Allocated:
  226. _firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);
  227. _secondEnumerator = _second.GetAsyncEnumerator(_cancellationToken);
  228. _state = AsyncIteratorState.Iterating;
  229. goto case AsyncIteratorState.Iterating;
  230. case AsyncIteratorState.Iterating:
  231. if (await _firstEnumerator.MoveNextAsync().ConfigureAwait(false) && await _secondEnumerator.MoveNextAsync().ConfigureAwait(false))
  232. {
  233. _current = await _selector(_firstEnumerator.Current, _secondEnumerator.Current).ConfigureAwait(false);
  234. return true;
  235. }
  236. await DisposeAsync().ConfigureAwait(false);
  237. break;
  238. }
  239. return false;
  240. }
  241. }
  242. #if !NO_DEEP_CANCELLATION
  243. private sealed class ZipAsyncIteratorWithTaskAndCancellation<TFirst, TSecond, TResult> : AsyncIterator<TResult>
  244. {
  245. private readonly IAsyncEnumerable<TFirst> _first;
  246. private readonly IAsyncEnumerable<TSecond> _second;
  247. private readonly Func<TFirst, TSecond, CancellationToken, ValueTask<TResult>> _selector;
  248. private IAsyncEnumerator<TFirst> _firstEnumerator;
  249. private IAsyncEnumerator<TSecond> _secondEnumerator;
  250. public ZipAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, CancellationToken, ValueTask<TResult>> selector)
  251. {
  252. Debug.Assert(first != null);
  253. Debug.Assert(second != null);
  254. Debug.Assert(selector != null);
  255. _first = first;
  256. _second = second;
  257. _selector = selector;
  258. }
  259. public override AsyncIteratorBase<TResult> Clone()
  260. {
  261. return new ZipAsyncIteratorWithTaskAndCancellation<TFirst, TSecond, TResult>(_first, _second, _selector);
  262. }
  263. public override async ValueTask DisposeAsync()
  264. {
  265. if (_secondEnumerator != null)
  266. {
  267. await _secondEnumerator.DisposeAsync().ConfigureAwait(false);
  268. _secondEnumerator = null;
  269. }
  270. if (_firstEnumerator != null)
  271. {
  272. await _firstEnumerator.DisposeAsync().ConfigureAwait(false);
  273. _firstEnumerator = null;
  274. }
  275. await base.DisposeAsync().ConfigureAwait(false);
  276. }
  277. protected override async ValueTask<bool> MoveNextCore()
  278. {
  279. // REVIEW: Earlier versions of this operator performed concurrent MoveNextAsync calls, which isn't a great default and
  280. // results in an unexpected source of concurrency. However, a concurrent Zip may be a worthy addition to the
  281. // API or System.Interactive.Async as a complementary implementation besides the conservative default.
  282. switch (_state)
  283. {
  284. case AsyncIteratorState.Allocated:
  285. _firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);
  286. _secondEnumerator = _second.GetAsyncEnumerator(_cancellationToken);
  287. _state = AsyncIteratorState.Iterating;
  288. goto case AsyncIteratorState.Iterating;
  289. case AsyncIteratorState.Iterating:
  290. if (await _firstEnumerator.MoveNextAsync().ConfigureAwait(false) && await _secondEnumerator.MoveNextAsync().ConfigureAwait(false))
  291. {
  292. _current = await _selector(_firstEnumerator.Current, _secondEnumerator.Current, _cancellationToken).ConfigureAwait(false);
  293. return true;
  294. }
  295. await DisposeAsync().ConfigureAwait(false);
  296. break;
  297. }
  298. return false;
  299. }
  300. }
  301. #endif
  302. #endif
  303. }
  304. }