// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Threading.Tasks; namespace System.Reactive { internal static class NotificationAsyncExtensions { /// /// Invokes the observer's method corresponding to the notification. /// /// The item type. /// The notification. /// Observer to invoke the notification on. /// A task that completes when the observer is done. /// /// /// The implementation of we get from System.Reactive is /// missing the built-in support for this method that the version in System.Reactive.Shared /// used to have (but is otherwise identical). This reproduces that as an extension method. /// public static async ValueTask AcceptAsync(this Notification notification, IAsyncObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var adapter = new NotificationAdapter(observer); notification.Accept(adapter); await adapter.Wait().ConfigureAwait(false); } private class NotificationAdapter : IObserver { private readonly IAsyncObserver _asyncObserver; private ValueTask? _valueTask; public NotificationAdapter(IAsyncObserver asyncObserver) { _asyncObserver = asyncObserver; } public async ValueTask Wait() { if (!_valueTask.HasValue) { throw new InvalidOperationException("Accept did not call any IObserver method"); } await _valueTask.Value.ConfigureAwait(false); } public void OnCompleted() { if (_valueTask.HasValue) { throw new InvalidOperationException("Accept should have called only one IObserver method"); } _valueTask = _asyncObserver.OnCompletedAsync(); } public void OnError(Exception error) { if (_valueTask.HasValue) { throw new InvalidOperationException("Accept should have called only one IObserver method"); } _valueTask = _asyncObserver.OnErrorAsync(error); } public void OnNext(T value) { if (_valueTask.HasValue) { throw new InvalidOperationException("Accept should have called only one IObserver method"); } _valueTask = _asyncObserver.OnNextAsync(value); } } } }