| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Reactive.Subjects;
- using System.Runtime.ExceptionServices;
- using System.Security;
- using System.Threading.Tasks;
- namespace System.Runtime.CompilerServices
- {
- /// <summary>
- /// Represents a builder for asynchronous methods that return a task-like <see cref="IAsyncObservable{T}"/>.
- /// </summary>
- /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
- public struct AsyncObservableMethodBuilder<T>
- {
- /// <summary>
- /// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous
- /// method whose return type is a task-like <see cref="IAsyncObservable{T}"/>.
- /// </summary>
- private IAsyncStateMachine _stateMachine;
- /// <summary>
- /// The underlying observable sequence representing the result produced by the asynchronous method.
- /// </summary>
- private TaskObservable _inner;
- /// <summary>
- /// Creates an instance of the <see cref="AsyncObservableMethodBuilder{T}"/> struct.
- /// </summary>
- /// <returns>A new instance of the struct.</returns>
- public static AsyncObservableMethodBuilder<T> Create() => default;
- /// <summary>
- /// Begins running the builder with the associated state machine.
- /// </summary>
- /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
- /// <param name="stateMachine">The state machine instance, passed by reference.</param>
- /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
- public void Start<TStateMachine>(ref TStateMachine stateMachine)
- where TStateMachine : IAsyncStateMachine
- {
- if (stateMachine == null)
- throw new ArgumentNullException(nameof(stateMachine));
- stateMachine.MoveNext();
- }
- /// <summary>
- /// Associates the builder with the specified state machine.
- /// </summary>
- /// <param name="stateMachine">The state machine instance to associate with the builder.</param>
- /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
- /// <exception cref="InvalidOperationException">The state machine was previously set.</exception>
- public void SetStateMachine(IAsyncStateMachine stateMachine)
- {
- if (_stateMachine != null)
- throw new InvalidOperationException();
- _stateMachine = stateMachine ?? throw new ArgumentNullException(nameof(stateMachine));
- }
- /// <summary>
- /// Marks the observable as successfully completed.
- /// </summary>
- /// <param name="result">The result to use to complete the observable sequence.</param>
- /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
- public void SetResult(T result)
- {
- if (_inner == null)
- {
- _inner = new TaskObservable(result);
- }
- else
- {
- _inner.SetResult(result);
- }
- }
- /// <summary>
- /// Marks the observable as failed and binds the specified exception to the observable sequence.
- /// </summary>
- /// <param name="exception">The exception to bind to the observable sequence.</param>
- /// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
- /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
- public void SetException(Exception exception)
- {
- if (exception == null)
- throw new ArgumentNullException(nameof(exception));
- if (_inner == null)
- {
- _inner = new TaskObservable(exception);
- }
- else
- {
- _inner.SetException(exception);
- }
- }
- /// <summary>
- /// Gets the observable sequence for this builder.
- /// </summary>
- public IAsyncObservable<T> Task => _inner ??= new TaskObservable();
- /// <summary>
- /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
- /// </summary>
- /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
- /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
- /// <param name="awaiter">The awaiter.</param>
- /// <param name="stateMachine">The state machine.</param>
- public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
- where TAwaiter : INotifyCompletion
- where TStateMachine : IAsyncStateMachine
- {
- try
- {
- if (_stateMachine == null)
- {
- var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.
- _stateMachine = stateMachine;
- _stateMachine.SetStateMachine(_stateMachine);
- }
- // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
- awaiter.OnCompleted(_stateMachine.MoveNext);
- }
- catch (Exception ex)
- {
- // NB: Prevent reentrancy into the async state machine when an exception would be observed
- // by the caller. This could cause concurrent execution of the async method. Instead,
- // rethrow the exception elsewhere.
- Rethrow(ex);
- }
- }
- /// <summary>
- /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
- /// </summary>
- /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
- /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
- /// <param name="awaiter">The awaiter.</param>
- /// <param name="stateMachine">The state machine.</param>
- [SecuritySafeCritical]
- public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
- where TAwaiter : ICriticalNotifyCompletion
- where TStateMachine : IAsyncStateMachine
- {
- try
- {
- if (_stateMachine == null)
- {
- var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.
- _stateMachine = stateMachine;
- _stateMachine.SetStateMachine(_stateMachine);
- }
- // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
- awaiter.UnsafeOnCompleted(_stateMachine.MoveNext);
- }
- catch (Exception ex)
- {
- // NB: Prevent reentrancy into the async state machine when an exception would be observed
- // by the caller. This could cause concurrent execution of the async method. Instead,
- // rethrow the exception elsewhere.
- Rethrow(ex);
- }
- }
- /// <summary>
- /// Rethrows an exception that was thrown from an awaiter's OnCompleted methods.
- /// </summary>
- /// <param name="exception">The exception to rethrow.</param>
- private static void Rethrow(Exception exception)
- {
- TaskPoolAsyncScheduler.Default.ScheduleAsync(_ =>
- {
- ExceptionDispatchInfo.Capture(exception).Throw();
- return default;
- });
- }
- /// <summary>
- /// Implementation of the IObservable<T> interface compatible with async method return types.
- /// </summary>
- /// <remarks>
- /// This class implements a "task-like" type that can be used as the return type of an asynchronous
- /// method in C# 7.0 and beyond. For example:
- /// <code>
- /// async Observable<int> RxAsync()
- /// {
- /// var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
- /// return res * 2;
- /// }
- /// </code>
- /// </remarks>
- /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
- internal sealed class TaskObservable : IAsyncObservable<T>, INotifyCompletion
- {
- /// <summary>
- /// The underlying observable sequence to subscribe to in case the asynchronous method did not
- /// finish synchronously.
- /// </summary>
- private readonly AsyncAsyncSubject<T> _subject;
- /// <summary>
- /// The result returned by the asynchronous method in case the method finished synchronously.
- /// </summary>
- private readonly T _result;
- /// <summary>
- /// The exception thrown by the asynchronous method in case the method finished synchronously.
- /// </summary>
- private readonly Exception _exception;
- /// <summary>
- /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that has not finished yet.
- /// </summary>
- public TaskObservable()
- {
- _subject = new SequentialAsyncAsyncSubject<T>();
- }
- /// <summary>
- /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously returned
- /// the specified <paramref name="result"/> value.
- /// </summary>
- /// <param name="result">The result returned by the asynchronous method.</param>
- public TaskObservable(T result)
- {
- _result = result;
- }
- /// <summary>
- /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously threw
- /// the specified <paramref name="exception"/>.
- /// </summary>
- /// <param name="exception">The exception thrown by the asynchronous method.</param>
- public TaskObservable(Exception exception)
- {
- _exception = exception;
- }
- /// <summary>
- /// Marks the observable as successfully completed.
- /// </summary>
- /// <param name="result">The result to use to complete the observable sequence.</param>
- /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
- public async void SetResult(T result)
- {
- if (IsCompleted)
- throw new InvalidOperationException();
- // REVIEW: Async void method.
- await _subject.OnNextAsync(result).ConfigureAwait(false);
- await _subject.OnCompletedAsync().ConfigureAwait(false);
- }
- /// <summary>
- /// Marks the observable as failed and binds the specified exception to the observable sequence.
- /// </summary>
- /// <param name="exception">The exception to bind to the observable sequence.</param>
- /// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
- /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
- public void SetException(Exception exception)
- {
- if (IsCompleted)
- throw new InvalidOperationException();
- _subject.OnErrorAsync(exception);
- }
- /// <summary>
- /// Subscribes the given observer to the observable sequence.
- /// </summary>
- /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
- /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
- public ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer)
- {
- if (_subject != null)
- {
- return _subject.SubscribeAsync(observer);
- }
- async ValueTask<IAsyncDisposable> CoreAsync()
- {
- if (_exception != null)
- {
- await observer.OnErrorAsync(_exception).ConfigureAwait(false);
- }
- else
- {
- await observer.OnNextAsync(_result).ConfigureAwait(false);
- }
- return AsyncDisposable.Nop;
- }
- return CoreAsync();
- }
- /// <summary>
- /// Gets an awaiter that can be used to await the eventual completion of the observable sequence.
- /// </summary>
- /// <returns>An awaiter that can be used to await the eventual completion of the observable sequence.</returns>
- public AsyncAsyncSubject<T> GetAwaiter() => _subject;
- /// <summary>
- /// Gets a Boolean indicating whether the observable sequence has completed.
- /// </summary>
- public bool IsCompleted => _subject?.IsCompleted ?? true;
- /// <summary>
- /// Gets the result produced by the observable sequence.
- /// </summary>
- /// <returns>The result produced by the observable sequence.</returns>
- public T GetResult()
- {
- if (_subject != null)
- {
- return _subject.GetResult();
- }
- if (_exception != null)
- {
- ExceptionDispatchInfo.Capture(_exception).Throw();
- }
- return _result;
- }
- /// <summary>
- /// Attaches the specified <paramref name="continuation"/> to the observable sequence.
- /// </summary>
- /// <param name="continuation">The continuation to attach.</param>
- public void OnCompleted(Action continuation)
- {
- if (_subject != null)
- {
- _subject.OnCompleted(continuation);
- }
- else
- {
- continuation();
- }
- }
- }
- }
- }
|