AsyncObservableExtensions.cs 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive;
  5. using System.Threading.Tasks;
  6. namespace System
  7. {
  8. public static class AsyncObservableExtensions
  9. {
  10. public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync)
  11. {
  12. if (source == null)
  13. throw new ArgumentNullException(nameof(source));
  14. if (onNextAsync == null)
  15. throw new ArgumentNullException(nameof(onNextAsync));
  16. return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => Task.FromException(ex), () => Task.CompletedTask));
  17. }
  18. public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync)
  19. {
  20. if (source == null)
  21. throw new ArgumentNullException(nameof(source));
  22. if (onNextAsync == null)
  23. throw new ArgumentNullException(nameof(onNextAsync));
  24. if (onErrorAsync == null)
  25. throw new ArgumentNullException(nameof(onErrorAsync));
  26. return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, () => Task.CompletedTask));
  27. }
  28. public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync, Func<Task> onCompletedAsync)
  29. {
  30. if (source == null)
  31. throw new ArgumentNullException(nameof(source));
  32. if (onNextAsync == null)
  33. throw new ArgumentNullException(nameof(onNextAsync));
  34. if (onCompletedAsync == null)
  35. throw new ArgumentNullException(nameof(onCompletedAsync));
  36. return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, ex => Task.FromException(ex), onCompletedAsync));
  37. }
  38. public static Task<IAsyncDisposable> SubscribeAsync<T>(this IAsyncObservable<T> source, Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)
  39. {
  40. if (source == null)
  41. throw new ArgumentNullException(nameof(source));
  42. if (onNextAsync == null)
  43. throw new ArgumentNullException(nameof(onNextAsync));
  44. if (onErrorAsync == null)
  45. throw new ArgumentNullException(nameof(onErrorAsync));
  46. if (onCompletedAsync == null)
  47. throw new ArgumentNullException(nameof(onCompletedAsync));
  48. return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync));
  49. }
  50. }
  51. }