Retry.cs 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Linq;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq
  7. {
  8. public partial class AsyncObservable
  9. {
  10. public static IAsyncObservable<TSource> Retry<TSource>(this IAsyncObservable<TSource> source)
  11. {
  12. if (source == null)
  13. throw new ArgumentNullException(nameof(source));
  14. return Create(
  15. source,
  16. async static (source, observer) =>
  17. {
  18. var (sink, inner) = AsyncObserver.Retry(observer, source);
  19. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  20. return StableCompositeAsyncDisposable.Create(subscription, inner);
  21. });
  22. }
  23. public static IAsyncObservable<TSource> Retry<TSource>(this IAsyncObservable<TSource> source, int retryCount)
  24. {
  25. if (source == null)
  26. throw new ArgumentNullException(nameof(source));
  27. if (retryCount < 0)
  28. throw new ArgumentOutOfRangeException(nameof(retryCount));
  29. return Create(
  30. source,
  31. retryCount,
  32. default(TSource),
  33. async (source, retryCount, observer) =>
  34. {
  35. var (sink, inner) = AsyncObserver.Retry(observer, source, retryCount);
  36. var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  37. return StableCompositeAsyncDisposable.Create(subscription, inner);
  38. });
  39. }
  40. }
  41. public partial class AsyncObserver
  42. {
  43. public static (IAsyncObserver<TSource>, IAsyncDisposable) Retry<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> source)
  44. {
  45. if (observer == null)
  46. throw new ArgumentNullException(nameof(observer));
  47. if (source == null)
  48. throw new ArgumentNullException(nameof(source));
  49. return Catch(observer, Repeat(source).GetEnumerator());
  50. }
  51. public static (IAsyncObserver<TSource>, IAsyncDisposable) Retry<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> source, int retryCount)
  52. {
  53. if (observer == null)
  54. throw new ArgumentNullException(nameof(observer));
  55. if (source == null)
  56. throw new ArgumentNullException(nameof(source));
  57. if (retryCount < 0)
  58. throw new ArgumentOutOfRangeException(nameof(retryCount));
  59. return Catch(observer, Enumerable.Repeat(source, retryCount).GetEnumerator());
  60. }
  61. }
  62. }