// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#if !NO_PERF
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcast to all subscribed observers.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class Subject<T> : SubjectBase<T>, IDisposable
    {
        #region Fields

        // The whole state of the subject is encoded in this single reference, which is only ever
        // replaced atomically (compare-and-swap) or overwritten by Dispose:
        //   - NopObserver<T>.Instance      : no subscribers (initial state)
        //   - a subscriber's own observer  : exactly one subscriber
        //   - an Observer<T>               : several subscribers
        //   - a DoneObserver<T>            : terminated by OnCompleted or OnError
        //   - DisposedObserver<T>.Instance : disposed
        private volatile IObserver<T> _observer;

        #endregion

        #region Constructors

        /// <summary>
        /// Creates a subject.
        /// </summary>
        public Subject()
        {
            _observer = NopObserver<T>.Instance;
        }

        #endregion

        #region Properties

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// </summary>
        public override bool HasObservers
        {
            get
            {
                // Read the volatile field once so that all three checks inspect the same state.
                var current = _observer;
                return current != NopObserver<T>.Instance && !(current is DoneObserver<T>) && current != DisposedObserver<T>.Instance;
            }
        }

        /// <summary>
        /// Indicates whether the subject has been disposed.
        /// </summary>
        public override bool IsDisposed
        {
            get
            {
                return _observer is DisposedObserver<T>;
            }
        }

        #endregion

        #region Methods

        #region IObserver<T> implementation

        /// <summary>
        /// Notifies all subscribed observers about the end of the sequence.
        /// </summary>
        public override void OnCompleted()
        {
            var oldObserver = default(IObserver<T>);
            var newObserver = DoneObserver<T>.Completed;

            do
            {
                oldObserver = _observer;

                // Already terminated or disposed: leave the state untouched. The call below then goes to
                // the terminal observer, which decides how to react (not visible in this file).
                if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
                    break;
#pragma warning disable 0420
            } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
#pragma warning restore 0420

            // Forward to whatever was installed at the moment the state was captured.
            oldObserver.OnCompleted();
        }

        /// <summary>
        /// Notifies all subscribed observers about the specified exception.
        /// </summary>
        /// <param name="error">The exception to send to all currently subscribed observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        public override void OnError(Exception error)
        {
            if (error == null)
                throw new ArgumentNullException("error");

            var oldObserver = default(IObserver<T>);

            // The terminal state remembers the exception so that Subscribe can replay it to late subscribers.
            var newObserver = new DoneObserver<T> { Exception = error };

            do
            {
                oldObserver = _observer;

                // Already terminated or disposed: leave the state untouched.
                if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
                    break;
#pragma warning disable 0420
            } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
#pragma warning restore 0420

            oldObserver.OnError(error);
        }

        /// <summary>
        /// Notifies all subscribed observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all currently subscribed observers.</param>
        public override void OnNext(T value)
        {
            // Whatever observer currently encodes the state receives the value.
            _observer.OnNext(value);
        }

        #endregion

        #region IObservable<T> implementation

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
                throw new ArgumentNullException("observer");

            var oldObserver = default(IObserver<T>);
            var newObserver = default(IObserver<T>);

            do
            {
                oldObserver = _observer;

                if (oldObserver == DisposedObserver<T>.Instance)
                {
                    throw new ObjectDisposedException("");
                }

                // Completed earlier: replay the completion to the late subscriber.
                if (oldObserver == DoneObserver<T>.Completed)
                {
                    observer.OnCompleted();
                    return Disposable.Empty;
                }

                // Any other DoneObserver<T> is created by OnError: replay the stored exception.
                var done = oldObserver as DoneObserver<T>;
                if (done != null)
                {
                    observer.OnError(done.Exception);
                    return Disposable.Empty;
                }

                if (oldObserver == NopObserver<T>.Instance)
                {
                    // First subscriber: install it directly, without a wrapper.
                    newObserver = observer;
                }
                else
                {
                    var obs = oldObserver as Observer<T>;
                    if (obs != null)
                    {
                        // Already a multi-observer wrapper: extend it.
                        newObserver = obs.Add(observer);
                    }
                    else
                    {
                        // Second subscriber: wrap the existing single observer together with the new one.
                        newObserver = new Observer<T>(new ImmutableList<IObserver<T>>(new[] { oldObserver, observer }));
                    }
                }
#pragma warning disable 0420
            } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
#pragma warning restore 0420

            return new Subscription(this, observer);
        }

        /// <summary>
        /// Handle returned from <see cref="Subscribe"/>; removes the observer from the subject on first disposal.
        /// </summary>
        class Subscription : IDisposable
        {
            private Subject<T> _subject;
            private IObserver<T> _observer;

            public Subscription(Subject<T> subject, IObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            public void Dispose()
            {
                // The atomic swap lets only the first caller proceed; later calls see null and return.
                var observer = Interlocked.Exchange(ref _observer, null);
                if (observer == null)
                    return;

                _subject.Unsubscribe(observer);
                _subject = null;
            }
        }

        /// <summary>
        /// Removes the given observer from the current state, retrying until the swap succeeds.
        /// Does nothing when the subject is terminated, disposed, or holds a different single observer.
        /// </summary>
        private void Unsubscribe(IObserver<T> observer)
        {
            var oldObserver = default(IObserver<T>);
            var newObserver = default(IObserver<T>);

            do
            {
                oldObserver = _observer;

                if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
                    return;

                var obs = oldObserver as Observer<T>;
                if (obs != null)
                {
                    newObserver = obs.Remove(observer);
                }
                else
                {
                    // Single-observer (or empty) state: only act if it is exactly this observer.
                    if (oldObserver != observer)
                        return;

                    newObserver = NopObserver<T>.Instance;
                }
#pragma warning disable 0420
            } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
#pragma warning restore 0420
        }

        #endregion

        #region IDisposable implementation

        /// <summary>
        /// Releases all resources used by the current instance of the <see cref="Subject{T}"/> class and unsubscribes all observers.
        /// </summary>
        public override void Dispose()
        {
            // Overwriting the state drops every reference to the subscribed observers.
            _observer = DisposedObserver<T>.Instance;
        }

        #endregion

        #endregion
    }
}
#else
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcast to all subscribed observers.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class Subject<T> : ISubject<T>, IDisposable
    {
        // All fields below are read and written only while holding 'gate'.
        bool isDisposed;
        bool isStopped;
        ImmutableList<IObserver<T>> observers;
        readonly object gate = new object();

        // Set by OnError together with isStopped; stays null when the subject completed normally.
        Exception exception;

        /// <summary>
        /// Creates a subject.
        /// </summary>
        public Subject()
        {
            observers = new ImmutableList<IObserver<T>>();
        }

        /// <summary>
        /// Notifies all subscribed observers about the end of the sequence.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnCompleted()
        {
            var os = default(IObserver<T>[]);
            lock (gate)
            {
                CheckDisposed();

                if (!isStopped)
                {
                    // Capture the current subscribers and clear the list; only the first terminal call gets here.
                    os = observers.Data;
                    observers = new ImmutableList<IObserver<T>>();
                    isStopped = true;
                }
            }

            // Observers are notified outside the lock.
            if (os != null)
                foreach (var o in os)
                    o.OnCompleted();
        }

        /// <summary>
        /// Notifies all subscribed observers with the exception.
        /// </summary>
        /// <param name="error">The exception to send to all subscribed observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnError(Exception error)
        {
            if (error == null)
                throw new ArgumentNullException("error");

            var os = default(IObserver<T>[]);
            lock (gate)
            {
                CheckDisposed();

                if (!isStopped)
                {
                    os = observers.Data;
                    observers = new ImmutableList<IObserver<T>>();
                    isStopped = true;

                    // Remembered so that Subscribe can replay it to late subscribers.
                    exception = error;
                }
            }

            // Observers are notified outside the lock.
            if (os != null)
                foreach (var o in os)
                    o.OnError(error);
        }

        /// <summary>
        /// Notifies all subscribed observers with the value.
        /// </summary>
        /// <param name="value">The value to send to all subscribed observers.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnNext(T value)
        {
            var os = default(IObserver<T>[]);
            lock (gate)
            {
                CheckDisposed();

                // After termination 'os' stays null and the value is dropped.
                if (!isStopped)
                {
                    os = observers.Data;
                }
            }

            // Observers are notified outside the lock.
            if (os != null)
                foreach (var o in os)
                    o.OnNext(value);
        }

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>IDisposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
                throw new ArgumentNullException("observer");

            var error = default(Exception);
            lock (gate)
            {
                CheckDisposed();

                if (!isStopped)
                {
                    observers = observers.Add(observer);
                    return new Subscription(this, observer);
                }

                // Once isStopped is set, 'exception' is never written again, so this copy is final.
                error = exception;
            }

            // Replay the terminal notification outside the lock, as the On* methods do, so that
            // observer code never runs while 'gate' is held.
            if (error != null)
                observer.OnError(error);
            else
                observer.OnCompleted();

            return Disposable.Empty;
        }

        /// <summary>
        /// Removes the given observer from the subscriber list; does nothing once the subject is disposed.
        /// </summary>
        void Unsubscribe(IObserver<T> observer)
        {
            lock (gate)
            {
                // Dispose sets 'observers' to null.
                if (observers != null)
                    observers = observers.Remove(observer);
            }
        }

        /// <summary>
        /// Handle returned from <see cref="Subscribe"/>; removes the observer from the subject on first disposal.
        /// </summary>
        class Subscription : IDisposable
        {
            Subject<T> subject;
            IObserver<T> observer;

            public Subscription(Subject<T> subject, IObserver<T> observer)
            {
                this.subject = subject;
                this.observer = observer;
            }

            public void Dispose()
            {
                // The atomic swap lets only the first caller proceed; later calls see null and do nothing.
                var o = Interlocked.Exchange<IObserver<T>>(ref observer, null);
                if (o != null)
                {
                    subject.Unsubscribe(o);
                    subject = null;
                }
            }
        }

        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> when the subject has been disposed.
        /// Callers in this class invoke it while holding 'gate'.
        /// </summary>
        void CheckDisposed()
        {
            if (isDisposed)
                throw new ObjectDisposedException(string.Empty);
        }

        /// <summary>
        /// Unsubscribe all observers and release resources.
        /// </summary>
        public void Dispose()
        {
            lock (gate)
            {
                isDisposed = true;
                observers = null;
            }
        }
    }
}
#endif