AsyncObservableExtensions.cs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive;
  5. using System.Threading.Tasks;
  6. namespace System
  7. {
  8. public static class AsyncObservableExtensions
  9. {
  10. public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync)
  11. {
  12. if (source == null)
  13. throw new ArgumentNullException(nameof(source));
  14. if (onNextAsync == null)
  15. throw new ArgumentNullException(nameof(onNextAsync));
  16. return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => new ValueTask(Task.FromException(ex)), () => default));
  17. }
  18. public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync, Func<Exception, ValueTask> onErrorAsync)
  19. {
  20. if (source == null)
  21. throw new ArgumentNullException(nameof(source));
  22. if (onNextAsync == null)
  23. throw new ArgumentNullException(nameof(onNextAsync));
  24. if (onErrorAsync == null)
  25. throw new ArgumentNullException(nameof(onErrorAsync));
  26. return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, () => default));
  27. }
  28. public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync, Func<ValueTask> onCompletedAsync)
  29. {
  30. if (source == null)
  31. throw new ArgumentNullException(nameof(source));
  32. if (onNextAsync == null)
  33. throw new ArgumentNullException(nameof(onNextAsync));
  34. if (onCompletedAsync == null)
  35. throw new ArgumentNullException(nameof(onCompletedAsync));
  36. return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => new ValueTask(Task.FromException(ex)), onCompletedAsync));
  37. }
  38. public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, ValueTask> onNextAsync, Func<Exception, ValueTask> onErrorAsync, Func<ValueTask> onCompletedAsync)
  39. {
  40. if (source == null)
  41. throw new ArgumentNullException(nameof(source));
  42. if (onNextAsync == null)
  43. throw new ArgumentNullException(nameof(onNextAsync));
  44. if (onErrorAsync == null)
  45. throw new ArgumentNullException(nameof(onErrorAsync));
  46. if (onCompletedAsync == null)
  47. throw new ArgumentNullException(nameof(onCompletedAsync));
  48. return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync));
  49. }
  50. public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext)
  51. {
  52. if (source == null)
  53. throw new ArgumentNullException(nameof(source));
  54. if (onNext == null)
  55. throw new ArgumentNullException(nameof(onNext));
  56. return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => new ValueTask(Task.FromException(ex)), () => default));
  57. }
  58. public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action<Exception> onError)
  59. {
  60. if (source == null)
  61. throw new ArgumentNullException(nameof(source));
  62. if (onNext == null)
  63. throw new ArgumentNullException(nameof(onNext));
  64. if (onError == null)
  65. throw new ArgumentNullException(nameof(onError));
  66. return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => { onError(ex); return default; }, () => default));
  67. }
  68. public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action onCompleted)
  69. {
  70. if (source == null)
  71. throw new ArgumentNullException(nameof(source));
  72. if (onNext == null)
  73. throw new ArgumentNullException(nameof(onNext));
  74. if (onCompleted == null)
  75. throw new ArgumentNullException(nameof(onCompleted));
  76. return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => new ValueTask(Task.FromException(ex)), () => { onCompleted(); return default; }));
  77. }
  78. public static ValueTask<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
  79. {
  80. if (source == null)
  81. throw new ArgumentNullException(nameof(source));
  82. if (onNext == null)
  83. throw new ArgumentNullException(nameof(onNext));
  84. if (onError == null)
  85. throw new ArgumentNullException(nameof(onError));
  86. if (onCompleted == null)
  87. throw new ArgumentNullException(nameof(onCompleted));
  88. return source.SubscribeAsync(new AsyncObserver<T>(x => { onNext(x); return default; }, ex => { onError(ex); return default; }, () => { onCompleted(); return default; }));
  89. }
  90. }
  91. }