// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Runtime.ExceptionServices; using System.Security; using System.Threading.Tasks; namespace System.Runtime.CompilerServices { /// /// Represents a builder for asynchronous methods that return a task-like . /// /// The type of the elements in the sequence. public struct AsyncObservableMethodBuilder { /// /// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous /// method whose return type is a task-like . /// private IAsyncStateMachine _stateMachine; /// /// The underlying observable sequence representing the result produced by the asynchronous method. /// private TaskObservable _inner; /// /// Creates an instance of the struct. /// /// A new instance of the struct. public static AsyncObservableMethodBuilder Create() => default(AsyncObservableMethodBuilder); /// /// Begins running the builder with the associated state machine. /// /// The type of the state machine. /// The state machine instance, passed by reference. /// is null. public void Start(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { if (stateMachine == null) throw new ArgumentNullException(nameof(stateMachine)); stateMachine.MoveNext(); } /// /// Associates the builder with the specified state machine. /// /// The state machine instance to associate with the builder. /// is null. /// The state machine was previously set. public void SetStateMachine(IAsyncStateMachine stateMachine) { if (stateMachine == null) throw new ArgumentNullException(nameof(stateMachine)); if (_stateMachine != null) throw new InvalidOperationException(); _stateMachine = stateMachine; } /// /// Marks the observable as successfully completed. /// /// The result to use to complete the observable sequence. /// The observable has already completed. public void SetResult(T result) { if (_inner == null) { _inner = new TaskObservable(result); } else { _inner.SetResult(result); } } /// /// Marks the observable as failed and binds the specified exception to the observable sequence. /// /// The exception to bind to the observable sequence. /// is null. /// The observable has already completed. public void SetException(Exception exception) { if (exception == null) throw new ArgumentNullException(nameof(exception)); if (_inner == null) { _inner = new TaskObservable(exception); } else { _inner.SetException(exception); } } /// /// Gets the observable sequence for this builder. /// public IAsyncObservable Task => _inner ?? (_inner = new TaskObservable()); /// /// Schedules the state machine to proceed to the next action when the specified awaiter completes. /// /// The type of the awaiter. /// The type of the state machine. /// The awaiter. /// The state machine. public void AwaitOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : INotifyCompletion where TStateMachine : IAsyncStateMachine { try { if (_stateMachine == null) { var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready. _stateMachine = stateMachine; _stateMachine.SetStateMachine(_stateMachine); } // NB: Rx has historically not bothered with execution contexts, so we don't do it here either. awaiter.OnCompleted(_stateMachine.MoveNext); } catch (Exception ex) { // NB: Prevent reentrancy into the async state machine when an exception would be observed // by the caller. This could cause concurrent execution of the async method. Instead, // rethrow the exception elsewhere. Rethrow(ex); } } /// /// Schedules the state machine to proceed to the next action when the specified awaiter completes. /// /// The type of the awaiter. /// The type of the state machine. /// The awaiter. /// The state machine. [SecuritySafeCritical] public void AwaitUnsafeOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine { try { if (_stateMachine == null) { var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready. _stateMachine = stateMachine; _stateMachine.SetStateMachine(_stateMachine); } // NB: Rx has historically not bothered with execution contexts, so we don't do it here either. awaiter.UnsafeOnCompleted(_stateMachine.MoveNext); } catch (Exception ex) { // NB: Prevent reentrancy into the async state machine when an exception would be observed // by the caller. This could cause concurrent execution of the async method. Instead, // rethrow the exception elsewhere. Rethrow(ex); } } /// /// Rethrows an exception that was thrown from an awaiter's OnCompleted methods. /// /// The exception to rethrow. private static void Rethrow(Exception exception) { TaskPoolAsyncScheduler.Default.ScheduleAsync(_ => { ExceptionDispatchInfo.Capture(exception).Throw(); return System.Threading.Tasks.Task.CompletedTask; }); } /// /// Implementation of the IObservable<T> interface compatible with async method return types. /// /// /// This class implements a "task-like" type that can be used as the return type of an asynchronous /// method in C# 7.0 and beyond. For example: /// /// async Observable<int> RxAsync() /// { /// var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1)); /// return res * 2; /// } /// /// /// The type of the elements in the sequence. internal sealed class TaskObservable : IAsyncObservable, INotifyCompletion { /// /// The underlying observable sequence to subscribe to in case the asynchronous method did not /// finish synchronously. /// private readonly AsyncAsyncSubject _subject; /// /// The result returned by the asynchronous method in case the method finished synchronously. /// private readonly T _result; /// /// The exception thrown by the asynchronous method in case the method finished synchronously. /// private readonly Exception _exception; /// /// Creates a new for an asynchronous method that has not finished yet. /// public TaskObservable() { _subject = new SequentialAsyncAsyncSubject(); } /// /// Creates a new for an asynchronous method that synchronously returned /// the specified value. /// /// The result returned by the asynchronous method. public TaskObservable(T result) { _result = result; } /// /// Creates a new for an asynchronous method that synchronously threw /// the specified . /// /// The exception thrown by the asynchronous method. public TaskObservable(Exception exception) { _exception = exception; } /// /// Marks the observable as successfully completed. /// /// The result to use to complete the observable sequence. /// The observable has already completed. public async void SetResult(T result) { if (IsCompleted) throw new InvalidOperationException(); // REVIEW: Async void method. await _subject.OnNextAsync(result).ConfigureAwait(false); await _subject.OnCompletedAsync().ConfigureAwait(false); } /// /// Marks the observable as failed and binds the specified exception to the observable sequence. /// /// The exception to bind to the observable sequence. /// is null. /// The observable has already completed. public void SetException(Exception exception) { if (IsCompleted) throw new InvalidOperationException(); _subject.OnErrorAsync(exception); } /// /// Subscribes the given observer to the observable sequence. /// /// Observer that will receive notifications from the observable sequence. /// Disposable object representing an observer's subscription to the observable sequence. /// is null. public ValueTask SubscribeAsync(IAsyncObserver observer) { if (_subject != null) { return _subject.SubscribeAsync(observer); } async ValueTask CoreAsync() { if (_exception != null) { await observer.OnErrorAsync(_exception).ConfigureAwait(false); } else { await observer.OnNextAsync(_result).ConfigureAwait(false); } return AsyncDisposable.Nop; } return CoreAsync(); } /// /// Gets an awaiter that can be used to await the eventual completion of the observable sequence. /// /// An awaiter that can be used to await the eventual completion of the observable sequence. public AsyncAsyncSubject GetAwaiter() => _subject; /// /// Gets a Boolean indicating whether the observable sequence has completed. /// public bool IsCompleted => _subject?.IsCompleted ?? true; /// /// Gets the result produced by the observable sequence. /// /// The result produced by the observable sequence. public T GetResult() { if (_subject != null) { return _subject.GetResult(); } if (_exception != null) { ExceptionDispatchInfo.Capture(_exception).Throw(); } return _result; } /// /// Attaches the specified to the observable sequence. /// /// The continuation to attach. public void OnCompleted(Action continuation) { if (_subject != null) { _subject.OnCompleted(continuation); } else { continuation(); } } } } }