Catch.cs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Runtime.ExceptionServices;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Linq
  9. {
  10. public static partial class AsyncEnumerableEx
  11. {
  12. // REVIEW: All Catch operators may catch OperationCanceledException due to cancellation of the enumeration
  13. // of the source. Should we explicitly avoid handling this? E.g. as follows:
  14. //
  15. // catch (TException ex) when(!(ex is OperationCanceledException oce && oce.CancellationToken == cancellationToken))
  /// <summary>
  /// Continues an async-enumerable sequence that fails with an exception of type
  /// <typeparamref name="TException"/> by switching to the sequence returned from <paramref name="handler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence and of the sequence produced by the handler.</typeparam>
  /// <typeparam name="TException">Type of exception to intercept; constrained to derive from <see cref="Exception"/>.</typeparam>
  /// <param name="source">Sequence whose enumeration is guarded.</param>
  /// <param name="handler">Function that receives the caught exception and returns the sequence to continue with.</param>
  /// <returns>
  /// A sequence that yields the elements of <paramref name="source"/> and, if enumeration fails with a
  /// <typeparamref name="TException"/>, the elements of the sequence returned by <paramref name="handler"/>.
  /// </returns>
  /// <remarks>
  /// A null <paramref name="source"/> or <paramref name="handler"/> is rejected eagerly with the exception
  /// built by <c>Error.ArgumentNull</c>. When the handler returns <c>null</c>, the resulting sequence simply
  /// ends after the elements that were produced before the failure.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEnumerable<TSource> source, Func<TException, IAsyncEnumerable<TSource>> handler)
      where TException : Exception
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (handler == null)
      {
          throw Error.ArgumentNull(nameof(handler));
      }

      return AsyncEnumerable.Create(Iterator);

      async IAsyncEnumerator<TSource> Iterator(CancellationToken cancellationToken)
      {
          // REVIEW: Kept in line with the Ix implementation: obtaining the enumerator is NOT covered by the
          //         try statement, and the handler runs BEFORE the failed source enumerator is disposed.
          //         An 'await foreach' wrapped in a single try statement would change both behaviors.

          var continuation = default(IAsyncEnumerable<TSource>);

          await using (var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false))
          {
              for (;;)
              {
                  TSource current;

                  // Only advancing and reading the element are guarded; 'yield return' cannot live inside
                  // a try block that has a catch clause, so the element is handed out after the try.
                  try
                  {
                      var advanced = await enumerator.MoveNextAsync();

                      if (!advanced)
                      {
                          break;
                      }

                      current = enumerator.Current;
                  }
                  catch (TException caught)
                  {
                      continuation = handler(caught);
                      break;
                  }

                  yield return current;
              }
          }

          // No failure occurred, or the handler supplied no sequence: nothing left to emit.
          if (continuation == null)
          {
              yield break;
          }

          await foreach (var element in continuation.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              yield return element;
          }
      }
  }
  /// <summary>
  /// Continues an async-enumerable sequence that fails with an exception of type
  /// <typeparamref name="TException"/> by switching to the sequence produced asynchronously by
  /// <paramref name="handler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence and of the sequence produced by the handler.</typeparam>
  /// <typeparam name="TException">Type of exception to intercept; constrained to derive from <see cref="Exception"/>.</typeparam>
  /// <param name="source">Sequence whose enumeration is guarded.</param>
  /// <param name="handler">Asynchronous function that receives the caught exception and produces the sequence to continue with.</param>
  /// <returns>
  /// A sequence that yields the elements of <paramref name="source"/> and, if enumeration fails with a
  /// <typeparamref name="TException"/>, the elements of the sequence obtained by awaiting <paramref name="handler"/>.
  /// </returns>
  /// <remarks>
  /// A null <paramref name="source"/> or <paramref name="handler"/> is rejected eagerly with the exception
  /// built by <c>Error.ArgumentNull</c>. When the awaited handler result is <c>null</c>, the resulting
  /// sequence simply ends after the elements that were produced before the failure.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEnumerable<TSource> source, Func<TException, ValueTask<IAsyncEnumerable<TSource>>> handler)
      where TException : Exception
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (handler == null)
      {
          throw Error.ArgumentNull(nameof(handler));
      }

      return AsyncEnumerable.Create(Iterator);

      async IAsyncEnumerator<TSource> Iterator(CancellationToken cancellationToken)
      {
          // REVIEW: Kept in line with the Ix implementation: obtaining the enumerator is NOT covered by the
          //         try statement, and the handler runs BEFORE the failed source enumerator is disposed.
          //         An 'await foreach' wrapped in a single try statement would change both behaviors.

          var continuation = default(IAsyncEnumerable<TSource>);

          await using (var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false))
          {
              for (;;)
              {
                  TSource current;

                  // Only advancing and reading the element are guarded; 'yield return' cannot live inside
                  // a try block that has a catch clause, so the element is handed out after the try.
                  try
                  {
                      var advanced = await enumerator.MoveNextAsync();

                      if (!advanced)
                      {
                          break;
                      }

                      current = enumerator.Current;
                  }
                  catch (TException caught)
                  {
                      continuation = await handler(caught).ConfigureAwait(false);
                      break;
                  }

                  yield return current;
              }
          }

          // No failure occurred, or the handler supplied no sequence: nothing left to emit.
          if (continuation == null)
          {
              yield break;
          }

          await foreach (var element in continuation.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              yield return element;
          }
      }
  }
  106. #if !NO_DEEP_CANCELLATION
  /// <summary>
  /// Continues an async-enumerable sequence that fails with an exception of type
  /// <typeparamref name="TException"/> by switching to the sequence produced asynchronously by a
  /// cancellable <paramref name="handler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence and of the sequence produced by the handler.</typeparam>
  /// <typeparam name="TException">Type of exception to intercept; constrained to derive from <see cref="Exception"/>.</typeparam>
  /// <param name="source">Sequence whose enumeration is guarded.</param>
  /// <param name="handler">
  /// Asynchronous function that receives the caught exception together with the enumeration's
  /// <see cref="CancellationToken"/> and produces the sequence to continue with.
  /// </param>
  /// <returns>
  /// A sequence that yields the elements of <paramref name="source"/> and, if enumeration fails with a
  /// <typeparamref name="TException"/>, the elements of the sequence obtained by awaiting <paramref name="handler"/>.
  /// </returns>
  /// <remarks>
  /// A null <paramref name="source"/> or <paramref name="handler"/> is rejected eagerly with the exception
  /// built by <c>Error.ArgumentNull</c>. When the awaited handler result is <c>null</c>, the resulting
  /// sequence simply ends after the elements that were produced before the failure.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEnumerable<TSource> source, Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> handler)
      where TException : Exception
  {
      if (source == null)
      {
          throw Error.ArgumentNull(nameof(source));
      }

      if (handler == null)
      {
          throw Error.ArgumentNull(nameof(handler));
      }

      return AsyncEnumerable.Create(Iterator);

      async IAsyncEnumerator<TSource> Iterator(CancellationToken cancellationToken)
      {
          // REVIEW: Kept in line with the Ix implementation: obtaining the enumerator is NOT covered by the
          //         try statement, and the handler runs BEFORE the failed source enumerator is disposed.
          //         An 'await foreach' wrapped in a single try statement would change both behaviors.

          var continuation = default(IAsyncEnumerable<TSource>);

          await using (var enumerator = source.GetConfiguredAsyncEnumerator(cancellationToken, false))
          {
              for (;;)
              {
                  TSource current;

                  // Only advancing and reading the element are guarded; 'yield return' cannot live inside
                  // a try block that has a catch clause, so the element is handed out after the try.
                  try
                  {
                      var advanced = await enumerator.MoveNextAsync();

                      if (!advanced)
                      {
                          break;
                      }

                      current = enumerator.Current;
                  }
                  catch (TException caught)
                  {
                      // The same token that drives the enumeration is forwarded to the handler.
                      continuation = await handler(caught, cancellationToken).ConfigureAwait(false);
                      break;
                  }

                  yield return current;
              }
          }

          // No failure occurred, or the handler supplied no sequence: nothing left to emit.
          if (continuation == null)
          {
              yield break;
          }

          await foreach (var element in continuation.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              yield return element;
          }
      }
  }
  152. #endif
  /// <summary>
  /// Builds a sequence that enumerates the given sequences one after another, moving on to the next
  /// one whenever the current one fails, by delegating to <c>CatchCore</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">Sequences to try, in enumeration order.</param>
  /// <returns>The sequence returned by <c>CatchCore</c> for <paramref name="sources"/>.</returns>
  /// <remarks>
  /// A null <paramref name="sources"/> is rejected eagerly with the exception built by <c>Error.ArgumentNull</c>.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      if (sources != null)
      {
          return CatchCore(sources);
      }

      throw Error.ArgumentNull(nameof(sources));
  }
  /// <summary>
  /// Builds a sequence that enumerates the given sequences one after another, moving on to the next
  /// one whenever the current one fails, by delegating to <c>CatchCore</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">Sequences to try, in array order. The array itself is handed to <c>CatchCore</c> without copying.</param>
  /// <returns>The sequence returned by <c>CatchCore</c> for <paramref name="sources"/>.</returns>
  /// <remarks>
  /// A null <paramref name="sources"/> array is rejected eagerly with the exception built by
  /// <c>Error.ArgumentNull</c>; individual array elements are not validated here.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource>(params IAsyncEnumerable<TSource>[] sources)
  {
      return sources == null
          ? throw Error.ArgumentNull(nameof(sources))
          : CatchCore(sources);
  }
  /// <summary>
  /// Builds a sequence that enumerates <paramref name="first"/> and, if it fails, continues with
  /// <paramref name="second"/>, by delegating to <c>CatchCore</c> with the two sequences in that order.
  /// </summary>
  /// <typeparam name="TSource">Element type of both sequences.</typeparam>
  /// <param name="first">Sequence to enumerate first.</param>
  /// <param name="second">Sequence to fall back to.</param>
  /// <returns>The sequence returned by <c>CatchCore</c> for the pair.</returns>
  /// <remarks>
  /// A null <paramref name="first"/> or <paramref name="second"/> is rejected eagerly, in that order,
  /// with the exception built by <c>Error.ArgumentNull</c>.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Catch<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  {
      if (first == null)
      {
          throw Error.ArgumentNull(nameof(first));
      }

      if (second == null)
      {
          throw Error.ArgumentNull(nameof(second));
      }

      var pair = new IAsyncEnumerable<TSource>[] { first, second };

      return CatchCore(pair);
  }
  /// <summary>
  /// Shared implementation of the multi-source Catch operators: enumerates <paramref name="sources"/>
  /// in order, yielding each source's elements, and moves on to the next source whenever advancing
  /// the current one throws.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">Sequences to try, in enumeration order. Not validated here.</param>
  /// <returns>
  /// A sequence that stops after the first source that completes without an exception. Elements
  /// yielded by a source before it failed have already been emitted. If every source fails, the
  /// exception captured from the last one is rethrown; if there are no sources, the sequence is empty.
  /// </returns>
  private static IAsyncEnumerable<TSource> CatchCore<TSource>(IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      return AsyncEnumerable.Create(Iterator);

      async IAsyncEnumerator<TSource> Iterator(CancellationToken cancellationToken)
      {
          // Failure of the most recently enumerated source, captured so it can be rethrown later.
          var lastFailure = default(ExceptionDispatchInfo);

          foreach (var candidate in sources)
          {
              await using (var enumerator = candidate.GetConfiguredAsyncEnumerator(cancellationToken, false))
              {
                  // Each source starts with a clean slate; earlier failures are forgotten.
                  lastFailure = null;

                  for (;;)
                  {
                      TSource current;

                      // 'yield return' cannot live inside a try block that has a catch clause, so only
                      // advancing and reading the element are guarded.
                      try
                      {
                          var advanced = await enumerator.MoveNextAsync();

                          if (!advanced)
                          {
                              break;
                          }

                          current = enumerator.Current;
                      }
                      catch (Exception caught)
                      {
                          lastFailure = ExceptionDispatchInfo.Capture(caught);
                          break;
                      }

                      yield return current;
                  }

                  // The source ran to completion: no need to consult the remaining ones.
                  if (lastFailure == null)
                  {
                      break;
                  }
              }
          }

          // Only set when the final source that was tried failed.
          if (lastFailure != null)
          {
              lastFailure.Throw();
          }
      }
  }
  207. }
  208. }