Retry.cs 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
  4. using System.Linq;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq
  7. {
  /// <summary>
  /// Part of <c>AsyncObservable</c> that contains the <c>Retry</c> extension methods.
  /// </summary>
  public partial class AsyncObservable
  {
      /// <summary>
      /// Builds a sequence whose subscriptions are wired through
      /// <c>AsyncObserver.Retry(observer, source)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to subscribe to; must not be null.</param>
      /// <returns>The sequence produced by <c>Create</c> for the subscribe callback below.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
      public static IAsyncObservable<TSource> Retry<TSource>(this IAsyncObservable<TSource> source)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          // The callback is static: everything it needs is passed in as state rather than captured.
          return Create(
              source,
              static async (upstream, downstream) =>
              {
                  // AsyncObserver.Retry hands back the observer to subscribe with, plus a disposable.
                  var (retrySink, retryResources) = AsyncObserver.Retry(downstream, upstream);
                  var firstSubscription = await upstream.SubscribeSafeAsync(retrySink).ConfigureAwait(false);

                  // Both the subscription and the disposable from Retry are returned as one unit.
                  return StableCompositeAsyncDisposable.Create(firstSubscription, retryResources);
              });
      }

      /// <summary>
      /// Builds a sequence whose subscriptions are wired through
      /// <c>AsyncObserver.Retry(observer, source, retryCount)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to subscribe to; must not be null.</param>
      /// <param name="retryCount">Count forwarded to <c>AsyncObserver.Retry</c>; must not be negative.</param>
      /// <returns>
      /// The sequence produced by <c>CreateAsyncObservable&lt;TSource&gt;.From</c> for the subscribe
      /// callback below.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="retryCount"/> is negative.</exception>
      public static IAsyncObservable<TSource> Retry<TSource>(this IAsyncObservable<TSource> source, int retryCount)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (retryCount < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(retryCount));
          }

          // The callback is static: source and retryCount travel as explicit state arguments.
          return CreateAsyncObservable<TSource>.From(
              source,
              retryCount,
              static async (upstream, attempts, downstream) =>
              {
                  // AsyncObserver.Retry hands back the observer to subscribe with, plus a disposable.
                  var (retrySink, retryResources) = AsyncObserver.Retry(downstream, upstream, attempts);
                  var firstSubscription = await upstream.SubscribeSafeAsync(retrySink).ConfigureAwait(false);

                  // Both the subscription and the disposable from Retry are returned as one unit.
                  return StableCompositeAsyncDisposable.Create(firstSubscription, retryResources);
              });
      }
  }
  /// <summary>
  /// Part of <c>AsyncObserver</c> that contains the observer-side <c>Retry</c> factories.
  /// </summary>
  public partial class AsyncObserver
  {
      /// <summary>
      /// Creates the observer/disposable pair for a retry by delegating to <c>Catch</c> with an
      /// enumerator over <c>Repeat(source)</c>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="observer">Observer handed to <c>Catch</c>; must not be null.</param>
      /// <param name="source">Sequence handed to <c>Repeat</c>; must not be null.</param>
      /// <returns>The tuple returned by <c>Catch</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="source"/> is null.
      /// </exception>
      public static (IAsyncObserver<TSource>, IAsyncDisposable) Retry<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> source)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          // NOTE(review): Repeat is declared elsewhere; presumably it yields source without a bound — confirm.
          var attempts = Repeat(source);
          return Catch(observer, attempts.GetEnumerator());
      }

      /// <summary>
      /// Creates the observer/disposable pair for a bounded retry by delegating to <c>Catch</c> with
      /// an enumerator that yields <paramref name="source"/> exactly <paramref name="retryCount"/> times.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="observer">Observer handed to <c>Catch</c>; must not be null.</param>
      /// <param name="source">Sequence repeated in the enumerator; must not be null.</param>
      /// <param name="retryCount">
      /// Number of times <paramref name="source"/> appears in the enumerator; must not be negative.
      /// Zero produces an empty enumerator.
      /// </param>
      /// <returns>The tuple returned by <c>Catch</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="source"/> is null.
      /// </exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="retryCount"/> is negative.</exception>
      public static (IAsyncObserver<TSource>, IAsyncDisposable) Retry<TSource>(IAsyncObserver<TSource> observer, IAsyncObservable<TSource> source, int retryCount)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (retryCount < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(retryCount));
          }

          var attempts = Enumerable.Repeat(source, retryCount);
          return Catch(observer, attempts.GetEnumerator());
      }
  }
  61. }