NotificationAsyncExtensions.cs 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading.Tasks;
  5. namespace System.Reactive
  6. {
  7. internal static class NotificationAsyncExtensions
  8. {
  9. /// <summary>
  10. /// Invokes the observer's method corresponding to the notification.
  11. /// </summary>
  12. /// <typeparam name="T">The item type.</typeparam>
  13. /// <param name="notification">The notification.</param>
  14. /// <param name="observer">Observer to invoke the notification on.</param>
  15. /// <returns>A task that completes when the observer is done.</returns>
  16. /// <exception cref="ArgumentNullException"></exception>
  17. /// <remarks>
  18. /// The implementation of <see cref="Notification{T}"/> we get from System.Reactive is
  19. /// missing the built-in support for this method that the version in System.Reactive.Shared
  20. /// used to have (but is otherwise identical). This reproduces that as an extension method.
  21. /// </remarks>
  22. public static async ValueTask AcceptAsync<T>(this Notification<T> notification, IAsyncObserver<T> observer)
  23. {
  24. if (observer == null)
  25. throw new ArgumentNullException(nameof(observer));
  26. var adapter = new NotificationAdapter<T>(observer);
  27. notification.Accept(adapter);
  28. await adapter.Wait().ConfigureAwait(false);
  29. }
  30. private class NotificationAdapter<T> : IObserver<T>
  31. {
  32. private readonly IAsyncObserver<T> _asyncObserver;
  33. private ValueTask? _valueTask;
  34. public NotificationAdapter(IAsyncObserver<T> asyncObserver)
  35. {
  36. _asyncObserver = asyncObserver;
  37. }
  38. public async ValueTask Wait()
  39. {
  40. if (!_valueTask.HasValue)
  41. {
  42. throw new InvalidOperationException("Accept did not call any IObserver<T> method");
  43. }
  44. await _valueTask.Value.ConfigureAwait(false);
  45. }
  46. public void OnCompleted()
  47. {
  48. if (_valueTask.HasValue)
  49. {
  50. throw new InvalidOperationException("Accept should have called only one IObserver<T> method");
  51. }
  52. _valueTask = _asyncObserver.OnCompletedAsync();
  53. }
  54. public void OnError(Exception error)
  55. {
  56. if (_valueTask.HasValue)
  57. {
  58. throw new InvalidOperationException("Accept should have called only one IObserver<T> method");
  59. }
  60. _valueTask = _asyncObserver.OnErrorAsync(error);
  61. }
  62. public void OnNext(T value)
  63. {
  64. if (_valueTask.HasValue)
  65. {
  66. throw new InvalidOperationException("Accept should have called only one IObserver<T> method");
  67. }
  68. _valueTask = _asyncObserver.OnNextAsync(value);
  69. }
  70. }
  71. }
  72. }