AsyncObservable.cs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading.Tasks;
  6. namespace System.Reactive.Linq
  7. {
  8. public static partial class AsyncObservable
  9. {
  10. internal static class CreateAsyncObservable<TResult>
  11. {
  12. private sealed class AsyncObservableImpl<TSource> : AsyncObservableBase<TResult>
  13. {
  14. private readonly IAsyncObservable<TSource> _source;
  15. private readonly Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> _subscribeAsync;
  16. public AsyncObservableImpl(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
  17. {
  18. _source = source ?? throw new ArgumentNullException(nameof(source));
  19. _subscribeAsync = subscribeAsync ?? throw new ArgumentNullException(nameof(subscribeAsync));
  20. }
  21. protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<TResult> observer)
  22. {
  23. if (observer == null)
  24. throw new ArgumentNullException(nameof(observer));
  25. return _subscribeAsync(_source, observer);
  26. }
  27. }
  28. private sealed class AsyncObservableImpl<TSource, TState> : AsyncObservableBase<TResult>
  29. {
  30. private readonly TState _state;
  31. private readonly IAsyncObservable<TSource> _source;
  32. private readonly Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> _subscribeAsync;
  33. public AsyncObservableImpl(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
  34. {
  35. _state = state;
  36. _source = source ?? throw new ArgumentNullException(nameof(source));
  37. _subscribeAsync = subscribeAsync ?? throw new ArgumentNullException(nameof(subscribeAsync));
  38. }
  39. protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<TResult> observer)
  40. {
  41. if (observer == null)
  42. throw new ArgumentNullException(nameof(observer));
  43. return _subscribeAsync(_source, _state, observer);
  44. }
  45. }
  46. public static IAsyncObservable<TResult> From<TSource>(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
  47. {
  48. return new AsyncObservableImpl<TSource>(source, subscribeAsync);
  49. }
  50. public static IAsyncObservable<TResult> From<TSource, TState>(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
  51. {
  52. return new AsyncObservableImpl<TSource, TState>(source, state, subscribeAsync);
  53. }
  54. }
  55. public static IAsyncObservable<T> Create<T>(Func<IAsyncObserver<T>, ValueTask<IAsyncDisposable>> subscribeAsync)
  56. {
  57. if (subscribeAsync == null)
  58. throw new ArgumentNullException(nameof(subscribeAsync));
  59. return new AsyncObservable<T>(subscribeAsync);
  60. }
  61. internal static IAsyncObservable<TResult> Create<TSource, TResult>(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
  62. {
  63. return CreateAsyncObservable<TResult>.From(source, subscribeAsync);
  64. }
  65. internal static IAsyncObservable<TSource> Create<TSource, TState>(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TSource>, ValueTask<IAsyncDisposable>> subscribeAsync)
  66. {
  67. return CreateAsyncObservable<TSource>.From(source, state, subscribeAsync);
  68. }
  69. internal static IAsyncObservable<TSource> Create<TSource>(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TSource>, ValueTask<IAsyncDisposable>> subscribeAsync)
  70. {
  71. return Create<TSource, TSource>(source, subscribeAsync);
  72. }
  73. public static ValueTask<IAsyncDisposable> SubscribeSafeAsync<T>(this IAsyncObservable<T> source, IAsyncObserver<T> observer)
  74. {
  75. if (source == null)
  76. throw new ArgumentNullException(nameof(source));
  77. if (observer == null)
  78. throw new ArgumentNullException(nameof(observer));
  79. return CoreAsync();
  80. async ValueTask<IAsyncDisposable> CoreAsync()
  81. {
  82. try
  83. {
  84. return await source.SubscribeAsync(observer).ConfigureAwait(false);
  85. }
  86. catch (Exception ex)
  87. {
  88. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  89. return AsyncDisposable.Nop;
  90. }
  91. }
  92. }
  93. }
  94. }