// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information. 
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Subjects
{
    /// 
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
    /// 
    /// The type of the elements processed by the subject.
    public sealed class ReplaySubject : SubjectBase, IDisposable
    {
        #region Fields
        /// 
        /// Underlying optimized implementation of the replay subject.
        /// 
        private readonly SubjectBase _implementation;
        #endregion
        #region Constructors
        #region All
        /// 
        /// Initializes a new instance of the  class.
        /// 
        public ReplaySubject()
            : this(int.MaxValue)
        {
        }
        /// 
        /// Initializes a new instance of the  class with the specified scheduler.
        /// 
        /// Scheduler the observers are invoked on.
        ///  is null.
        public ReplaySubject(IScheduler scheduler)
        {
            _implementation = new ReplayByTime(scheduler);
        }
        #endregion
        #region Count
        /// 
        /// Initializes a new instance of the  class with the specified buffer size.
        /// 
        /// Maximum element count of the replay buffer.
        ///  is less than zero.
        public ReplaySubject(int bufferSize)
        {
            switch (bufferSize)
            {
                case 1:
                    _implementation = new ReplayOne();
                    break;
                case int.MaxValue:
                    _implementation = new ReplayAll();
                    break;
                default:
                    _implementation = new ReplayMany(bufferSize);
                    break;
            }
        }
        /// 
        /// Initializes a new instance of the  class with the specified buffer size and scheduler.
        /// 
        /// Maximum element count of the replay buffer.
        /// Scheduler the observers are invoked on.
        ///  is null.
        ///  is less than zero.
        public ReplaySubject(int bufferSize, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(bufferSize, scheduler);
        }
        #endregion
        #region Time
        /// 
        /// Initializes a new instance of the  class with the specified window.
        /// 
        /// Maximum time length of the replay buffer.
        ///  is less than TimeSpan.Zero.
        public ReplaySubject(TimeSpan window)
        {
            _implementation = new ReplayByTime(window);
        }
        /// 
        /// Initializes a new instance of the  class with the specified window and scheduler.
        /// 
        /// Maximum time length of the replay buffer.
        /// Scheduler the observers are invoked on.
        ///  is null.
        ///  is less than TimeSpan.Zero.
        public ReplaySubject(TimeSpan window, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(window, scheduler);
        }
        #endregion
        #region Count & Time
        /// 
        /// Initializes a new instance of the  class with the specified buffer size and window.
        /// 
        /// Maximum element count of the replay buffer.
        /// Maximum time length of the replay buffer.
        ///  is less than zero. -or-  is less than TimeSpan.Zero.
        public ReplaySubject(int bufferSize, TimeSpan window)
        {
            _implementation = new ReplayByTime(bufferSize, window);
        }
        /// 
        /// Initializes a new instance of the  class with the specified buffer size, window and scheduler.
        /// 
        /// Maximum element count of the replay buffer.
        /// Maximum time length of the replay buffer.
        /// Scheduler the observers are invoked on.
        ///  is less than zero. -or-  is less than TimeSpan.Zero.
        ///  is null.
        public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            _implementation = new ReplayByTime(bufferSize, window, scheduler);
        }
        #endregion
        #endregion
        #region Properties
        /// 
        /// Indicates whether the subject has observers subscribed to it.
        /// 
        public override bool HasObservers
        {
            get { return _implementation.HasObservers; }
        }
        /// 
        /// Indicates whether the subject has been disposed.
        /// 
        public override bool IsDisposed
        {
            get { return _implementation.IsDisposed; }
        }
        #endregion
        #region Methods
        #region IObserver implementation
        /// 
        /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
        /// 
        /// The value to send to all observers.
        public override void OnNext(T value)
        {
            _implementation.OnNext(value);
        }
        /// 
        /// Notifies all subscribed and future observers about the specified exception.
        /// 
        /// The exception to send to all observers.
        ///  is null.
        public override void OnError(Exception error)
        {
            if (error == null)
                throw new ArgumentNullException(nameof(error));
            _implementation.OnError(error);
        }
        /// 
        /// Notifies all subscribed and future observers about the end of the sequence.
        /// 
        public override void OnCompleted()
        {
            _implementation.OnCompleted();
        }
        #endregion
        #region IObservable implementation
        /// 
        /// Subscribes an observer to the subject.
        /// 
        /// Observer to subscribe to the subject.
        /// Disposable object that can be used to unsubscribe the observer from the subject.
        ///  is null.
        public override IDisposable Subscribe(IObserver observer)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            return _implementation.Subscribe(observer);
        }
        #endregion
        #region IDisposable implementation
        /// 
        /// Releases all resources used by the current instance of the  class and unsubscribe all observers.
        /// 
        public override void Dispose()
        {
            _implementation.Dispose();
        }
        #endregion
        #endregion
        private abstract class ReplayBase : SubjectBase
        {
            private readonly object _gate = new object();
            private ImmutableList> _observers;
            private bool _isStopped;
            private Exception _error;
            private bool _isDisposed;
            public ReplayBase()
            {
                _observers = ImmutableList>.Empty;
                _isStopped = false;
                _error = null;
            }
            public override bool HasObservers
            {
                get
                {
                    var observers = _observers;
                    return observers != null && observers.Data.Length > 0;
                }
            }
            public override bool IsDisposed
            {
                get
                {
                    lock (_gate)
                    {
                        return _isDisposed;
                    }
                }
            }
            public override void OnNext(T value)
            {
                var o = default(IScheduledObserver[]);
                lock (_gate)
                {
                    CheckDisposed();
                    if (!_isStopped)
                    {
                        Next(value);
                        Trim();
                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnNext(value);
                    }
                }
                if (o != null)
                {
                    foreach (var observer in o)
                        observer.EnsureActive();
                }
            }
            public override void OnError(Exception error)
            {
                var o = default(IScheduledObserver[]);
                lock (_gate)
                {
                    CheckDisposed();
                    if (!_isStopped)
                    {
                        _isStopped = true;
                        _error = error;
                        Trim();
                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnError(error);
                        _observers = ImmutableList>.Empty;
                    }
                }
                if (o != null)
                {
                    foreach (var observer in o)
                        observer.EnsureActive();
                }
            }
            public override void OnCompleted()
            {
                var o = default(IScheduledObserver[]);
                lock (_gate)
                {
                    CheckDisposed();
                    if (!_isStopped)
                    {
                        _isStopped = true;
                        Trim();
                        o = _observers.Data;
                        foreach (var observer in o)
                            observer.OnCompleted();
                        _observers = ImmutableList>.Empty;
                    }
                }
                if (o != null)
                {
                    foreach (var observer in o)
                        observer.EnsureActive();
                }
            }
            public override IDisposable Subscribe(IObserver observer)
            {
                var so = CreateScheduledObserver(observer);
                var n = 0;
                var subscription = Disposable.Empty;
                lock (_gate)
                {
                    CheckDisposed();
                    //
                    // Notice the v1.x behavior of always calling Trim is preserved here.
                    //
                    // This may be subject (pun intended) of debate: should this policy
                    // only be applied while the sequence is active? With the current
                    // behavior, a sequence will "die out" after it has terminated by
                    // continuing to drop OnNext notifications from the queue.
                    //
                    // In v1.x, this behavior was due to trimming based on the clock value
                    // returned by scheduler.Now, applied to all but the terminal message
                    // in the queue. Using the IStopwatch has the same effect. Either way,
                    // we guarantee the final notification will be observed, but there's
                    // no way to retain the buffer directly. One approach is to use the
                    // time-based TakeLast operator and apply an unbounded ReplaySubject
                    // to it.
                    //
                    // To conclude, we're keeping the behavior as-is for compatibility
                    // reasons with v1.x.
                    //
                    Trim();
                    n = Replay(so);
                    if (_error != null)
                    {
                        n++;
                        so.OnError(_error);
                    }
                    else if (_isStopped)
                    {
                        n++;
                        so.OnCompleted();
                    }
                    if (!_isStopped)
                    {
                        subscription = new Subscription(this, so);
                        _observers = _observers.Add(so);
                    }
                }
                so.EnsureActive(n);
                return subscription;
            }
            public override void Dispose()
            {
                lock (_gate)
                {
                    _isDisposed = true;
                    _observers = null;
                    DisposeCore();
                }
            }
            protected abstract void DisposeCore();
            protected abstract void Next(T value);
            protected abstract int Replay(IObserver observer);
            protected abstract void Trim();
            protected abstract IScheduledObserver CreateScheduledObserver(IObserver observer);
            private void CheckDisposed()
            {
                if (_isDisposed)
                    throw new ObjectDisposedException(string.Empty);
            }
            private void Unsubscribe(IScheduledObserver observer)
            {
                lock (_gate)
                {
                    if (!_isDisposed)
                    {
                        _observers = _observers.Remove(observer);
                    }
                }
            }
#if NOTYET // TODO: Expose internal notifications similar to BehaviorSubject.TryGetValue?
            public bool TryGetNotifications(out IList> notifications)
            {
                lock (_gate)
                {
                    if (_isDisposed)
                    {
                        notifications = null;
                        return false;
                    }
                    else
                    {
                        var res = new List>();
                        var materializer = Observer.Create(
                            x => res.Add(Notification.CreateOnNext(x)),
                            ex => res.Add(Notification.CreateOnError(ex)),
                            () => res.Add(Notification.CreateOnCompleted())
                        );
                        Replay(materializer);
                        if (_error != null)
                        {
                            materializer.OnError(_error);
                        }
                        else if (_isStopped)
                        {
                            materializer.OnCompleted();
                        }
                        notifications = res;
                        return true;
                    }
                }
            }
#endif
            private sealed class Subscription : IDisposable
            {
                private readonly ReplayBase _subject;
                private readonly IScheduledObserver _observer;
                public Subscription(ReplayBase subject, IScheduledObserver observer)
                {
                    _subject = subject;
                    _observer = observer;
                }
                public void Dispose()
                {
                    _observer.Dispose();
                    _subject.Unsubscribe(_observer);
                }
            }
        }
        /// 
        /// Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
        /// 
        private sealed class ReplayByTime : ReplayBase
        {
            private const int InfiniteBufferSize = int.MaxValue;
            private readonly int _bufferSize;
            private readonly TimeSpan _window;
            private readonly IScheduler _scheduler;
            private readonly IStopwatch _stopwatch;
            private readonly Queue> _queue;
            public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
            {
                if (bufferSize < 0)
                    throw new ArgumentOutOfRangeException(nameof(bufferSize));
                if (window < TimeSpan.Zero)
                    throw new ArgumentOutOfRangeException(nameof(window));
                if (scheduler == null)
                    throw new ArgumentNullException(nameof(scheduler));
                _bufferSize = bufferSize;
                _window = window;
                _scheduler = scheduler;
                _stopwatch = _scheduler.StartStopwatch();
                _queue = new Queue>();
            }
            public ReplayByTime(int bufferSize, TimeSpan window)
                : this(bufferSize, window, SchedulerDefaults.Iteration)
            {
            }
            public ReplayByTime(IScheduler scheduler)
                : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
            {
            }
            public ReplayByTime(int bufferSize, IScheduler scheduler)
                : this(bufferSize, TimeSpan.MaxValue, scheduler)
            {
            }
            public ReplayByTime(TimeSpan window, IScheduler scheduler)
                : this(InfiniteBufferSize, window, scheduler)
            {
            }
            public ReplayByTime(TimeSpan window)
                : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
            {
            }
            protected override IScheduledObserver CreateScheduledObserver(IObserver observer)
            {
                return new ScheduledObserver(_scheduler, observer);
            }
            protected override void DisposeCore()
            {
                _queue.Clear();
            }
            protected override void Next(T value)
            {
                var now = _stopwatch.Elapsed;
                _queue.Enqueue(new TimeInterval(value, now));
            }
            protected override int Replay(IObserver observer)
            {
                var n = _queue.Count;
                foreach (var item in _queue)
                    observer.OnNext(item.Value);
                return n;
            }
            protected override void Trim()
            {
                var now = _stopwatch.Elapsed;
                while (_queue.Count > _bufferSize)
                    _queue.Dequeue();
                while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
                    _queue.Dequeue();
            }
        }
        //
        // Below are the non-time based implementations. 
        // These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
        // The ReplayOne implementation also removes the need to even have a queue.
        //
        private sealed class ReplayOne : ReplayBufferBase
        {
            private bool _hasValue;
            private T _value;
            protected override void Trim()
            {
                //
                // No need to trim.
                //
            }
            protected override void Next(T value)
            {
                _hasValue = true;
                _value = value;
            }
            protected override int Replay(IObserver observer)
            {
                var n = 0;
                if (_hasValue)
                {
                    n = 1;
                    observer.OnNext(_value);
                }
                return n;
            }
            protected override void DisposeCore()
            {
                _value = default(T);
            }
        }
        private sealed class ReplayMany : ReplayManyBase
        {
            private readonly int _bufferSize;
            public ReplayMany(int bufferSize)
                : base(bufferSize)
            {
                _bufferSize = bufferSize;
            }
            protected override void Trim()
            {
                while (_queue.Count > _bufferSize)
                    _queue.Dequeue();
            }
        }
        private sealed class ReplayAll : ReplayManyBase
        {
            public ReplayAll()
                : base(0)
            {
            }
            protected override void Trim()
            {
                //
                // Don't trim, keep all values.
                //
            }
        }
        private abstract class ReplayBufferBase : ReplayBase
        {
            protected override IScheduledObserver CreateScheduledObserver(IObserver observer)
            {
                return new FastImmediateObserver(observer);
            }
            protected override void DisposeCore()
            {
            }
        }
        private abstract class ReplayManyBase : ReplayBufferBase
        {
            protected readonly Queue _queue;
            protected ReplayManyBase(int queueSize)
                : base()
            {
                _queue = new Queue(Math.Min(queueSize, 64));
            }
            protected override void Next(T value)
            {
                _queue.Enqueue(value);
            }
            protected override int Replay(IObserver observer)
            {
                var n = _queue.Count;
                foreach (var item in _queue)
                    observer.OnNext(item);
                return n;
            }
            protected override void DisposeCore()
            {
                _queue.Clear();
            }
        }
    }
    /// 
    /// Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
    /// 
    /// Type of the elements processed by the observer.
    class FastImmediateObserver : IScheduledObserver
    {
        /// 
        /// Gate to control ownership transfer and protect data structures.
        /// 
        private readonly object _gate = new object();
        /// 
        /// Observer to forward notifications to.
        /// 
        private volatile IObserver _observer;
        /// 
        /// Queue to enqueue OnNext notifications into.
        /// 
        private Queue _queue = new Queue();
        /// 
        /// Standby queue to swap out for _queue when transferring ownership. This allows to reuse
        /// queues in case of busy subjects where the initial replay doesn't suffice to catch up.
        /// 
        private Queue _queue2;
        /// 
        /// Exception passed to an OnError notification, if any.
        /// 
        private Exception _error;
        /// 
        /// Indicates whether an OnCompleted notification was received.
        /// 
        private bool _done;
        /// 
        /// Indicates whether the observer is busy, i.e. some thread is actively draining the
        /// notifications that were queued up.
        /// 
        private bool _busy;
        /// 
        /// Indicates whether a failure occurred when the owner was draining the queue. This will
        /// prevent future work to be processed.
        /// 
        private bool _hasFaulted;
        /// 
        /// Creates a new scheduled observer that proxies to the specified observer.
        /// 
        /// Observer to forward notifications to.
        public FastImmediateObserver(IObserver observer)
        {
            _observer = observer;
        }
        /// 
        /// Disposes the observer.
        /// 
        public void Dispose()
        {
            Done();
        }
        /// 
        /// Notifies the observer of pending work. This will either cause the current owner to
        /// process the newly enqueued notifications, or it will cause the calling thread to
        /// become the owner and start processing the notification queue.
        /// 
        public void EnsureActive()
        {
            EnsureActive(1);
        }
        /// 
        /// Notifies the observer of pending work. This will either cause the current owner to
        /// process the newly enqueued notifications, or it will cause the calling thread to
        /// become the owner and start processing the notification queue.
        /// 
        /// The number of enqueued notifications to process (ignored).
        public void EnsureActive(int count)
        {
            var isOwner = false;
            lock (_gate)
            {
                //
                // If we failed to process work in the past, we'll simply drop it.
                //
                if (!_hasFaulted)
                {
                    //
                    // If no-one is processing the notification queue, become the owner.
                    //
                    if (!_busy)
                    {
                        isOwner = true;
                        _busy = true;
                    }
                }
            }
            if (isOwner)
            {
                while (true)
                {
                    var queue = default(Queue);
                    var error = default(Exception);
                    var done = false;
                    //
                    // Steal notifications from the producer side to drain them to the observer.
                    //
                    lock (_gate)
                    {
                        //
                        // Do we have any OnNext notifications to process?
                        //
                        if (_queue.Count > 0)
                        {
                            if (_queue2 == null)
                            {
                                _queue2 = new Queue();
                            }
                            //
                            // Swap out the current queue for a fresh or recycled one. The standby
                            // queue is set to null; when notifications are sent out the processed
                            // queue will become the new standby.
                            //
                            queue = _queue;
                            _queue = _queue2;
                            _queue2 = null;
                        }
                        //
                        // Do we have any terminal notifications to process?
                        //
                        if (_error != null)
                        {
                            error = _error;
                        }
                        else if (_done)
                        {
                            done = true;
                        }
                        else if (queue == null)
                        {
                            //
                            // No work left; quit the loop and let another thread become the
                            // owner in the future.
                            //
                            _busy = false;
                            break;
                        }
                    }
                    try
                    {
                        //
                        // Process OnNext notifications, if any.
                        //
                        if (queue != null)
                        {
                            //
                            // Drain the stolen OnNext notification queue.
                            //
                            while (queue.Count > 0)
                            {
                                _observer.OnNext(queue.Dequeue());
                            }
                            //
                            // The queue is now empty, so we can reuse it by making it the standby
                            // queue for a future swap.
                            //
                            lock (_gate)
                            {
                                _queue2 = queue;
                            }
                        }
                        //
                        // Process terminal notifications, if any. Notice we don't release ownership
                        // after processing these notifications; we simply quit from the loop. This
                        // will cause all processing of the scheduler observer to cease.
                        //
                        if (error != null)
                        {
                            var observer = Done();
                            observer.OnError(error);
                            break;
                        }
                        else if (done)
                        {
                            var observer = Done();
                            observer.OnCompleted();
                            break;
                        }
                    }
                    catch
                    {
                        lock (_gate)
                        {
                            _hasFaulted = true;
                            _queue.Clear();
                        }
                        throw;
                    }
                }
            }
        }
        /// 
        /// Enqueues an OnCompleted notification.
        /// 
        public void OnCompleted()
        {
            lock (_gate)
            {
                if (!_hasFaulted)
                {
                    _done = true;
                }
            }
        }
        /// 
        /// Enqueues an OnError notification.
        /// 
        /// Error of the notification.
        public void OnError(Exception error)
        {
            lock (_gate)
            {
                if (!_hasFaulted)
                {
                    _error = error;
                }
            }
        }
        /// 
        /// Enqueues an OnNext notification.
        /// 
        /// Value of the notification.
        public void OnNext(T value)
        {
            lock (_gate)
            {
                if (!_hasFaulted)
                {
                    _queue.Enqueue(value);
                }
            }
        }
        /// 
        /// Terminates the observer upon receiving terminal notifications, thus preventing
        /// future notifications to go out.
        /// 
        /// Observer to send terminal notifications to.
        private IObserver Done()
        {
#pragma warning disable 0420
            return Interlocked.Exchange(ref _observer, NopObserver.Instance);
#pragma warning restore 0420
        }
    }
}