// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Runtime.ExceptionServices;
using System.Security;
using System.Threading.Tasks;
namespace System.Runtime.CompilerServices
{
/// <summary>
/// Represents a builder for asynchronous methods that return a task-like <see cref="IAsyncObservable{T}"/>.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
public struct AsyncObservableMethodBuilder
{
/// <summary>
/// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous
/// method whose return type is a task-like observable sequence. Remains <c>null</c> until it is assigned
/// by <c>SetStateMachine</c>, <c>AwaitOnCompleted</c> or <c>AwaitUnsafeOnCompleted</c>.
/// </summary>
private IAsyncStateMachine _stateMachine;
/// <summary>
/// The underlying observable sequence representing the result produced by the asynchronous method.
/// Created on first use by <c>SetResult</c>, <c>SetException</c> or the <c>Task</c> property.
/// </summary>
private TaskObservable _inner;
/// <summary>
/// Creates an instance of the builder struct.
/// </summary>
/// <returns>A new, default-initialized instance of the builder struct.</returns>
public static AsyncObservableMethodBuilder Create()
{
    return default(AsyncObservableMethodBuilder);
}
/// <summary>
/// Begins running the builder with the associated state machine by invoking its
/// <c>MoveNext</c> method once, synchronously.
/// </summary>
/// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
/// <param name="stateMachine">The state machine instance, passed by reference.</param>
/// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
public void Start(ref TStateMachine stateMachine)
    where TStateMachine : IAsyncStateMachine
{
    if (stateMachine == null)
    {
        throw new ArgumentNullException(nameof(stateMachine));
    }

    stateMachine.MoveNext();
}
/// <summary>
/// Associates the builder with the specified state machine.
/// </summary>
/// <param name="stateMachine">The state machine instance to associate with the builder.</param>
/// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">The state machine was previously set.</exception>
public void SetStateMachine(IAsyncStateMachine stateMachine)
{
    if (stateMachine == null)
    {
        throw new ArgumentNullException(nameof(stateMachine));
    }

    // The association may only be made once per builder instance.
    if (_stateMachine != null)
    {
        throw new InvalidOperationException();
    }

    _stateMachine = stateMachine;
}
/// <summary>
/// Marks the observable as successfully completed.
/// </summary>
/// <param name="result">The result to use to complete the observable sequence.</param>
/// <exception cref="InvalidOperationException">The observable has already completed.</exception>
public void SetResult(T result)
{
    if (_inner != null)
    {
        // An observable was already handed out (or created by an await); complete it.
        _inner.SetResult(result);
        return;
    }

    // No observable exists yet: the method finished before anyone asked for it,
    // so create one that already carries the result.
    _inner = new TaskObservable(result);
}
/// <summary>
/// Marks the observable as failed and binds the specified exception to the observable sequence.
/// </summary>
/// <param name="exception">The exception to bind to the observable sequence.</param>
/// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">The observable has already completed.</exception>
public void SetException(Exception exception)
{
    if (exception == null)
    {
        throw new ArgumentNullException(nameof(exception));
    }

    if (_inner != null)
    {
        // An observable was already handed out (or created by an await); fail it.
        _inner.SetException(exception);
        return;
    }

    // No observable exists yet: create one that already carries the exception.
    _inner = new TaskObservable(exception);
}
/// <summary>
/// Gets the observable sequence for this builder, creating a not-yet-completed one on first access
/// if no observable has been created so far.
/// </summary>
public IAsyncObservable Task
{
    get
    {
        if (_inner == null)
        {
            _inner = new TaskObservable();
        }

        return _inner;
    }
}
/// <summary>
/// Schedules the state machine to proceed to the next action when the specified awaiter completes.
/// </summary>
/// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
/// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
/// <param name="awaiter">The awaiter.</param>
/// <param name="stateMachine">The state machine.</param>
public void AwaitOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine)
    where TAwaiter : INotifyCompletion
    where TStateMachine : IAsyncStateMachine
{
    try
    {
        if (_stateMachine == null)
        {
            // NB: Read the Task property first so the subject-backed observable exists before
            //     the state machine is stored in the field below.
            var observable = Task;

            _stateMachine = stateMachine;
            _stateMachine.SetStateMachine(_stateMachine);
        }

        // NB: Rx has historically not flowed execution contexts, so neither does this builder.
        Action continuation = _stateMachine.MoveNext;
        awaiter.OnCompleted(continuation);
    }
    catch (Exception error)
    {
        // NB: Letting the exception reach the caller could re-enter the async state machine and
        //     run the async method concurrently, so it is rethrown elsewhere instead.
        Rethrow(error);
    }
}
/// <summary>
/// Schedules the state machine to proceed to the next action when the specified awaiter completes,
/// using the awaiter's <c>UnsafeOnCompleted</c> method.
/// </summary>
/// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
/// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
/// <param name="awaiter">The awaiter.</param>
/// <param name="stateMachine">The state machine.</param>
[SecuritySafeCritical]
public void AwaitUnsafeOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine)
    where TAwaiter : ICriticalNotifyCompletion
    where TStateMachine : IAsyncStateMachine
{
    try
    {
        if (_stateMachine == null)
        {
            // NB: Read the Task property first so the subject-backed observable exists before
            //     the state machine is stored in the field below.
            var observable = Task;

            _stateMachine = stateMachine;
            _stateMachine.SetStateMachine(_stateMachine);
        }

        // NB: Rx has historically not flowed execution contexts, so neither does this builder.
        Action continuation = _stateMachine.MoveNext;
        awaiter.UnsafeOnCompleted(continuation);
    }
    catch (Exception error)
    {
        // NB: Letting the exception reach the caller could re-enter the async state machine and
        //     run the async method concurrently, so it is rethrown elsewhere instead.
        Rethrow(error);
    }
}
/// <summary>
/// Rethrows an exception that was thrown from an awaiter's OnCompleted methods by scheduling
/// the throw on <c>TaskPoolAsyncScheduler.Default</c> instead of throwing on the calling thread.
/// </summary>
/// <param name="exception">The exception to rethrow.</param>
private static void Rethrow(Exception exception)
{
    TaskPoolAsyncScheduler.Default.ScheduleAsync(unused =>
    {
        var dispatchInfo = ExceptionDispatchInfo.Capture(exception);
        dispatchInfo.Throw();

        // Never reached; present so the lambda has a return value. The name is fully qualified
        // because the enclosing builder declares its own member named Task.
        return System.Threading.Tasks.Task.CompletedTask;
    });
}
/// <summary>
/// Implementation of the IAsyncObservable&lt;T&gt; interface compatible with async method return types.
/// </summary>
/// <remarks>
/// This class implements a "task-like" type that can be used as the return type of an asynchronous
/// method in C# 7.0 and beyond. For example:
/// <code>
/// async Observable&lt;int&gt; RxAsync()
/// {
///     var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
///     return res * 2;
/// }
/// </code>
/// </remarks>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
internal sealed class TaskObservable : IAsyncObservable, INotifyCompletion
{
/// <summary>
/// The underlying observable sequence to subscribe to in case the asynchronous method did not
/// finish synchronously. Only the parameterless constructor assigns it; it is <c>null</c> for
/// instances created with a result or an exception.
/// </summary>
private readonly AsyncAsyncSubject _subject;
/// <summary>
/// The result returned by the asynchronous method in case the method finished synchronously.
/// </summary>
private readonly T _result;
/// <summary>
/// The exception thrown by the asynchronous method in case the method finished synchronously.
/// </summary>
private readonly Exception _exception;
/// <summary>
/// Creates a new instance for an asynchronous method that has not finished yet; the outcome is
/// delivered later through a freshly created subject.
/// </summary>
public TaskObservable() => _subject = new SequentialAsyncAsyncSubject();
/// <summary>
/// Creates a new instance for an asynchronous method that synchronously returned the specified
/// value. No subject is created, so the instance reports itself as completed.
/// </summary>
/// <param name="result">The result returned by the asynchronous method.</param>
public TaskObservable(T result) => _result = result;
/// <summary>
/// Creates a new instance for an asynchronous method that synchronously threw the specified
/// exception. No subject is created, so the instance reports itself as completed.
/// </summary>
/// <param name="exception">The exception thrown by the asynchronous method.</param>
public TaskObservable(Exception exception) => _exception = exception;
/// <summary>
/// Marks the observable as successfully completed by publishing the result followed by a
/// completion notification to the underlying subject.
/// </summary>
/// <param name="result">The result to use to complete the observable sequence.</param>
/// <exception cref="InvalidOperationException">The observable has already completed.</exception>
public void SetResult(T result)
{
    // Validate outside of any async method: an exception thrown inside an "async void" body is
    // not delivered to the caller, so the documented InvalidOperationException would otherwise
    // never be observable at the call site.
    if (IsCompleted)
    {
        throw new InvalidOperationException();
    }

    PublishAsync();

    // REVIEW: Async void method. The return type of SetResult cannot change, so the
    //         notifications are started here and complete without the caller awaiting them.
    async void PublishAsync()
    {
        await _subject.OnNextAsync(result).ConfigureAwait(false);
        await _subject.OnCompletedAsync().ConfigureAwait(false);
    }
}
/// <summary>
/// Marks the observable as failed and binds the specified exception to the observable sequence
/// by publishing an error notification to the underlying subject.
/// </summary>
/// <param name="exception">The exception to bind to the observable sequence.</param>
/// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
/// <exception cref="InvalidOperationException">The observable has already completed.</exception>
public void SetException(Exception exception)
{
    if (exception == null)
    {
        throw new ArgumentNullException(nameof(exception));
    }

    if (IsCompleted)
    {
        throw new InvalidOperationException();
    }

    PublishAsync();

    // REVIEW: Async void method, mirroring SetResult. Awaiting the notification (instead of
    //         discarding the returned awaitable) ensures a failure while delivering the error
    //         is surfaced rather than silently dropped.
    async void PublishAsync()
    {
        await _subject.OnErrorAsync(exception).ConfigureAwait(false);
    }
}
/// <summary>
/// Subscribes the given observer to the observable sequence.
/// </summary>
/// <remarks>
/// When the asynchronous method has not finished synchronously, the subscription is forwarded to
/// the underlying subject. Otherwise the stored outcome is replayed directly to the observer:
/// either the exception, or the result followed by a completion notification.
/// </remarks>
/// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
/// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
public ValueTask SubscribeAsync(IAsyncObserver observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    if (_subject != null)
    {
        return _subject.SubscribeAsync(observer);
    }

    return CoreAsync();

    async ValueTask CoreAsync()
    {
        if (_exception != null)
        {
            await observer.OnErrorAsync(_exception).ConfigureAwait(false);
        }
        else
        {
            await observer.OnNextAsync(_result).ConfigureAwait(false);

            // Terminate the sequence, matching what SetResult publishes through the subject
            // (a value followed by completion); without this the observer never sees the end.
            await observer.OnCompletedAsync().ConfigureAwait(false);
        }

        // Nothing to tear down: all notifications have already been delivered.
        return AsyncDisposable.Nop;
    }
}
/// <summary>
/// Gets an awaiter that can be used to await the eventual completion of the observable sequence.
/// </summary>
/// <returns>
/// The underlying subject, which serves as the awaiter. This is <c>null</c> for instances created
/// with a result or an exception, because those constructors do not create a subject.
/// </returns>
public AsyncAsyncSubject GetAwaiter()
{
    return _subject;
}
/// <summary>
/// Gets a Boolean indicating whether the observable sequence has completed. Instances without a
/// subject were created already completed and always report <c>true</c>.
/// </summary>
public bool IsCompleted
{
    get
    {
        if (_subject == null)
        {
            return true;
        }

        return _subject.IsCompleted;
    }
}
/// <summary>
/// Gets the result produced by the observable sequence.
/// </summary>
/// <returns>The result produced by the observable sequence.</returns>
public T GetResult()
{
    if (_subject == null)
    {
        // Synchronously completed: rethrow the stored exception (keeping its original stack
        // trace) or hand back the stored result.
        if (_exception != null)
        {
            ExceptionDispatchInfo.Capture(_exception).Throw();
        }

        return _result;
    }

    return _subject.GetResult();
}
/// <summary>
/// Attaches the specified continuation to the observable sequence.
/// </summary>
/// <param name="continuation">The continuation to attach.</param>
public void OnCompleted(Action continuation)
{
    if (_subject == null)
    {
        // Already completed at construction time: run the continuation right away.
        continuation();
        return;
    }

    _subject.OnCompleted(continuation);
}
}
}
}