// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Reactive.Disposables;
namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents a value that changes over time.
    /// Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class BehaviorSubject<T> : ISubject<T>, IDisposable
    {
        // Guards every read-modify-write of the fields below.
        private readonly object _gate = new object();

        // Current set of subscribed observers; replaced (never mutated in place by this class)
        // on subscribe/unsubscribe, and set to null by Dispose.
        private ImmutableList<IObserver<T>> _observers;

        // True once OnCompleted or OnError has been accepted; later notifications are ignored.
        private bool _isStopped;

        // Most recent value: the constructor argument until OnNext stores a newer one.
        private T _value;

        // The error passed to OnError, replayed to observers that subscribe after termination.
        private Exception _exception;

        private bool _isDisposed;

        /// <summary>
        /// Initializes a new instance of the <see cref="BehaviorSubject{T}"/> class which creates a subject that caches its last value and starts with the specified value.
        /// </summary>
        /// <param name="value">Initial value sent to observers when no other value has been received by the subject yet.</param>
        public BehaviorSubject(T value)
        {
            _value = value;
            _observers = new ImmutableList<IObserver<T>>();
        }

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// </summary>
        public bool HasObservers
        {
            get
            {
                // Read without taking the gate. Dispose sets _observers to null,
                // so a local copy is null-checked before use.
                var observers = _observers;
                return observers != null && observers.Data.Length > 0;
            }
        }

        /// <summary>
        /// Notifies all subscribed observers about the end of the sequence.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnCompleted()
        {
            var os = default(IObserver<T>[]);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    // Capture the observers to notify, then drop them: the sequence is over.
                    os = _observers.Data;
                    _observers = new ImmutableList<IObserver<T>>();
                    _isStopped = true;
                }
            }

            // Notify after the gate is released, using the array captured while it was held.
            // os stays null when the subject was already stopped, making this call a no-op.
            if (os != null)
            {
                foreach (var o in os)
                {
                    o.OnCompleted();
                }
            }
        }

        /// <summary>
        /// Notifies all subscribed observers about the exception.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException("error");
            }

            var os = default(IObserver<T>[]);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    os = _observers.Data;
                    _observers = new ImmutableList<IObserver<T>>();
                    _isStopped = true;

                    // Remembered so that Subscribe can replay it to late subscribers.
                    _exception = error;
                }
            }

            // Notify after the gate is released; os is null if the subject was already stopped.
            if (os != null)
            {
                foreach (var o in os)
                {
                    o.OnError(error);
                }
            }
        }

        /// <summary>
        /// Notifies all subscribed observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all observers.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnNext(T value)
        {
            var os = default(IObserver<T>[]);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    // Cache the value for future subscribers and capture the current observers.
                    _value = value;
                    os = _observers.Data;
                }
            }

            // Notify after the gate is released; os is null if the subject was already stopped.
            if (os != null)
            {
                foreach (var o in os)
                {
                    o.OnNext(value);
                }
            }
        }

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            var ex = default(Exception);

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    _observers = _observers.Add(observer);

                    // The cached value is delivered while the gate is still held.
                    // NOTE(review): presumably so a concurrent OnNext cannot reach this observer
                    // before the initial value does - confirm before moving this outside the lock.
                    observer.OnNext(_value);
                    return new Subscription(this, observer);
                }

                ex = _exception;
            }

            // The subject already terminated: replay the terminal notification only.
            // A null exception means it stopped through OnCompleted rather than OnError.
            if (ex != null)
            {
                observer.OnError(ex);
            }
            else
            {
                observer.OnCompleted();
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Handle returned by <see cref="Subscribe"/>; disposing it removes the observer from the owning subject.
        /// </summary>
        private sealed class Subscription : IDisposable
        {
            private readonly BehaviorSubject<T> _subject;

            // Set to null once the observer has been removed, so repeated Dispose calls do nothing.
            private IObserver<T> _observer;

            public Subscription(BehaviorSubject<T> subject, IObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            /// <summary>
            /// Removes the observer from the subject, unless that already happened or the subject has been disposed.
            /// </summary>
            public void Dispose()
            {
                // Cheap check first; the condition is re-evaluated under the subject's gate.
                if (_observer != null)
                {
                    lock (_subject._gate)
                    {
                        // After the subject's Dispose, _observers is null, so it must not be touched.
                        if (!_subject._isDisposed && _observer != null)
                        {
                            _subject._observers = _subject._observers.Remove(_observer);
                            _observer = null;
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> if the subject has been disposed.
        /// Every call site in this class invokes it while holding the gate.
        /// </summary>
        private void CheckDisposed()
        {
            if (_isDisposed)
            {
                throw new ObjectDisposedException(string.Empty);
            }
        }

        /// <summary>
        /// Unsubscribe all observers and release resources.
        /// </summary>
        public void Dispose()
        {
            lock (_gate)
            {
                _isDisposed = true;
                _observers = null;
                _value = default(T);
                _exception = null;
            }
        }
    }
}