1
0

AsyncObserver.cs 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Runtime.ExceptionServices;
  5. using System.Threading.Tasks;
  6. namespace System.Reactive.Linq
  7. {
  8. public static partial class AsyncObserver
  9. {
  10. public static IAsyncObserver<T> Create<T>(Func<T, Task> onNextAsync)
  11. {
  12. if (onNextAsync == null)
  13. throw new ArgumentNullException(nameof(onNextAsync));
  14. return new AnonymousAsyncObserver<T>(
  15. onNextAsync,
  16. ex =>
  17. {
  18. ExceptionDispatchInfo.Capture(ex).Throw();
  19. return Task.CompletedTask;
  20. },
  21. () => Task.CompletedTask
  22. );
  23. }
  24. public static IAsyncObserver<T> Create<T>(Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)
  25. {
  26. if (onNextAsync == null)
  27. throw new ArgumentNullException(nameof(onNextAsync));
  28. if (onErrorAsync == null)
  29. throw new ArgumentNullException(nameof(onErrorAsync));
  30. if (onCompletedAsync == null)
  31. throw new ArgumentNullException(nameof(onCompletedAsync));
  32. return new AnonymousAsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync);
  33. }
  34. private sealed class AnonymousAsyncObserver<T> : AsyncObserverBase<T>
  35. {
  36. private readonly Func<T, Task> _onNextAsync;
  37. private readonly Func<Exception, Task> _onErrorAsync;
  38. private readonly Func<Task> _onCompletedAsync;
  39. public AnonymousAsyncObserver(Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)
  40. {
  41. _onNextAsync = onNextAsync;
  42. _onErrorAsync = onErrorAsync;
  43. _onCompletedAsync = onCompletedAsync;
  44. }
  45. protected override Task OnCompletedAsyncCore() => _onCompletedAsync();
  46. protected override Task OnErrorAsyncCore(Exception error) => _onErrorAsync(error);
  47. protected override Task OnNextAsyncCore(T value) => _onNextAsync(value);
  48. }
  49. }
  50. }