1
0

AsyncEnumerable.Exceptions.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Linq
  10. {
  11. public static partial class AsyncEnumerable
  12. {
  /// <summary>
  /// Enumerates <paramref name="source"/> and, if it fails with an exception of type
  /// <typeparamref name="TException"/>, continues with the sequence returned by
  /// <paramref name="handler"/> for that exception.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source and of the handler's sequence.</typeparam>
  /// <typeparam name="TException">Exception type to intercept. A failure containing any other exception type is propagated.</typeparam>
  /// <param name="source">Sequence that is enumerated first.</param>
  /// <param name="handler">Produces the continuation sequence for an intercepted exception.</param>
  /// <returns>A sequence built through <c>Create</c> that wraps the source enumerator.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="handler"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEnumerable<TSource> source, Func<TException, IAsyncEnumerable<TSource>> handler)
      where TException : Exception
  {
      if (source == null)
          throw new ArgumentNullException("source");
      if (handler == null)
          throw new ArgumentNullException("handler");

      return Create(() =>
      {
          var e = source.GetEnumerator();

          var cts = new CancellationTokenDisposable();
          var a = new AssignableDisposable { Disposable = e };
          var d = Disposable.Create(cts, a);

          // Becomes true once enumeration has switched to the handler's sequence; from then on
          // failures are no longer intercepted.
          var done = false;

          var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
          f = (tcs, ct) =>
          {
              if (!done)
              {
                  e.MoveNext(ct).Then(t =>
                  {
                      t.Handle(tcs,
                          res =>
                          {
                              tcs.TrySetResult(res);
                          },
                          ex =>
                          {
                              var err = default(IAsyncEnumerator<TSource>);

                              try
                              {
                                  // AggregateException.Handle throws a new AggregateException when the
                                  // predicate returns false for at least one inner exception.
                                  ex.Flatten().Handle(ex_ =>
                                  {
                                      var exx = ex_ as TException;
                                      if (exx == null)
                                          return false;

                                      var replacement = handler(exx).GetEnumerator();

                                      // Several inner exceptions may match; the last enumerator wins,
                                      // so release the one it supersedes instead of dropping it.
                                      var superseded = err;
                                      err = replacement;
                                      if (superseded != null)
                                          superseded.Dispose();

                                      return true;
                                  });
                              }
                              catch (Exception ex2)
                              {
                                  try
                                  {
                                      // A handler enumerator may already exist when a later inner
                                      // exception is rejected or the handler throws; it will never be
                                      // enumerated, so dispose it here.
                                      if (err != null)
                                          err.Dispose();
                                  }
                                  finally
                                  {
                                      tcs.TrySetException(ex2);
                                  }

                                  return;
                              }

                              if (err != null)
                              {
                                  // Switch to the handler's enumerator and fetch its first element
                                  // as the outcome of the current MoveNext call.
                                  e = err;
                                  a.Disposable = e;
                                  done = true;
                                  f(tcs, ct);
                              }
                              else
                              {
                                  // No replacement enumerator was produced; report the original
                                  // failure rather than leaving the task uncompleted.
                                  tcs.TrySetException(ex);
                              }
                          }
                      );
                  });
              }
              else
              {
                  // Already running on the handler's sequence. NOTE(review): the two-argument Handle
                  // overload is defined elsewhere; presumably it forwards faults to tcs - confirm.
                  e.MoveNext(ct).Then(t =>
                  {
                      t.Handle(tcs, res =>
                      {
                          tcs.TrySetResult(res);
                      });
                  });
              }
          };

          return Create(
              (ct, tcs) =>
              {
                  f(tcs, cts.Token);
                  return tcs.Task.UsingEnumerator(a);
              },
              () => e.Current,
              d.Dispose
          );
      });
  }
  /// <summary>
  /// Enumerates the given sequences in order, moving on to the next sequence whenever the
  /// current one fails, and stopping at the first one that finishes without an error.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">Candidate sequences, tried in enumeration order.</param>
  /// <returns>The combined sequence produced by <c>Catch_</c>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Catch<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      if (sources != null)
      {
          return Catch_<TSource>(sources);
      }

      throw new ArgumentNullException("sources");
  }
  /// <summary>
  /// Array overload: enumerates the given sequences in order, moving on to the next sequence
  /// whenever the current one fails.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">Candidate sequences, tried in array order.</param>
  /// <returns>The combined sequence produced by <c>Catch_</c>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Catch<TSource>(params IAsyncEnumerable<TSource>[] sources)
  {
      if (sources != null)
      {
          return Catch_<TSource>(sources);
      }

      throw new ArgumentNullException("sources");
  }
  /// <summary>
  /// Enumerates <paramref name="first"/> and, if it fails, continues with <paramref name="second"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of both sequences.</typeparam>
  /// <param name="first">Sequence that is tried first.</param>
  /// <param name="second">Sequence used when <paramref name="first"/> fails.</param>
  /// <returns>The combined sequence produced by <c>Catch_</c> for the two-element list.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Catch<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  {
      if (first == null)
      {
          throw new ArgumentNullException("first");
      }

      if (second == null)
      {
          throw new ArgumentNullException("second");
      }

      var candidates = new[] { first, second };
      return Catch_<TSource>(candidates);
  }
  /// <summary>
  /// Shared implementation of the multi-source <c>Catch</c> overloads. Sources are pulled lazily
  /// from <paramref name="sources"/>; when the current source fails, it is disposed and the next
  /// one is started. If the list runs out right after a failure, that failure is reported.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">Candidate sequences; callers have already null-checked this argument.</param>
  /// <returns>A sequence built through <c>Create</c>.</returns>
  private static IAsyncEnumerable<TSource> Catch_<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      return Create(() =>
      {
          var outer = sources.GetEnumerator();
          IAsyncEnumerator<TSource> current = null;

          var cancellation = new CancellationTokenDisposable();
          var innerSlot = new AssignableDisposable();
          var cleanup = Disposable.Create(cancellation, outer, innerSlot);

          // Failure of the most recently abandoned source; cleared once a new source is started.
          Exception lastError = null;

          Action<TaskCompletionSource<bool>, CancellationToken> step = null;
          step = (tcs, ct) =>
          {
              if (current == null)
              {
                  // No active source: try to start the next one from the list.
                  var hasNext = false;

                  try
                  {
                      hasNext = outer.MoveNext();
                      if (hasNext)
                          current = outer.Current.GetEnumerator();
                  }
                  catch (Exception ex)
                  {
                      tcs.TrySetException(ex);
                      return;
                  }

                  if (!hasNext)
                  {
                      // List exhausted: surface the pending failure if there is one,
                      // otherwise signal the end of the sequence.
                      if (lastError != null)
                          tcs.TrySetException(lastError);
                      else
                          tcs.TrySetResult(false);

                      return;
                  }

                  lastError = null;
                  innerSlot.Disposable = current;
              }

              current.MoveNext(ct).Then(t =>
              {
                  t.Handle(tcs,
                      res =>
                      {
                          tcs.TrySetResult(res);
                      },
                      ex =>
                      {
                          // Abandon the failed source, remember why, and retry with the next one.
                          current.Dispose();
                          current = null;
                          lastError = ex;
                          step(tcs, ct);
                      }
                  );
              });
          };

          return Create(
              (ct, tcs) =>
              {
                  step(tcs, cancellation.Token);
                  return tcs.Task.UsingEnumerator(innerSlot);
              },
              () => current.Current,
              cleanup.Dispose
          );
      });
  }
  /// <summary>
  /// Wraps <paramref name="source"/> so that <paramref name="finallyAction"/> is tied to the
  /// lifetime of the enumeration: the action is wrapped in a <c>Disposable</c> that is disposed
  /// together with the enumerator and is also handed to <c>UsingEnumerator</c> for every
  /// MoveNext task.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to enumerate.</param>
  /// <param name="finallyAction">Action wrapped in the disposable described above.</param>
  /// <returns>A sequence built through <c>Create</c> that forwards the elements of <paramref name="source"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="finallyAction"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Finally<TSource>(this IAsyncEnumerable<TSource> source, Action finallyAction)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (finallyAction == null)
      {
          throw new ArgumentNullException("finallyAction");
      }

      return Create(() =>
      {
          var inner = source.GetEnumerator();
          var cancellation = new CancellationTokenDisposable();
          var finalizer = new Disposable(finallyAction);
          var cleanup = Disposable.Create(cancellation, inner, finalizer);

          return Create(
              (ct, tcs) =>
              {
                  // The inner enumerator is driven with this operator's own token, not with ct.
                  inner.MoveNext(cancellation.Token).Then(t =>
                  {
                      t.Handle(tcs, res =>
                      {
                          tcs.TrySetResult(res);
                      });
                  });

                  // NOTE(review): UsingEnumerator is defined elsewhere; presumably it disposes
                  // the finalizer when the task ends the enumeration - confirm.
                  return tcs.Task.UsingEnumerator(finalizer);
              },
              () => inner.Current,
              cleanup.Dispose
          );
      });
  }
  /// <summary>
  /// Enumerates <paramref name="first"/> and then <paramref name="second"/>, continuing with
  /// <paramref name="second"/> whether <paramref name="first"/> completed or failed.
  /// </summary>
  /// <typeparam name="TSource">Element type of both sequences.</typeparam>
  /// <param name="first">Sequence enumerated first.</param>
  /// <param name="second">Sequence enumerated after <paramref name="first"/> ends.</param>
  /// <returns>The combined sequence produced by <c>OnErrorResumeNext_</c>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
  public static IAsyncEnumerable<TSource> OnErrorResumeNext<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  {
      if (first == null)
      {
          throw new ArgumentNullException("first");
      }

      if (second == null)
      {
          throw new ArgumentNullException("second");
      }

      var pair = new[] { first, second };
      return OnErrorResumeNext_<TSource>(pair);
  }
  /// <summary>
  /// Array overload: enumerates the given sequences one after another, continuing with the next
  /// one whether the current one completed or failed.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">Sequences to enumerate, in array order.</param>
  /// <returns>The combined sequence produced by <c>OnErrorResumeNext_</c>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> OnErrorResumeNext<TSource>(params IAsyncEnumerable<TSource>[] sources)
  {
      if (sources != null)
      {
          return OnErrorResumeNext_<TSource>(sources);
      }

      throw new ArgumentNullException("sources");
  }
  /// <summary>
  /// Enumerates the given sequences one after another, continuing with the next one whether
  /// the current one completed or failed.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">Sequences to enumerate, in enumeration order.</param>
  /// <returns>The combined sequence produced by <c>OnErrorResumeNext_</c>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
  public static IAsyncEnumerable<TSource> OnErrorResumeNext<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      if (sources != null)
      {
          return OnErrorResumeNext_<TSource>(sources);
      }

      throw new ArgumentNullException("sources");
  }
  /// <summary>
  /// Shared implementation of the <c>OnErrorResumeNext</c> overloads. Sources are pulled lazily
  /// from <paramref name="sources"/>; when the current source ends - either by completing or by
  /// failing - it is disposed and the next one is started. Failures of individual sources are
  /// discarded; the result ends once the list is exhausted.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequences.</typeparam>
  /// <param name="sources">Sequences to enumerate; callers have already null-checked this argument.</param>
  /// <returns>A sequence built through <c>Create</c>.</returns>
  private static IAsyncEnumerable<TSource> OnErrorResumeNext_<TSource>(IEnumerable<IAsyncEnumerable<TSource>> sources)
  {
      return Create(() =>
      {
          var outer = sources.GetEnumerator();
          IAsyncEnumerator<TSource> current = null;

          var cancellation = new CancellationTokenDisposable();
          var innerSlot = new AssignableDisposable();
          var cleanup = Disposable.Create(cancellation, outer, innerSlot);

          Action<TaskCompletionSource<bool>, CancellationToken> step = null;

          // Drops the active source and resumes with the next one from the list.
          Action<TaskCompletionSource<bool>, CancellationToken> skipToNext = (tcs, ct) =>
          {
              current.Dispose();
              current = null;
              step(tcs, ct);
          };

          step = (tcs, ct) =>
          {
              if (current == null)
              {
                  // No active source: try to start the next one from the list.
                  var hasNext = false;

                  try
                  {
                      hasNext = outer.MoveNext();
                      if (hasNext)
                          current = outer.Current.GetEnumerator();
                  }
                  catch (Exception ex)
                  {
                      // Failures of the source list itself are reported, unlike failures
                      // of the individual sources.
                      tcs.TrySetException(ex);
                      return;
                  }

                  if (!hasNext)
                  {
                      tcs.TrySetResult(false);
                      return;
                  }

                  innerSlot.Disposable = current;
              }

              current.MoveNext(ct).Then(t =>
              {
                  t.Handle(tcs,
                      res =>
                      {
                          if (!res)
                          {
                              // Source completed: carry on with the next one.
                              skipToNext(tcs, ct);
                              return;
                          }

                          tcs.TrySetResult(true);
                      },
                      ex =>
                      {
                          // Source failed: the error is dropped and the next source is used.
                          skipToNext(tcs, ct);
                      }
                  );
              });
          };

          return Create(
              (ct, tcs) =>
              {
                  step(tcs, cancellation.Token);
                  return tcs.Task.UsingEnumerator(innerSlot);
              },
              () => current.Current,
              cleanup.Dispose
          );
      });
  }
  /// <summary>
  /// Re-enumerates <paramref name="source"/> each time it fails, without a limit on the number
  /// of attempts. Implemented as <c>Catch</c> over an endlessly repeated one-element list.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to enumerate and to restart on failure.</param>
  /// <returns>The sequence produced by <c>Catch</c> over the repeated source.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IAsyncEnumerable<TSource> Retry<TSource>(this IAsyncEnumerable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      // Extension-call syntax is kept so the same Repeat/Catch overloads are selected.
      var attempts = new[] { source }.Repeat();
      return attempts.Catch();
  }
  /// <summary>
  /// Re-enumerates <paramref name="source"/> when it fails, making at most
  /// <paramref name="retryCount"/> attempts in total. Implemented as <c>Catch</c> over a list
  /// containing <paramref name="source"/> <paramref name="retryCount"/> times, so a count of
  /// zero yields an empty list of attempts.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to enumerate and to restart on failure.</param>
  /// <param name="retryCount">Total number of times <paramref name="source"/> may be enumerated; must not be negative.</param>
  /// <returns>The sequence produced by <c>Catch</c> over the repeated source.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="retryCount"/> is negative.</exception>
  public static IAsyncEnumerable<TSource> Retry<TSource>(this IAsyncEnumerable<TSource> source, int retryCount)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (retryCount < 0)
      {
          throw new ArgumentOutOfRangeException("retryCount");
      }

      // Extension-call syntax is kept so the same Repeat/Catch overloads are selected.
      var attempts = new[] { source }.Repeat(retryCount);
      return attempts.Catch();
  }
  /// <summary>
  /// Lazily yields the elements of <paramref name="source"/> over and over again.
  /// </summary>
  /// <typeparam name="TSource">Element type.</typeparam>
  /// <param name="source">Sequence to repeat; it is re-enumerated on every pass.</param>
  /// <returns>
  /// An endless sequence when <paramref name="source"/> has elements. The sequence ends as soon
  /// as a full pass over <paramref name="source"/> yields nothing.
  /// </returns>
  private static IEnumerable<TSource> Repeat<TSource>(this IEnumerable<TSource> source)
  {
      while (true)
      {
          var yieldedAny = false;

          foreach (var item in source)
          {
              yieldedAny = true;
              yield return item;
          }

          // Repeating an empty source can never produce an element. Without this exit the
          // loop would spin forever inside a single MoveNext call and hang the consumer.
          if (!yieldedAny)
              yield break;
      }
  }
  /// <summary>
  /// Lazily yields the elements of <paramref name="source"/> <paramref name="count"/> times in a row.
  /// </summary>
  /// <typeparam name="TSource">Element type.</typeparam>
  /// <param name="source">Sequence to repeat; it is re-enumerated on every pass.</param>
  /// <param name="count">Number of passes; zero or a negative value yields an empty sequence.</param>
  /// <returns>The concatenation of <paramref name="count"/> passes over <paramref name="source"/>.</returns>
  private static IEnumerable<TSource> Repeat<TSource>(this IEnumerable<TSource> source, int count)
  {
      for (var remaining = count; remaining > 0; remaining--)
      {
          using (var items = source.GetEnumerator())
          {
              while (items.MoveNext())
              {
                  yield return items.Current;
              }
          }
      }
  }
  330. }
  331. }