// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information. 
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Runtime.ExceptionServices;
using System.Security;
using System.Threading.Tasks;
namespace System.Runtime.CompilerServices
{
    /// 
    /// Represents a builder for asynchronous methods that return a task-like .
    /// 
    /// The type of the elements in the sequence.
    public struct AsyncObservableMethodBuilder
    {
        /// 
        /// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous
        /// method whose return type is a task-like .
        /// 
        private IAsyncStateMachine _stateMachine;
        /// 
        /// The underlying observable sequence representing the result produced by the asynchronous method.
        /// 
        private TaskObservable _inner;
        /// 
        /// Creates an instance of the  struct.
        /// 
        /// A new instance of the struct.
        public static AsyncObservableMethodBuilder Create() => default(AsyncObservableMethodBuilder);
        /// 
        /// Begins running the builder with the associated state machine.
        /// 
        /// The type of the state machine.
        /// The state machine instance, passed by reference.
        ///  is null.
        public void Start(ref TStateMachine stateMachine)
            where TStateMachine : IAsyncStateMachine
        {
            if (stateMachine == null)
                throw new ArgumentNullException(nameof(stateMachine));
            stateMachine.MoveNext();
        }
        /// 
        /// Associates the builder with the specified state machine.
        /// 
        /// The state machine instance to associate with the builder.
        ///  is null.
        /// The state machine was previously set.
        public void SetStateMachine(IAsyncStateMachine stateMachine)
        {
            if (stateMachine == null)
                throw new ArgumentNullException(nameof(stateMachine));
            if (_stateMachine != null)
                throw new InvalidOperationException();
            _stateMachine = stateMachine;
        }
        /// 
        /// Marks the observable as successfully completed.
        /// 
        /// The result to use to complete the observable sequence.
        /// The observable has already completed.
        public void SetResult(T result)
        {
            if (_inner == null)
            {
                _inner = new TaskObservable(result);
            }
            else
            {
                _inner.SetResult(result);
            }
        }
        /// 
        /// Marks the observable as failed and binds the specified exception to the observable sequence.
        /// 
        /// The exception to bind to the observable sequence.
        ///  is null.
        /// The observable has already completed.
        public void SetException(Exception exception)
        {
            if (exception == null)
                throw new ArgumentNullException(nameof(exception));
            if (_inner == null)
            {
                _inner = new TaskObservable(exception);
            }
            else
            {
                _inner.SetException(exception);
            }
        }
        /// 
        /// Gets the observable sequence for this builder.
        /// 
        public IAsyncObservable Task => _inner ?? (_inner = new TaskObservable());
        /// 
        /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
        /// 
        /// The type of the awaiter.
        /// The type of the state machine.
        /// The awaiter.
        /// The state machine.
        public void AwaitOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine)
            where TAwaiter : INotifyCompletion
            where TStateMachine : IAsyncStateMachine
        {
            try
            {
                if (_stateMachine == null)
                {
                    var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.
                    _stateMachine = stateMachine;
                    _stateMachine.SetStateMachine(_stateMachine);
                }
                // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
                awaiter.OnCompleted(_stateMachine.MoveNext);
            }
            catch (Exception ex)
            {
                // NB: Prevent reentrancy into the async state machine when an exception would be observed
                //     by the caller. This could cause concurrent execution of the async method. Instead,
                //     rethrow the exception elsewhere.
                Rethrow(ex);
            }
        }
        /// 
        /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
        /// 
        /// The type of the awaiter.
        /// The type of the state machine.
        /// The awaiter.
        /// The state machine.
        [SecuritySafeCritical]
        public void AwaitUnsafeOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine)
            where TAwaiter : ICriticalNotifyCompletion
            where TStateMachine : IAsyncStateMachine
        {
            try
            {
                if (_stateMachine == null)
                {
                    var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.
                    _stateMachine = stateMachine;
                    _stateMachine.SetStateMachine(_stateMachine);
                }
                // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
                awaiter.UnsafeOnCompleted(_stateMachine.MoveNext);
            }
            catch (Exception ex)
            {
                // NB: Prevent reentrancy into the async state machine when an exception would be observed
                //     by the caller. This could cause concurrent execution of the async method. Instead,
                //     rethrow the exception elsewhere.
                Rethrow(ex);
            }
        }
        /// 
        /// Rethrows an exception that was thrown from an awaiter's OnCompleted methods.
        /// 
        /// The exception to rethrow.
        private static void Rethrow(Exception exception)
        {
            TaskPoolAsyncScheduler.Default.ScheduleAsync(_ =>
            {
                ExceptionDispatchInfo.Capture(exception).Throw();
                return System.Threading.Tasks.Task.CompletedTask;
            });
        }
        /// 
        /// Implementation of the IObservable<T> interface compatible with async method return types.
        /// 
        /// 
        /// This class implements a "task-like" type that can be used as the return type of an asynchronous
        /// method in C# 7.0 and beyond. For example:
        /// 
        /// async Observable<int> RxAsync()
        /// {
        ///     var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
        ///     return res * 2;
        /// }
        /// 
        /// 
        /// The type of the elements in the sequence.
        internal sealed class TaskObservable : IAsyncObservable, INotifyCompletion
        {
            /// 
            /// The underlying observable sequence to subscribe to in case the asynchronous method did not
            /// finish synchronously.
            /// 
            private readonly AsyncAsyncSubject _subject;
            /// 
            /// The result returned by the asynchronous method in case the method finished synchronously.
            /// 
            private readonly T _result;
            /// 
            /// The exception thrown by the asynchronous method in case the method finished synchronously.
            /// 
            private readonly Exception _exception;
            /// 
            /// Creates a new  for an asynchronous method that has not finished yet.
            /// 
            public TaskObservable()
            {
                _subject = new SequentialAsyncAsyncSubject();
            }
            /// 
            /// Creates a new  for an asynchronous method that synchronously returned
            /// the specified  value.
            /// 
            /// The result returned by the asynchronous method.
            public TaskObservable(T result)
            {
                _result = result;
            }
            /// 
            /// Creates a new  for an asynchronous method that synchronously threw
            /// the specified .
            /// 
            /// The exception thrown by the asynchronous method.
            public TaskObservable(Exception exception)
            {
                _exception = exception;
            }
            /// 
            /// Marks the observable as successfully completed.
            /// 
            /// The result to use to complete the observable sequence.
            /// The observable has already completed.
            public async void SetResult(T result)
            {
                if (IsCompleted)
                    throw new InvalidOperationException();
                // REVIEW: Async void method.
                await _subject.OnNextAsync(result).ConfigureAwait(false);
                await _subject.OnCompletedAsync().ConfigureAwait(false);
            }
            /// 
            /// Marks the observable as failed and binds the specified exception to the observable sequence.
            /// 
            /// The exception to bind to the observable sequence.
            ///  is null.
            /// The observable has already completed.
            public void SetException(Exception exception)
            {
                if (IsCompleted)
                    throw new InvalidOperationException();
                _subject.OnErrorAsync(exception);
            }
            /// 
            /// Subscribes the given observer to the observable sequence.
            /// 
            /// Observer that will receive notifications from the observable sequence.
            /// Disposable object representing an observer's subscription to the observable sequence.
            ///  is null.
            public Task SubscribeAsync(IAsyncObserver observer)
            {
                if (_subject != null)
                {
                    return _subject.SubscribeAsync(observer);
                }
                async Task CoreAsync()
                {
                    if (_exception != null)
                    {
                        await observer.OnErrorAsync(_exception).ConfigureAwait(false);
                    }
                    else
                    {
                        await observer.OnNextAsync(_result).ConfigureAwait(false);
                    }
                    return AsyncDisposable.Nop;
                }
                return CoreAsync();
            }
            /// 
            /// Gets an awaiter that can be used to await the eventual completion of the observable sequence.
            /// 
            /// An awaiter that can be used to await the eventual completion of the observable sequence.
            public AsyncAsyncSubject GetAwaiter() => _subject;
            /// 
            /// Gets a Boolean indicating whether the observable sequence has completed.
            /// 
            public bool IsCompleted => _subject?.IsCompleted ?? true;
            /// 
            /// Gets the result produced by the observable sequence.
            /// 
            /// The result produced by the observable sequence.
            public T GetResult()
            {
                if (_subject != null)
                {
                    return _subject.GetResult();
                }
                if (_exception != null)
                {
                    ExceptionDispatchInfo.Capture(_exception).Throw();
                }
                return _result;
            }
            /// 
            /// Attaches the specified  to the observable sequence.
            /// 
            /// The continuation to attach.
            public void OnCompleted(Action continuation)
            {
                if (_subject != null)
                {
                    _subject.OnCompleted(continuation);
                }
                else
                {
                    continuation();
                }
            }
        }
    }
}