// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
#if NO_SEMAPHORE
using System.Reactive.Threading;
#endif
namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Represents an object that schedules units of work on a designated thread.
    /// </summary>
    public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable
    {
        #region Fields

        /// <summary>
        /// Counter for diagnostic purposes, to name the threads.
        /// </summary>
        private static int s_counter;

        /// <summary>
        /// Thread factory function.
        /// </summary>
        private readonly Func<ThreadStart, Thread> _threadFactory;

        /// <summary>
        /// Stopwatch for timing free of absolute time dependencies.
        /// </summary>
        private readonly IStopwatch _stopwatch;

        /// <summary>
        /// Thread used by the event loop to run work items on. No work should be run on any other thread.
        /// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled.
        /// </summary>
        private Thread _thread;

        /// <summary>
        /// Gate to protect data structures, including the work queue and the ready list.
        /// </summary>
        private readonly object _gate;

        /// <summary>
        /// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer
        /// expires and moves on to the next item in the queue.
        /// </summary>
#if !NO_CDS
        private readonly SemaphoreSlim _evt;
#else
        private readonly Semaphore _evt;
#endif

        /// <summary>
        /// Queue holding work items. Protected by the gate.
        /// </summary>
        private readonly SchedulerQueue<TimeSpan> _queue;

        /// <summary>
        /// Queue holding items that are ready to be run as soon as possible. Protected by the gate.
        /// </summary>
        private readonly Queue<ScheduledItem<TimeSpan>> _readyList;

        /// <summary>
        /// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next
        /// item is still the same. If not, a new timer needs to be started (see below).
        /// </summary>
        private ScheduledItem<TimeSpan> _nextItem;

        /// <summary>
        /// Disposable that always holds the timer to dispatch the first element in the queue.
        /// </summary>
        private readonly SerialDisposable _nextTimer;

        /// <summary>
        /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
        /// wake up the event loop thread, which will subsequently abandon all work.
        /// </summary>
        private bool _disposed;

        #endregion

        #region Constructors

        /// <summary>
        /// Creates an object that schedules units of work on a designated thread.
        /// </summary>
        public EventLoopScheduler()
            : this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref s_counter), IsBackground = true })
        {
        }

#if !NO_THREAD
        /// <summary>
        /// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options.
        /// </summary>
        /// <param name="threadFactory">Factory function for thread creation.</param>
        /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception>
        public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
        {
            if (threadFactory == null)
                throw new ArgumentNullException("threadFactory");
#else
        internal EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
        {
#endif
            _threadFactory = threadFactory;
            _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();

            _gate = new object();

            // The semaphore starts at zero: the loop thread blocks until the first work item (or Dispose) signals it.
#if !NO_CDS
            _evt = new SemaphoreSlim(0);
#else
            _evt = new Semaphore(0, int.MaxValue);
#endif
            _queue = new SchedulerQueue<TimeSpan>();
            _readyList = new Queue<ScheduledItem<TimeSpan>>();

            _nextTimer = new SerialDisposable();

            ExitIfEmpty = false;
        }

        #endregion

        #region Properties

        /// <summary>
        /// Indicates whether the event loop thread is allowed to quit when no work is left. If new work
        /// is scheduled afterwards, a new event loop thread is created. This property is used by the
        /// NewThreadScheduler which uses an event loop for its recursive invocations.
        /// </summary>
        internal bool ExitIfEmpty
        {
            get;
            set;
        }

        #endregion

        #region Public methods

        /// <summary>
        /// Schedules an action to be executed after dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
                throw new ArgumentNullException("action");

            // Due times are expressed relative to the scheduler's own stopwatch rather than wall-clock time.
            var due = _stopwatch.Elapsed + dueTime;
            var si = new ScheduledItem<TimeSpan, TState>(this, state, action, due);

            lock (_gate)
            {
                if (_disposed)
                    throw new ObjectDisposedException(GetType().FullName);

                if (dueTime <= TimeSpan.Zero)
                {
                    // Already due: bypass the time-ordered queue and run as soon as possible.
                    _readyList.Enqueue(si);
                }
                else
                {
                    _queue.Enqueue(si);
                }

                // Either way the loop has to re-evaluate its state: run the ready item, or (re)arm the
                // timer in case the new item became the head of the queue.
                _evt.Release();

                EnsureThread();
            }

            return Disposable.Create(si.Cancel);
        }

        /// <summary>
        /// Schedules a periodic piece of work on the designated thread.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">Initial state passed to the action upon the first iteration.</param>
        /// <param name="period">Period for running the work periodically.</param>
        /// <param name="action">Action to be executed, potentially updating the state.</param>
        /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
        /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
        public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
        {
            if (period < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException("period");
            if (action == null)
                throw new ArgumentNullException("action");

            var start = _stopwatch.Elapsed;
            var next = start + period;

            var state1 = state;

            var d = new MultipleAssignmentDisposable();
            var gate = new AsyncLock();

            var tick = default(Func<IScheduler, object, IDisposable>);
            tick = (self_, _) =>
            {
                // Advance the target time by a fixed period (instead of "now + period") so that the time
                // spent running the action does not accumulate as drift.
                next += period;

                // Queue the next occurrence before running the current one.
                d.Disposable = self_.Schedule(null, next - _stopwatch.Elapsed, tick);

                // The user action runs through the AsyncLock; NOTE(review): presumably this serializes
                // invocations and stops them once the lock is disposed - confirm against AsyncLock.
                gate.Wait(() =>
                {
                    state1 = action(state1);
                });

                return Disposable.Empty;
            };

            d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick);

            return new CompositeDisposable(d, gate);
        }

