12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Threading.Tasks;
- namespace System.Reactive
- {
- internal static class NotificationAsyncExtensions
- {
- /// <summary>
- /// Invokes the observer's method corresponding to the notification.
- /// </summary>
- /// <typeparam name="T">The item type.</typeparam>
- /// <param name="notification">The notification.</param>
- /// <param name="observer">Observer to invoke the notification on.</param>
- /// <returns>A task that completes when the observer is done.</returns>
- /// <exception cref="ArgumentNullException"></exception>
- /// <remarks>
- /// The implementation of <see cref="Notification{T}"/> we get from System.Reactive is
- /// missing the built-in support for this method that the version in System.Reactive.Shared
- /// used to have (but is otherwise identical). This reproduces that as an extension method.
- /// </remarks>
- public static async ValueTask AcceptAsync<T>(this Notification<T> notification, IAsyncObserver<T> observer)
- {
- if (observer == null)
- throw new ArgumentNullException(nameof(observer));
- var adapter = new NotificationAdapter<T>(observer);
- notification.Accept(adapter);
- await adapter.Wait().ConfigureAwait(false);
- }
- private class NotificationAdapter<T> : IObserver<T>
- {
- private readonly IAsyncObserver<T> _asyncObserver;
- private ValueTask? _valueTask;
- public NotificationAdapter(IAsyncObserver<T> asyncObserver)
- {
- _asyncObserver = asyncObserver;
- }
- public async ValueTask Wait()
- {
- if (!_valueTask.HasValue)
- {
- throw new InvalidOperationException("Accept did not call any IObserver<T> method");
- }
- await _valueTask.Value.ConfigureAwait(false);
- }
- public void OnCompleted()
- {
- if (_valueTask.HasValue)
- {
- throw new InvalidOperationException("Accept should have called only one IObserver<T> method");
- }
- _valueTask = _asyncObserver.OnCompletedAsync();
- }
- public void OnError(Exception error)
- {
- if (_valueTask.HasValue)
- {
- throw new InvalidOperationException("Accept should have called only one IObserver<T> method");
- }
- _valueTask = _asyncObserver.OnErrorAsync(error);
- }
- public void OnNext(T value)
- {
- if (_valueTask.HasValue)
- {
- throw new InvalidOperationException("Accept should have called only one IObserver<T> method");
- }
- _valueTask = _asyncObserver.OnNextAsync(value);
- }
- }
- }
- }
|