// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace System.Reactive.Subjects
{
    /// <summary>
    /// Provides a set of static methods for creating subjects.
    /// </summary>
    public static class Subject
    {
        /// <summary>
        /// Creates a subject from the specified observer and observable.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements received by the observer.</typeparam>
        /// <typeparam name="TResult">The type of the elements produced by the observable sequence.</typeparam>
        /// <param name="observer">The observer used to send messages to the subject.</param>
        /// <param name="observable">The observable used to subscribe to messages sent from the subject.</param>
        /// <returns>Subject implemented using the given observer and observable.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="observable"/> is null.</exception>
        public static ISubject<TSource, TResult> Create<TSource, TResult>(IObserver<TSource> observer, IObservable<TResult> observable)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            if (observable == null)
            {
                throw new ArgumentNullException("observable");
            }

            return new AnonymousSubject<TSource, TResult>(observer, observable);
        }

        /// <summary>
        /// Synchronizes the messages sent to the subject.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements received by the subject.</typeparam>
        /// <typeparam name="TResult">The type of the elements produced by the subject.</typeparam>
        /// <param name="subject">The subject to synchronize.</param>
        /// <returns>Subject whose messages are synchronized.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="subject"/> is null.</exception>
        public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject)
        {
            if (subject == null)
            {
                throw new ArgumentNullException("subject");
            }

            // Incoming notifications go through the wrapper returned by Observer.Synchronize;
            // subscriptions are made directly against the original subject.
            return new AnonymousSubject<TSource, TResult>(Observer.Synchronize(subject), subject);
        }

        /// <summary>
        /// Synchronizes the messages sent to the subject and notifies observers on the specified scheduler.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements received by the subject.</typeparam>
        /// <typeparam name="TResult">The type of the elements produced by the subject.</typeparam>
        /// <param name="subject">The subject to synchronize.</param>
        /// <param name="scheduler">Scheduler to notify observers on.</param>
        /// <returns>Subject whose messages are synchronized and whose observers are notified on the given scheduler.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="subject"/> or <paramref name="scheduler"/> is null.</exception>
        public static ISubject<TSource, TResult> Synchronize<TSource, TResult>(ISubject<TSource, TResult> subject, IScheduler scheduler)
        {
            if (subject == null)
            {
                throw new ArgumentNullException("subject");
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }

            // Same input side as the single-argument overload; the output side is the
            // subject's sequence passed through ObserveOn with the given scheduler.
            return new AnonymousSubject<TSource, TResult>(Observer.Synchronize(subject), subject.ObserveOn(scheduler));
        }

        /// <summary>
        /// Subject that forwards every observer notification to a wrapped observer and
        /// every subscription to a wrapped observable.
        /// </summary>
        /// <typeparam name="T">The type of the elements received by the wrapped observer.</typeparam>
        /// <typeparam name="U">The type of the elements produced by the wrapped observable.</typeparam>
        private sealed class AnonymousSubject<T, U> : ISubject<T, U>
        {
            private readonly IObserver<T> _observer;
            private readonly IObservable<U> _observable;

            /// <summary>
            /// Stores the observer and observable that this subject delegates to.
            /// Arguments are not validated here; the public factory methods check them before constructing.
            /// </summary>
            /// <param name="observer">Observer that receives the notifications sent to this subject.</param>
            /// <param name="observable">Observable that subscribers to this subject are attached to.</param>
            public AnonymousSubject(IObserver<T> observer, IObservable<U> observable)
            {
                _observer = observer;
                _observable = observable;
            }

            /// <summary>
            /// Forwards the completion notification to the wrapped observer.
            /// </summary>
            public void OnCompleted()
            {
                _observer.OnCompleted();
            }

            /// <summary>
            /// Forwards the error notification to the wrapped observer.
            /// </summary>
            /// <param name="error">The exception to forward.</param>
            /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
            public void OnError(Exception error)
            {
                if (error == null)
                {
                    throw new ArgumentNullException("error");
                }

                _observer.OnError(error);
            }

            /// <summary>
            /// Forwards the element to the wrapped observer.
            /// </summary>
            /// <param name="value">The element to forward.</param>
            public void OnNext(T value)
            {
                _observer.OnNext(value);
            }

            /// <summary>
            /// Subscribes the given observer to the wrapped observable.
            /// </summary>
            /// <param name="observer">The observer to subscribe.</param>
            /// <returns>The disposable returned by the wrapped observable's Subscribe call.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
            public IDisposable Subscribe(IObserver<U> observer)
            {
                if (observer == null)
                {
                    throw new ArgumentNullException("observer");
                }

                //
                // [OK] Use of unsafe Subscribe: non-pretentious wrapping of an observable sequence.
                //
                return _observable.Subscribe/*Unsafe*/(observer);
            }
        }
    }
}