Catch.cs 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Runtime.ExceptionServices;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Linq
  10. {
  11. public static partial class AsyncEnumerableEx
  12. {
  13. // REVIEW: All Catch operators may catch OperationCanceledException due to cancellation of the enumeration
  14. // of the source. Should we explicitly avoid handling this? E.g. as follows:
  15. //
  16. // catch (TException ex) when(!(ex is OperationCanceledException oce && oce.CancellationToken == cancellationToken))
  /// <summary>
  /// Enumerates <paramref name="source"/> and, if advancing it throws an exception of type
  /// <typeparamref name="TException"/>, continues with the sequence returned by <paramref name="handler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence and of the handler's sequence.</typeparam>
  /// <typeparam name="TException">Exception type intercepted while advancing <paramref name="source"/>.</typeparam>
  /// <param name="source">Sequence to enumerate first.</param>
  /// <param name="handler">Maps the intercepted exception to the sequence to continue with.</param>
  /// <returns>An async-enumerable sequence built as described above.</returns>
  /// <remarks>
  /// When <paramref name="source"/> or <paramref name="handler"/> is null, the exception created by
  /// <c>Error.ArgumentNull</c> is thrown eagerly, before any enumeration starts.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEnumerable<TSource> source, Func<TException, IAsyncEnumerable<TSource>> handler)
      where TException : Exception
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (handler == null)
      {
          throw Error.ArgumentNull(nameof(handler));
      }

  #if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          // REVIEW: Like the Ix implementation this mirrors, acquiring the enumerator is not covered by the
          //         try statement. Wrapping an await foreach loop in a single try would be simpler, but it
          //         would introduce two breaking changes:
          //
          //         - The call to GetAsyncEnumerator would become protected by the try statement.
          //         - The handler would be invoked after disposal of the failed first sequence.

          IAsyncEnumerable<TSource> continuation = null;

          var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

          try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
          {
              for (; ; )
              {
                  var element = default(TSource);

                  // A yield return cannot sit inside a try block that has a catch clause, so the
                  // element is fetched here and yielded after the try statement.
                  try
                  {
                      if (await enumerator.MoveNextAsync())
                      {
                          element = enumerator.Current;
                      }
                      else
                      {
                          break;
                      }
                  }
                  catch (TException ex)
                  {
                      // The handler runs while the failed enumerator is still undisposed.
                      continuation = handler(ex);
                      break;
                  }

                  yield return element;
              }
          }
          finally
          {
              await enumerator.DisposeAsync();
          }

          // No exception was intercepted, or the handler returned null: nothing more to yield.
          if (continuation == null)
          {
              yield break;
          }

          await foreach (var value in continuation.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              yield return value;
          }
      }
  #else
      return new CatchAsyncIterator<TSource, TException>(source, handler);
  #endif
  }
  /// <summary>
  /// Enumerates <paramref name="source"/> and, if advancing it throws an exception of type
  /// <typeparamref name="TException"/>, continues with the sequence asynchronously produced by
  /// <paramref name="handler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence and of the handler's sequence.</typeparam>
  /// <typeparam name="TException">Exception type intercepted while advancing <paramref name="source"/>.</typeparam>
  /// <param name="source">Sequence to enumerate first.</param>
  /// <param name="handler">Asynchronously maps the intercepted exception to the sequence to continue with.</param>
  /// <returns>An async-enumerable sequence built as described above.</returns>
  /// <remarks>
  /// When <paramref name="source"/> or <paramref name="handler"/> is null, the exception created by
  /// <c>Error.ArgumentNull</c> is thrown eagerly, before any enumeration starts.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEnumerable<TSource> source, Func<TException, ValueTask<IAsyncEnumerable<TSource>>> handler)
      where TException : Exception
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (handler == null)
      {
          throw Error.ArgumentNull(nameof(handler));
      }

  #if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          // REVIEW: Like the Ix implementation this mirrors, acquiring the enumerator is not covered by the
          //         try statement. Wrapping an await foreach loop in a single try would be simpler, but it
          //         would introduce two breaking changes:
          //
          //         - The call to GetAsyncEnumerator would become protected by the try statement.
          //         - The handler would be invoked after disposal of the failed first sequence.

          IAsyncEnumerable<TSource> continuation = null;

          var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

          try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
          {
              for (; ; )
              {
                  var element = default(TSource);

                  // A yield return cannot sit inside a try block that has a catch clause, so the
                  // element is fetched here and yielded after the try statement.
                  try
                  {
                      if (await enumerator.MoveNextAsync())
                      {
                          element = enumerator.Current;
                      }
                      else
                      {
                          break;
                      }
                  }
                  catch (TException ex)
                  {
                      // The handler is awaited while the failed enumerator is still undisposed.
                      continuation = await handler(ex).ConfigureAwait(false);
                      break;
                  }

                  yield return element;
              }
          }
          finally
          {
              await enumerator.DisposeAsync();
          }

          // No exception was intercepted, or the handler produced null: nothing more to yield.
          if (continuation == null)
          {
              yield break;
          }

          await foreach (var value in continuation.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              yield return value;
          }
      }
  #else
      return new CatchAsyncIteratorWithTask<TSource, TException>(source, handler);
  #endif
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Enumerates <paramref name="source"/> and, if advancing it throws an exception of type
  /// <typeparamref name="TException"/>, continues with the sequence asynchronously produced by
  /// <paramref name="handler"/>, which also receives the enumeration's cancellation token.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence and of the handler's sequence.</typeparam>
  /// <typeparam name="TException">Exception type intercepted while advancing <paramref name="source"/>.</typeparam>
  /// <param name="source">Sequence to enumerate first.</param>
  /// <param name="handler">
  /// Asynchronously maps the intercepted exception and the cancellation token of the current enumeration
  /// to the sequence to continue with.
  /// </param>
  /// <returns>An async-enumerable sequence built as described above.</returns>
  /// <remarks>
  /// When <paramref name="source"/> or <paramref name="handler"/> is null, the exception created by
  /// <c>Error.ArgumentNull</c> is thrown eagerly, before any enumeration starts.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEnumerable<TSource> source, Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> handler)
      where TException : Exception
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (handler == null)
      {
          throw Error.ArgumentNull(nameof(handler));
      }

  #if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          // REVIEW: Like the Ix implementation this mirrors, acquiring the enumerator is not covered by the
          //         try statement. Wrapping an await foreach loop in a single try would be simpler, but it
          //         would introduce two breaking changes:
          //
          //         - The call to GetAsyncEnumerator would become protected by the try statement.
          //         - The handler would be invoked after disposal of the failed first sequence.

          IAsyncEnumerable<TSource> continuation = null;

          var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false);

          try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
          {
              for (; ; )
              {
                  var element = default(TSource);

                  // A yield return cannot sit inside a try block that has a catch clause, so the
                  // element is fetched here and yielded after the try statement.
                  try
                  {
                      if (await enumerator.MoveNextAsync())
                      {
                          element = enumerator.Current;
                      }
                      else
                      {
                          break;
                      }
                  }
                  catch (TException ex)
                  {
                      // The handler is awaited while the failed enumerator is still undisposed.
                      continuation = await handler(ex, cancellationToken).ConfigureAwait(false);
                      break;
                  }

                  yield return element;
              }
          }
          finally
          {
              await enumerator.DisposeAsync();
          }

          // No exception was intercepted, or the handler produced null: nothing more to yield.
          if (continuation == null)
          {
              yield break;
          }

          await foreach (var value in continuation.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              yield return value;
          }
      }
  #else
      return new CatchAsyncIteratorWithTaskAndCancellation<TSource, TException>(source, handler);
  #endif
  }
  #endif
  /// <summary>
  /// Builds a sequence over <paramref name="sources"/> by delegating to <c>CatchCore</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">The candidate sequences, in the order they should be tried.</param>
  /// <returns>The sequence returned by <c>CatchCore</c> for <paramref name="sources"/>.</returns>
  /// <remarks>
  /// When <paramref name="sources"/> is null, the exception created by <c>Error.ArgumentNull</c> is thrown.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      if (sources != null)
      {
          return CatchCore(sources);
      }

      throw Error.ArgumentNull(nameof(sources));
  }
  /// <summary>
  /// Builds a sequence over the given array of sequences by delegating to <c>CatchCore</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">The candidate sequences, in the order they should be tried.</param>
  /// <returns>The sequence returned by <c>CatchCore</c> for <paramref name="sources"/>.</returns>
  /// <remarks>
  /// When <paramref name="sources"/> is null, the exception created by <c>Error.ArgumentNull</c> is thrown.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource>(params IAsyncEnumerable<TSource>[] sources)
  {
      if (sources != null)
      {
          return CatchCore(sources);
      }

      throw Error.ArgumentNull(nameof(sources));
  }
  /// <summary>
  /// Builds a sequence over the pair (<paramref name="first"/>, <paramref name="second"/>) by delegating
  /// to <c>CatchCore</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="first">Sequence to try first.</param>
  /// <param name="second">Sequence to try after <paramref name="first"/>.</param>
  /// <returns>The sequence returned by <c>CatchCore</c> for the two-element array.</returns>
  /// <remarks>
  /// Arguments are validated in declaration order; for a null argument the exception created by
  /// <c>Error.ArgumentNull</c> is thrown.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  {
      if (first == null)
      {
          throw Error.ArgumentNull(nameof(first));
      }

      if (second == null)
      {
          throw Error.ArgumentNull(nameof(second));
      }

      var pair = new[] { first, second };

      return CatchCore(pair);
  }
  /// <summary>
  /// Shared implementation of the multi-source <c>Catch</c> overloads.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">The candidate sequences, in the order they should be tried.</param>
  /// <returns>
  /// A sequence that enumerates the sources one after another: it stops after the first source that
  /// completes without throwing, moves on to the next source when advancing the current one throws, and
  /// rethrows the captured exception if the last source tried also failed.
  /// </returns>
  private static IAsyncEnumerable<TSource> CatchCore<TSource>(IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
  #if USE_ASYNC_ITERATOR
      return AsyncEnumerable.Create(Core);

      async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
      {
          ExceptionDispatchInfo failure = null;

          foreach (var candidate in sources)
          {
              var enumerator = candidate.GetConfiguredAsyncEnumerator(cancellationToken, false);

              try // TODO: Switch to `await using` in preview 3 (https://github.com/dotnet/roslyn/pull/32731)
              {
                  // Only a failure of the sequence currently being tried is remembered.
                  failure = null;

                  for (; ; )
                  {
                      var element = default(TSource);

                      // A yield return cannot sit inside a try block that has a catch clause, so the
                      // element is fetched here and yielded after the try statement.
                      try
                      {
                          if (await enumerator.MoveNextAsync())
                          {
                              element = enumerator.Current;
                          }
                          else
                          {
                              break;
                          }
                      }
                      catch (Exception ex)
                      {
                          failure = ExceptionDispatchInfo.Capture(ex);
                          break;
                      }

                      yield return element;
                  }

                  // This sequence ran to completion, so the remaining ones are never touched.
                  if (failure == null)
                  {
                      break;
                  }
              }
              finally
              {
                  await enumerator.DisposeAsync();
              }
          }

          // Surfaces the exception of the last sequence tried, if that one failed too.
          if (failure != null)
          {
              failure.Throw();
          }
      }
  #else
      return new CatchAsyncIterator<TSource>(sources);
  #endif
  }
  244. #if !USE_ASYNC_ITERATOR
  /// <summary>
  /// Iterator behind <c>Catch</c> with a synchronous handler. It yields the elements of the source and, when
  /// advancing the source throws a <typeparamref name="TException"/>, switches to the sequence returned by
  /// the handler.
  /// </summary>
  /// <remarks>
  /// <c>_state</c>, <c>_current</c> and <c>_cancellationToken</c> are not declared here; they come from the
  /// <c>AsyncIterator</c> base class, whose contract is defined outside this file.
  /// </remarks>
  private sealed class CatchAsyncIterator<TSource, TException> : AsyncIterator<TSource> where TException : Exception
  {
      private readonly Func<TException, IAsyncEnumerable<TSource>> _handler;
      private readonly IAsyncEnumerable<TSource> _source;

      // Enumerator currently being drained: the source's until an exception is handled, the handler's after.
      private IAsyncEnumerator<TSource> _enumerator;

      // True once the handler's sequence has replaced the source; exceptions are no longer intercepted then.
      private bool _isDone;

      /// <summary>Creates an iterator over <paramref name="source"/> guarded by <paramref name="handler"/>.</summary>
      /// <param name="source">Sequence to enumerate first; asserted non-null in debug builds.</param>
      /// <param name="handler">Maps an intercepted exception to the continuation sequence; asserted non-null in debug builds.</param>
      public CatchAsyncIterator(IAsyncEnumerable<TSource> source, Func<TException, IAsyncEnumerable<TSource>> handler)
      {
          Debug.Assert(source != null);
          Debug.Assert(handler != null);

          _source = source;
          _handler = handler;
      }

      /// <summary>Returns a fresh iterator over the same source and handler.</summary>
      public override AsyncIteratorBase<TSource> Clone()
      {
          return new CatchAsyncIterator<TSource, TException>(_source, _handler);
      }

      /// <summary>Disposes the enumerator currently held, if any, then runs the base disposal.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>Advances to the next element of the source or, after a handled exception, of the handler's sequence.</summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> was set to a new element; <c>false</c> after the active sequence ended
      /// and this iterator disposed itself.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _isDone = false;

                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  while (true)
                  {
                      if (_isDone)
                      {
                          // Draining the handler's sequence: its exceptions propagate to the caller.
                          if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _enumerator.Current;
                              return true;
                          }

                          break; // while
                      }

                      try
                      {
                          if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _enumerator.Current;
                              return true;
                          }
                      }
                      catch (TException ex)
                      {
                          // Note: Ideally we'd dispose of the previous enumerator before
                          //       invoking the handler, but we use this order to preserve
                          //       current behavior
                          var inner = _handler(ex);
                          var err = inner.GetAsyncEnumerator(_cancellationToken);

                          // Track the replacement before disposing the failed enumerator, so that
                          // DisposeAsync can still release it if that disposal throws.
                          var failed = _enumerator;
                          _enumerator = err;
                          _isDone = true;

                          if (failed != null)
                          {
                              await failed.DisposeAsync().ConfigureAwait(false);
                          }

                          continue; // loop so we hit the catch state
                      }

                      // The source ended without an exception. Stop here rather than calling
                      // MoveNextAsync again on the exhausted enumerator outside the try block.
                      break; // while
                  }

                  break; // case
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator behind <c>Catch</c> with an asynchronous handler. It yields the elements of the source and,
  /// when advancing the source throws a <typeparamref name="TException"/>, switches to the sequence produced
  /// by awaiting the handler.
  /// </summary>
  /// <remarks>
  /// <c>_state</c>, <c>_current</c> and <c>_cancellationToken</c> are not declared here; they come from the
  /// <c>AsyncIterator</c> base class, whose contract is defined outside this file.
  /// </remarks>
  private sealed class CatchAsyncIteratorWithTask<TSource, TException> : AsyncIterator<TSource> where TException : Exception
  {
      private readonly Func<TException, ValueTask<IAsyncEnumerable<TSource>>> _handler;
      private readonly IAsyncEnumerable<TSource> _source;

      // Enumerator currently being drained: the source's until an exception is handled, the handler's after.
      private IAsyncEnumerator<TSource> _enumerator;

      // True once the handler's sequence has replaced the source; exceptions are no longer intercepted then.
      private bool _isDone;

      /// <summary>Creates an iterator over <paramref name="source"/> guarded by <paramref name="handler"/>.</summary>
      /// <param name="source">Sequence to enumerate first; asserted non-null in debug builds.</param>
      /// <param name="handler">Asynchronously maps an intercepted exception to the continuation sequence; asserted non-null in debug builds.</param>
      public CatchAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<TException, ValueTask<IAsyncEnumerable<TSource>>> handler)
      {
          Debug.Assert(source != null);
          Debug.Assert(handler != null);

          _source = source;
          _handler = handler;
      }

      /// <summary>Returns a fresh iterator over the same source and handler.</summary>
      public override AsyncIteratorBase<TSource> Clone()
      {
          return new CatchAsyncIteratorWithTask<TSource, TException>(_source, _handler);
      }

      /// <summary>Disposes the enumerator currently held, if any, then runs the base disposal.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>Advances to the next element of the source or, after a handled exception, of the handler's sequence.</summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> was set to a new element; <c>false</c> after the active sequence ended
      /// and this iterator disposed itself.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _isDone = false;

                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  while (true)
                  {
                      if (_isDone)
                      {
                          // Draining the handler's sequence: its exceptions propagate to the caller.
                          if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _enumerator.Current;
                              return true;
                          }

                          break; // while
                      }

                      try
                      {
                          if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _enumerator.Current;
                              return true;
                          }
                      }
                      catch (TException ex)
                      {
                          // Note: Ideally we'd dispose of the previous enumerator before
                          //       invoking the handler, but we use this order to preserve
                          //       current behavior
                          var inner = await _handler(ex).ConfigureAwait(false);
                          var err = inner.GetAsyncEnumerator(_cancellationToken);

                          // Track the replacement before disposing the failed enumerator, so that
                          // DisposeAsync can still release it if that disposal throws.
                          var failed = _enumerator;
                          _enumerator = err;
                          _isDone = true;

                          if (failed != null)
                          {
                              await failed.DisposeAsync().ConfigureAwait(false);
                          }

                          continue; // loop so we hit the catch state
                      }

                      // The source ended without an exception. Stop here rather than calling
                      // MoveNextAsync again on the exhausted enumerator outside the try block.
                      break; // while
                  }

                  break; // case
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Iterator behind <c>Catch</c> with an asynchronous, cancellable handler. It yields the elements of the
  /// source and, when advancing the source throws a <typeparamref name="TException"/>, switches to the
  /// sequence produced by awaiting the handler, which is passed this iterator's cancellation token.
  /// </summary>
  /// <remarks>
  /// <c>_state</c>, <c>_current</c> and <c>_cancellationToken</c> are not declared here; they come from the
  /// <c>AsyncIterator</c> base class, whose contract is defined outside this file.
  /// </remarks>
  private sealed class CatchAsyncIteratorWithTaskAndCancellation<TSource, TException> : AsyncIterator<TSource> where TException : Exception
  {
      private readonly Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> _handler;
      private readonly IAsyncEnumerable<TSource> _source;

      // Enumerator currently being drained: the source's until an exception is handled, the handler's after.
      private IAsyncEnumerator<TSource> _enumerator;

      // True once the handler's sequence has replaced the source; exceptions are no longer intercepted then.
      private bool _isDone;

      /// <summary>Creates an iterator over <paramref name="source"/> guarded by <paramref name="handler"/>.</summary>
      /// <param name="source">Sequence to enumerate first; asserted non-null in debug builds.</param>
      /// <param name="handler">Asynchronously maps an intercepted exception and a cancellation token to the continuation sequence; asserted non-null in debug builds.</param>
      public CatchAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> handler)
      {
          Debug.Assert(source != null);
          Debug.Assert(handler != null);

          _source = source;
          _handler = handler;
      }

      /// <summary>Returns a fresh iterator over the same source and handler.</summary>
      public override AsyncIteratorBase<TSource> Clone()
      {
          return new CatchAsyncIteratorWithTaskAndCancellation<TSource, TException>(_source, _handler);
      }

      /// <summary>Disposes the enumerator currently held, if any, then runs the base disposal.</summary>
      public override async ValueTask DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>Advances to the next element of the source or, after a handled exception, of the handler's sequence.</summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> was set to a new element; <c>false</c> after the active sequence ended
      /// and this iterator disposed itself.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
                  _isDone = false;

                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  while (true)
                  {
                      if (_isDone)
                      {
                          // Draining the handler's sequence: its exceptions propagate to the caller.
                          if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _enumerator.Current;
                              return true;
                          }

                          break; // while
                      }

                      try
                      {
                          if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                          {
                              _current = _enumerator.Current;
                              return true;
                          }
                      }
                      catch (TException ex)
                      {
                          // Note: Ideally we'd dispose of the previous enumerator before
                          //       invoking the handler, but we use this order to preserve
                          //       current behavior
                          var inner = await _handler(ex, _cancellationToken).ConfigureAwait(false);
                          var err = inner.GetAsyncEnumerator(_cancellationToken);

                          // Track the replacement before disposing the failed enumerator, so that
                          // DisposeAsync can still release it if that disposal throws.
                          var failed = _enumerator;
                          _enumerator = err;
                          _isDone = true;

                          if (failed != null)
                          {
                              await failed.DisposeAsync().ConfigureAwait(false);
                          }

                          continue; // loop so we hit the catch state
                      }

                      // The source ended without an exception. Stop here rather than calling
                      // MoveNextAsync again on the exhausted enumerator outside the try block.
                      break; // while
                  }

                  break; // case
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  #endif
  /// <summary>
  /// Iterator behind the multi-source <c>Catch</c> overloads. It enumerates the given sequences one after
  /// another, moving to the next sequence whenever advancing the current one throws, and stopping after the
  /// first sequence that completes without throwing.
  /// </summary>
  /// <remarks>
  /// <c>_state</c>, <c>_current</c> and <c>_cancellationToken</c> are not declared here; they come from the
  /// <c>AsyncIterator</c> base class, whose contract is defined outside this file.
  /// </remarks>
  private sealed class CatchAsyncIterator<TSource> : AsyncIterator<TSource>
  {
      private readonly IEnumerable<IAsyncEnumerable<TSource>> _sources;

      // Enumerator of the sequence currently being tried; null while the next sequence still has to be fetched.
      private IAsyncEnumerator<TSource> _enumerator;

      // Exception captured from the most recently failed sequence; cleared when the next sequence starts.
      private ExceptionDispatchInfo _error;

      // Cursor over _sources.
      private IEnumerator<IAsyncEnumerable<TSource>> _sourcesEnumerator;

      /// <summary>Creates an iterator over <paramref name="sources"/>.</summary>
      /// <param name="sources">The candidate sequences; asserted non-null in debug builds.</param>
      public CatchAsyncIterator(IEnumerable<IAsyncEnumerable<TSource>> sources)
      {
          Debug.Assert(sources != null);

          _sources = sources;
      }

      /// <summary>Returns a fresh iterator over the same collection of sequences.</summary>
      public override AsyncIteratorBase<TSource> Clone() => new CatchAsyncIterator<TSource>(_sources);

      /// <summary>
      /// Disposes the cursor over the sequences and the active enumerator (each only if present), drops any
      /// captured error, then runs the base disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_sourcesEnumerator != null)
          {
              _sourcesEnumerator.Dispose();
              _sourcesEnumerator = null;
          }

          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
          }

          _error = null;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>Advances to the next element of the sequence currently being tried.</summary>
      /// <returns>
      /// <c>true</c> when <c>_current</c> was set to a new element; <c>false</c> after a sequence completed
      /// without throwing, or after the sequences ran out with no pending error, and this iterator disposed
      /// itself.
      /// </returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          switch (_state)
          {
              case AsyncIteratorState.Allocated:
                  _sourcesEnumerator = _sources.GetEnumerator();

                  _state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  for (; ; )
                  {
                      if (_enumerator == null)
                      {
                          if (_sourcesEnumerator.MoveNext())
                          {
                              // A new sequence gets a clean slate.
                              _error = null;
                              _enumerator = _sourcesEnumerator.Current.GetAsyncEnumerator(_cancellationToken);
                          }
                          else
                          {
                              // Out of sequences: rethrow only if the last one tried had failed.
                              if (_error != null)
                              {
                                  _error.Throw();
                              }

                              break; // done, nothing else to do
                          }
                      }

                      try
                      {
                          var advanced = await _enumerator.MoveNextAsync().ConfigureAwait(false);

                          if (advanced)
                          {
                              _current = _enumerator.Current;
                              return true;
                          }
                      }
                      catch (Exception failure)
                      {
                          // Give up on the current sequence, remember why, and try the next one.
                          await _enumerator.DisposeAsync().ConfigureAwait(false);
                          _enumerator = null;

                          _error = ExceptionDispatchInfo.Capture(failure);
                          continue;
                      }

                      // The current sequence completed without throwing.
                      break; // for
                  }

                  break; // case
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  554. #endif
  555. }
  556. }