// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System;
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Provides basic synchronization and scheduling services for observable sequences.
    /// </summary>
    [EditorBrowsable(EditorBrowsableState.Advanced)]
    public static class Synchronization
    {
        #region SubscribeOn
        /// <summary>
        /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>
        /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        /// <remarks>
        /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler.
        /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use the
        /// <c>ObserveOn</c> overload that takes an <see cref="IScheduler"/>.
        /// </remarks>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");
            return new AnonymousObservable<TSource>(observer =>
            {
                var m = new SingleAssignmentDisposable();
                var d = new SerialDisposable();

                // Hand m to d *before* scheduling: if the scheduler runs the action
                // synchronously inside Schedule, the assignment made by that action
                // must not be overwritten afterwards by this one.
                d.Disposable = m;

                // m holds the handle returned by Schedule; the scheduled action then
                // replaces d's content with the actual source subscription, wrapped in
                // a ScheduledDisposable bound to the same scheduler.
                m.Disposable = scheduler.Schedule(() =>
                {
                    d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
                });
                return d;
            });
        }
#if !NO_SYNCCTX
        /// <summary>
        /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>
        /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
        /// <remarks>
        /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context.
        /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use the
        /// <c>ObserveOn</c> overload that takes a <see cref="SynchronizationContext"/>.
        /// </remarks>
        public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (context == null)
                throw new ArgumentNullException("context");
            return new AnonymousObservable<TSource>(observer =>
            {
                var subscription = new SingleAssignmentDisposable();
                context.PostWithStartComplete(() =>
                {
                    // The caller may already have disposed the returned handle by the
                    // time the posted callback runs; in that case do not subscribe at all.
                    if (!subscription.IsDisposed)
                        subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
                });
                return subscription;
            });
        }
#endif
        #endregion
        #region ObserveOn
        /// <summary>
        /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="scheduler">Scheduler to notify observers on.</param>
        /// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");
#if !NO_PERF
            // Default build: delegate to the dedicated ObserveOn<TSource> operator type.
            return new ObserveOn<TSource>(source, scheduler);
#else
            // NO_PERF build: subscribe through an ObserveOnObserver bound to the scheduler.
            return new AnonymousObservable<TSource>(observer => source.Subscribe(new ObserveOnObserver<TSource>(scheduler, observer, null)));
#endif
        }
#if !NO_SYNCCTX
        /// <summary>
        /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="context">Synchronization context to notify observers on.</param>
        /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
        public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (context == null)
                throw new ArgumentNullException("context");
#if !NO_PERF
            // Default build: delegate to the dedicated ObserveOn<TSource> operator type.
            return new ObserveOn<TSource>(source, context);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // Tell the context an operation is starting; the matching
                // OperationCompleted call is made from the Finally callback below.
                context.OperationStarted();

                // Each notification is forwarded to the observer by posting it to the context.
                return source.Subscribe(
                    x => context.Post(_ =>
                    {
                        observer.OnNext(x);
                    }, null),
                    exception => context.Post(_ =>
                    {
                        observer.OnError(exception);
                    }, null),
                    () => context.Post(_ =>
                    {
                        observer.OnCompleted();
                    }, null)
                ).Finally(() =>
                {
                    context.OperationCompleted();
                });
            });
#endif
        }
#endif
        #endregion
        #region Synchronize
        /// <summary>
        /// Wraps the source sequence in order to ensure observer callbacks are properly serialized.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");
#if !NO_PERF
            // Default build: delegate to the dedicated Synchronize<TSource> operator type.
            return new Synchronize<TSource>(source);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // A fresh gate object is created for every subscription.
                var gate = new object();
                return source.Subscribe(Observer.Synchronize(observer, gate));
            });
#endif
        }
        /// <summary>
        /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="gate">Gate object to synchronize each observer call on.</param>
        /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
        public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (gate == null)
                throw new ArgumentNullException("gate");
#if !NO_PERF
            // Default build: delegate to the dedicated Synchronize<TSource> operator type.
            return new Synchronize<TSource>(source, gate);
#else
            // The caller-supplied gate is captured here, so every subscription uses the same object.
            return new AnonymousObservable<TSource>(observer =>
            {
                return source.Subscribe(Observer.Synchronize(observer, gate));
            });
#endif
        }
        #endregion
    }
}