// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive
{
    /// <summary>
    /// Abstract base class for implementations of the <see cref="IObservable{T}"/> interface.
    /// </summary>
    /// <remarks>
    /// If you don't need a named type to create an observable sequence (i.e. you need
    /// an instance rather than a reusable type), use the Observable.Create method to create
    /// an observable sequence with specified subscription behavior.
    /// </remarks>
    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
    public abstract class ObservableBase<T> : IObservable<T>
    {
        /// <summary>
        /// Subscribes the given observer to the observable sequence.
        /// </summary>
        /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
        /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            // The wrapper is handed to SubscribeCore as the observer and is also
            // returned to the caller as the subscription handle.
            var autoDetachObserver = new AutoDetachObserver<T>(observer);

            if (CurrentThreadScheduler.IsScheduleRequired)
            {
                //
                // Notice we don't protect this piece of code using an exception handler to
                // redirect errors to the OnError channel. This call to Schedule will run the
                // trampoline, so we'd be catching all exceptions, including those from user
                // callbacks that happen to run there. For example, consider:
                //
                //    Observable.Return(42, Scheduler.CurrentThread)
                //              .Subscribe(x => { throw new Exception(); });
                //
                // Here, the OnNext(42) call would be scheduled on the trampoline, so when we
                // return from the scheduled Subscribe call, the CurrentThreadScheduler moves
                // on to invoking this work item. Too much of protection here would cause the
                // exception thrown in OnNext to circle back to OnError, which looks like the
                // sequence can't make up its mind.
                //
                CurrentThreadScheduler.Instance.Schedule(autoDetachObserver, ScheduledSubscribe);
            }
            else
            {
                SubscribeCoreGuarded(autoDetachObserver);
            }

            return autoDetachObserver;
        }

        /// <summary>
        /// Work item scheduled on the <see cref="CurrentThreadScheduler"/> by <see cref="Subscribe"/>;
        /// performs the actual call to <see cref="SubscribeCore"/>.
        /// </summary>
        /// <param name="_">Scheduler running the work item (unused).</param>
        /// <param name="autoDetachObserver">Wrapped observer to subscribe.</param>
        /// <returns>An empty disposable; the subscription itself is tracked by <paramref name="autoDetachObserver"/>.</returns>
        private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver<T> autoDetachObserver)
        {
            SubscribeCoreGuarded(autoDetachObserver);
            return Disposable.Empty;
        }

        /// <summary>
        /// Invokes <see cref="SubscribeCore"/> and stores the resulting disposable on the wrapper,
        /// routing any exception thrown during subscription to the wrapper's Fail method.
        /// </summary>
        /// <param name="autoDetachObserver">Wrapped observer to subscribe.</param>
        private void SubscribeCoreGuarded(AutoDetachObserver<T> autoDetachObserver)
        {
            try
            {
                autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver);
            }
            catch (Exception exception)
            {
                //
                // This can happen when there's a synchronous callback to OnError in the
                // implementation of SubscribeCore, which also throws. So, we're seeing
                // an exception being thrown from a handler.
                //
                // For compat with v1.x, we rethrow the exception in this case, keeping
                // in mind this should be rare but if it happens, something's totally
                // screwed up.
                //
                if (!autoDetachObserver.Fail(exception))
                {
                    throw;
                }
            }
        }

        /// <summary>
        /// Implement this method with the core subscription logic for the observable sequence.
        /// </summary>
        /// <param name="observer">Observer to send notifications to.</param>
        /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
        protected abstract IDisposable SubscribeCore(IObserver<T> observer);
    }
}