// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace System.Reactive.Subjects
{
    /// <summary>
    /// Observable wrapper that can be connected to, and disconnected from, an underlying observable sequence.
    /// </summary>
    /// <remarks>
    /// Observers are subscribed to the subject supplied at construction time. The subject is subscribed to the
    /// source by <see cref="Connect"/>, and that subscription is disposed when the returned connection is disposed.
    /// NOTE(review): the original header carried two descriptions that read like type parameter documentation
    /// ("the type of the elements in the source sequence" and "the type of the elements in the resulting sequence,
    /// after transformation through the subject"), but no generic parameter list is visible on this declaration.
    /// Confirm whether type arguments were lost from this file.
    /// </remarks>
    internal class ConnectableObservable : IConnectableObservable
    {
        // Guards _connection here and the subscription held by the nested Connection.
        private readonly object _gate = new object();
        private readonly IObservable _source;
        private readonly ISubject _subject;

        // Current connection, or null when no connection is established.
        private Connection _connection;

        /// <summary>
        /// Creates an observable that can be connected to and disconnected from its source.
        /// </summary>
        /// <param name="source">Underlying observable source sequence that the wrapper connects to and disconnects from.</param>
        /// <param name="subject">Subject exposed by the connectable observable; it is subscribed to <paramref name="source"/> upon connection.</param>
        public ConnectableObservable(IObservable source, ISubject subject)
        {
            _subject = subject;

            // Original author's note: wrapping with AsObservable gets us auto-detach behavior; otherwise,
            // we'd have to roll our own, including trampoline installation.
            _source = source.AsObservable();
        }

        /// <summary>
        /// Connects the wrapper to its source by subscribing the subject to the source sequence.
        /// If a connection already exists, that same connection object is returned and no new subscription is made.
        /// </summary>
        /// <returns>Disposable object used to disconnect the wrapper from its source.</returns>
        public IDisposable Connect()
        {
            lock (_gate)
            {
                if (_connection != null)
                {
                    return _connection;
                }

                var upstream = _source.SubscribeSafe(_subject);
                _connection = new Connection(this, upstream);
                return _connection;
            }
        }

        /// <summary>
        /// Subscribes an observer to the subject. The subject is only subscribed to the underlying source once
        /// <see cref="Connect"/> has been called.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable returned by the subject subscription, used to unsubscribe the observer.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            return _subject.SubscribeSafe(observer);
        }

        /// <summary>
        /// Handle returned by <see cref="Connect"/>. Disposing it disposes the source subscription and clears the
        /// parent's current connection, so that a later <see cref="Connect"/> call creates a new subscription.
        /// </summary>
        private class Connection : IDisposable
        {
            private readonly ConnectableObservable _parent;

            // Set to null once disposed, which makes further Dispose calls no-ops.
            private IDisposable _subscription;

            public Connection(ConnectableObservable parent, IDisposable subscription)
            {
                _parent = parent;
                _subscription = subscription;
            }

            /// <summary>
            /// Disposes the source subscription and resets the parent's connection, under the parent's gate.
            /// Does nothing if the subscription has already been cleared.
            /// </summary>
            public void Dispose()
            {
                lock (_parent._gate)
                {
                    if (_subscription == null)
                    {
                        return;
                    }

                    // Order kept from the original: dispose first, then clear both references.
                    _subscription.Dispose();
                    _subscription = null;
                    _parent._connection = null;
                }
            }
        }
    }
}