AsyncObservable.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading.Tasks;
  6. namespace System.Reactive.Linq
  7. {
  8. public static partial class AsyncObservable
  9. {
  10. private sealed class AsyncObservableImpl<TSource, TResult> : AsyncObservableBase<TResult>
  11. {
  12. private readonly IAsyncObservable<TSource> _source;
  13. private readonly Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> _subscribeAsync;
  14. public AsyncObservableImpl(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
  15. {
  16. _source = source;
  17. _subscribeAsync = subscribeAsync ?? throw new ArgumentNullException(nameof(subscribeAsync));
  18. }
  19. protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<TResult> observer)
  20. {
  21. if (observer == null)
  22. throw new ArgumentNullException(nameof(observer));
  23. return _subscribeAsync(_source, observer);
  24. }
  25. }
  26. private sealed class AsyncObservableImpl<TSource, TResult, TState> : AsyncObservableBase<TResult>
  27. {
  28. private readonly TState _state;
  29. private readonly IAsyncObservable<TSource> _source;
  30. private readonly Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> _subscribeAsync;
  31. public AsyncObservableImpl(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
  32. {
  33. _source = source;
  34. _state = state;
  35. _subscribeAsync = subscribeAsync ?? throw new ArgumentNullException(nameof(subscribeAsync));
  36. }
  37. protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<TResult> observer)
  38. {
  39. if (observer == null)
  40. throw new ArgumentNullException(nameof(observer));
  41. return _subscribeAsync(_source, _state, observer);
  42. }
  43. }
  44. public static IAsyncObservable<T> Create<T>(Func<IAsyncObserver<T>, ValueTask<IAsyncDisposable>> subscribeAsync)
  45. {
  46. if (subscribeAsync == null)
  47. throw new ArgumentNullException(nameof(subscribeAsync));
  48. return new AsyncObservable<T>(subscribeAsync);
  49. }
  50. internal static IAsyncObservable<TResult> Create<TSource, TResult>(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
  51. {
  52. if (subscribeAsync == null)
  53. throw new ArgumentNullException(nameof(subscribeAsync));
  54. return new AsyncObservableImpl<TSource, TResult>(source, subscribeAsync);
  55. }
  56. internal static IAsyncObservable<TResult> Create<TSource, TResult, TState>(IAsyncObservable<TSource> source, TState state, TResult dummy, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TResult>, ValueTask<IAsyncDisposable>> subscribeAsync)
  57. {
  58. if (subscribeAsync == null)
  59. throw new ArgumentNullException(nameof(subscribeAsync));
  60. return new AsyncObservableImpl<TSource, TResult, TState>(source, state, subscribeAsync);
  61. }
  62. internal static IAsyncObservable<TSource> Create<TSource, TState>(IAsyncObservable<TSource> source, TState state, Func<IAsyncObservable<TSource>, TState, IAsyncObserver<TSource>, ValueTask<IAsyncDisposable>> subscribeAsync)
  63. {
  64. return Create(source, state, default(TSource), subscribeAsync);
  65. }
  66. internal static IAsyncObservable<TSource> Create<TSource>(IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObserver<TSource>, ValueTask<IAsyncDisposable>> subscribeAsync)
  67. {
  68. return Create<TSource, TSource>(source, subscribeAsync);
  69. }
  70. public static ValueTask<IAsyncDisposable> SubscribeSafeAsync<T>(this IAsyncObservable<T> source, IAsyncObserver<T> observer)
  71. {
  72. if (source == null)
  73. throw new ArgumentNullException(nameof(source));
  74. if (observer == null)
  75. throw new ArgumentNullException(nameof(observer));
  76. return CoreAsync();
  77. async ValueTask<IAsyncDisposable> CoreAsync()
  78. {
  79. try
  80. {
  81. return await source.SubscribeAsync(observer).ConfigureAwait(false);
  82. }
  83. catch (Exception ex)
  84. {
  85. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  86. return AsyncDisposable.Nop;
  87. }
  88. }
  89. }
  90. }
  91. }