#if !NO_STOPWATCH
        /// <summary>
        /// Starts a new stopwatch object.
        /// </summary>
        /// <returns>New stopwatch object; started at the time of the request.</returns>
        public override IStopwatch StartStopwatch()
        {
            //
            // Strictly speaking, this explicit override is not necessary because the base implementation calls into
            // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
            // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
            //
            return new StopwatchImpl();
        }
#endif

        /// <summary>
        /// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned.
        /// </summary>
        public void Dispose()
        {
            lock (_gate)
            {
                if (_disposed)
                    return;

                _disposed = true;
                _nextTimer.Dispose();

                if (_thread != null)
                {
                    // Wake up the loop thread; it observes the flag, disposes the semaphore and exits.
                    _evt.Release();
                }
                else
                {
                    //
                    // No loop thread exists (none was ever started, or it quit because of ExitIfEmpty), and none
                    // will be created anymore because Schedule rejects work once disposed. Nobody else would
                    // release the semaphore's resources, so do it here. Tick and Schedule both check the
                    // disposed flag under the gate before touching the semaphore.
                    //
                    ((IDisposable)_evt).Dispose();
                }
            }
        }

        #endregion

        #region Private implementation

        /// <summary>
        /// Ensures there is an event loop thread running. Should be called under the gate.
        /// </summary>
        private void EnsureThread()
        {
            if (_thread == null)
            {
                _thread = _threadFactory(Run);
                _thread.Start();
            }
        }

        /// <summary>
        /// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event
        /// which gets set by calls to Schedule, the next item timer, or calls to Dispose.
        /// </summary>
        private void Run()
        {
            while (true)
            {
#if !NO_CDS
                _evt.Wait();
#else
                _evt.WaitOne();
#endif

                var ready = default(ScheduledItem<TimeSpan>[]);

                lock (_gate)
                {
                    //
                    // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the
                    // loop immediately. Subsequent calls to Schedule won't ever create a new thread.
                    //
                    if (_disposed)
                    {
                        ((IDisposable)_evt).Dispose();
                        return;
                    }

                    // Move every item whose due time has passed from the time-ordered queue to the ready list.
                    while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed)
                    {
                        var item = _queue.Dequeue();
                        _readyList.Enqueue(item);
                    }

                    // If the head of the queue changed, replace the timer so it fires for the new head.
                    if (_queue.Count > 0)
                    {
                        var next = _queue.Peek();
                        if (next != _nextItem)
                        {
                            _nextItem = next;

                            var due = next.DueTime - _stopwatch.Elapsed;
                            _nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due);
                        }
                    }

                    // Snapshot the ready items so they can be invoked outside of the gate.
                    if (_readyList.Count > 0)
                    {
                        ready = _readyList.ToArray();
                        _readyList.Clear();
                    }
                }

                if (ready != null)
                {
                    foreach (var item in ready)
                    {
                        if (!item.IsCanceled)
                            item.Invoke();
                    }
                }

                if (ExitIfEmpty)
                {
                    lock (_gate)
                    {
                        if (_readyList.Count == 0 && _queue.Count == 0)
                        {
                            // Clearing the field lets EnsureThread start a fresh thread for later work.
                            _thread = null;
                            return;
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Timer callback for the item at the head of the queue. Moves the item to the ready list (if it is
        /// still queued) and signals the event loop to re-evaluate its state.
        /// </summary>
        /// <param name="state">The scheduled item the timer was started for.</param>
        private void Tick(object state)
        {
            lock (_gate)
            {
                if (!_disposed)
                {
                    var item = (ScheduledItem<TimeSpan>)state;
                    if (_queue.Remove(item))
                    {
                        _readyList.Enqueue(item);
                    }

                    // Signal even when the item was already gone, so the loop re-arms the timer for the next head.
                    _evt.Release();
                }
            }
        }

        #endregion
    }
}