// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
#if NO_SEMAPHORE
using System.Reactive.Threading;
#endif
namespace System.Reactive.Concurrency
{
/// <summary>
/// Represents an object that schedules units of work on a designated thread.
/// </summary>
public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable
{
#region Fields
/// <summary>
/// Counter for diagnostic purposes, to name the threads.
/// Incremented atomically by the default thread factory each time it creates a thread.
/// </summary>
private static int s_counter;
/// <summary>
/// Thread factory function. Invoked with the event loop entry point whenever a new event loop thread is needed.
/// </summary>
private readonly Func _threadFactory;
/// <summary>
/// Stopwatch for timing free of absolute time dependencies.
/// Due times of scheduled items are expressed relative to this stopwatch's elapsed time.
/// </summary>
private IStopwatch _stopwatch;
/// <summary>
/// Thread used by the event loop to run work items on. No work should be run on any other thread.
/// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled.
/// A null value means no event loop thread has been created yet, or the previous one has quit.
/// </summary>
private Thread _thread;
/// <summary>
/// Gate to protect data structures, including the work queue and the ready list.
/// </summary>
private readonly object _gate;
/// <summary>
/// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer
/// expires and moves on to the next item in the queue.
/// </summary>
#if !NO_CDS
private readonly SemaphoreSlim _evt;
#else
private readonly Semaphore _evt;
#endif
/// <summary>
/// Queue holding work items. Protected by the gate.
/// </summary>
private readonly SchedulerQueue _queue;
/// <summary>
/// Queue holding items that are ready to be run as soon as possible. Protected by the gate.
/// </summary>
private readonly Queue> _readyList;
/// <summary>
/// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next
/// item is still the same. If not, a new timer needs to be started (see below).
/// </summary>
private ScheduledItem _nextItem;
/// <summary>
/// Disposable that always holds the timer to dispatch the first element in the queue.
/// </summary>
private readonly SerialDisposable _nextTimer;
/// <summary>
/// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
/// wake up the event loop thread, which will subsequently abandon all work.
/// </summary>
private bool _disposed;
#endregion
#region Constructors
/// <summary>
/// Creates an object that schedules units of work on a designated thread.
/// The event loop runs on a background thread named "Event Loop N", where N comes from a process-wide counter.
/// </summary>
public EventLoopScheduler()
    : this(start =>
    {
        // Construct first, then name: the counter is only bumped once the thread object exists.
        var thread = new Thread(start);
        thread.Name = "Event Loop " + Interlocked.Increment(ref s_counter);
        thread.IsBackground = true;
        return thread;
    })
{
}
#if !NO_THREAD
/// <summary>
/// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options.
/// No thread is created here; the factory is first invoked when work is scheduled.
/// </summary>
/// <param name="threadFactory">Factory function for thread creation.</param>
/// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception>
public EventLoopScheduler(Func threadFactory)
{
if (threadFactory == null)
throw new ArgumentNullException(nameof(threadFactory));
#else
// Variant compiled when NO_THREAD is defined: the constructor is internal and performs no null check on threadFactory.
internal EventLoopScheduler(Func threadFactory)
{
#endif
_threadFactory = threadFactory;
// Stopwatch obtained from the current concurrency abstraction layer; due times are computed against its elapsed time.
_stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();
_gate = new object();
// The semaphore starts with a count of zero, so the event loop blocks until something signals it.
#if !NO_CDS
_evt = new SemaphoreSlim(0);
#else
_evt = new Semaphore(0, int.MaxValue);
#endif
_queue = new SchedulerQueue();
_readyList = new Queue>();
_nextTimer = new SerialDisposable();
// By default the event loop thread keeps running when there is no work left.
ExitIfEmpty = false;
}
#endregion
#region Properties
/// <summary>
/// Indicates whether the event loop thread is allowed to quit when no work is left. If new work
/// is scheduled afterwards, a new event loop thread is created. This property is used by the
/// NewThreadScheduler which uses an event loop for its recursive invocations.
/// </summary>
internal bool ExitIfEmpty { get; set; }
#endregion
#region Public methods
/// <summary>
/// Schedules an action to be executed after dueTime.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
/// <param name="state">State passed to the action to be executed.</param>
/// <param name="action">Action to be executed.</param>
/// <param name="dueTime">Relative time after which to execute the action.</param>
/// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
/// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
{
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    // The due time is expressed relative to the scheduler's own stopwatch.
    var due = _stopwatch.Elapsed + dueTime;
    var si = new ScheduledItem(this, state, action, due);

    lock (_gate)
    {
        if (_disposed)
        {
            // Name the disposed object so the exception message identifies its origin.
            throw new ObjectDisposedException(nameof(EventLoopScheduler));
        }

        if (dueTime <= TimeSpan.Zero)
        {
            // Already due: bypass the timed queue and run as soon as possible.
            _readyList.Enqueue(si);
        }
        else
        {
            _queue.Enqueue(si);
        }

        // Either way, wake the event loop so it re-evaluates its queues.
        _evt.Release();

        EnsureThread();
    }

    return Disposable.Create(si.Cancel);
}
/// <summary>
/// Schedules a periodic piece of work on the designated thread.
/// </summary>
/// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
/// <param name="state">Initial state passed to the action upon the first iteration.</param>
/// <param name="period">Period for running the work periodically.</param>
/// <param name="action">Action to be executed, potentially updating the state.</param>
/// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
/// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action)
{
    if (period < TimeSpan.Zero)
    {
        throw new ArgumentOutOfRangeException(nameof(period));
    }

    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    // Stopwatch-relative time of the upcoming tick; advanced by one period on every tick so that
    // each tick is scheduled against the original start time rather than the previous tick's actual run time.
    var nextDue = _stopwatch.Elapsed + period;
    var current = state;

    var pending = new MultipleAssignmentDisposable();
    var serializer = new AsyncLock();

    var onTick = default(Func);
    onTick = (scheduler, _) =>
    {
        // Queue the following tick before running the user action.
        nextDue += period;
        pending.Disposable = scheduler.Schedule(null, nextDue - _stopwatch.Elapsed, onTick);

        // NOTE(review): the action runs through the AsyncLock, presumably to serialize invocations - confirm against AsyncLock.
        serializer.Wait(() =>
        {
            current = action(current);
        });

        return Disposable.Empty;
    };

    pending.Disposable = Schedule(null, nextDue - _stopwatch.Elapsed, onTick);

    return StableCompositeDisposable.Create(pending, serializer);
}
/// <summary>
/// Starts a new stopwatch object.
/// </summary>
/// <returns>New stopwatch object; started at the time of the request.</returns>
/// <remarks>
/// Strictly speaking, this explicit override is not necessary because the base implementation calls into
/// the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
/// where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
/// </remarks>
public override IStopwatch StartStopwatch() => new StopwatchImpl();
/// <summary>
/// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned.
/// Calling this method more than once has no further effect.
/// </summary>
public void Dispose()
{
    lock (_gate)
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Stop the pending timer, then wake the event loop so it can observe the flag and quit.
        _nextTimer.Dispose();
        _evt.Release();
    }
}
#endregion
#region Private implementation
/// <summary>
/// Ensures there is an event loop thread running. Should be called under the gate.
/// </summary>
private void EnsureThread()
{
    if (_thread != null)
    {
        return;
    }

    // No event loop thread at the moment: ask the factory for one that runs the loop, and start it.
    _thread = _threadFactory(Run);
    _thread.Start();
}
/// <summary>
/// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event
/// which gets set by calls to Schedule, the next item timer, or calls to Dispose.
/// </summary>
/// <remarks>
/// Each iteration waits for a signal, then (under the gate) moves due items to the ready list, arms the timer
/// for the next pending item, and takes a snapshot of the ready list. The snapshot is invoked outside the gate.
/// The loop returns when the scheduler is disposed, or when ExitIfEmpty is set and no work is left.
/// </remarks>
private void Run()
{
while (true)
{
// Block until something signals that the queues need to be re-evaluated.
#if !NO_CDS
_evt.Wait();
#else
_evt.WaitOne();
#endif
// Stays null when there is nothing to run in this iteration.
var ready = default(ScheduledItem[]);
lock (_gate)
{
//
// Bug fix that ensures the number of calls to Release never greatly exceeds the number of calls to Wait.
// See work item #37: https://rx.codeplex.com/workitem/37
//
// Drains any additional pending signals; a single pass below handles all of them.
//
#if !NO_CDS
while (_evt.CurrentCount > 0) _evt.Wait();
#else
while (_evt.WaitOne(TimeSpan.Zero)) { }
#endif
//
// The event could have been set by a call to Dispose. This takes priority over anything else. We quit the
// loop immediately. Subsequent calls to Schedule won't ever create a new thread.
//
if (_disposed)
{
// The semaphore is disposed here, on the event loop thread, right before the loop ends.
((IDisposable)_evt).Dispose();
return;
}
// Move every queued item whose due time has been reached to the ready list.
while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed)
{
var item = _queue.Dequeue();
_readyList.Enqueue(item);
}
// If work remains in the timed queue, make sure a timer is armed for its first item.
if (_queue.Count > 0)
{
var next = _queue.Peek();
// Only start a new timer when the head of the queue differs from the item recorded last time.
if (next != _nextItem)
{
_nextItem = next;
var due = next.DueTime - _stopwatch.Elapsed;
// The new timer calls Tick with the item as its state and replaces whatever _nextTimer held before.
_nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due);
}
}
// Snapshot and clear the ready list under the gate so the items can be run without holding it.
if (_readyList.Count > 0)
{
ready = _readyList.ToArray();
_readyList.Clear();
}
}
// Run the snapshot outside the gate; items canceled in the meantime are skipped.
if (ready != null)
{
foreach (var item in ready)
{
if (!item.IsCanceled)
item.Invoke();
}
}
if (ExitIfEmpty)
{
lock (_gate)
{
// No work left: clear _thread so that EnsureThread creates a new thread when work is scheduled again.
if (_readyList.Count == 0 && _queue.Count == 0)
{
_thread = null;
return;
}
}
}
}
}
/// <summary>
/// Timer callback for the item at the head of the queue. Moves the item to the ready list if it is still
/// queued, and signals the event loop to re-evaluate its queues. Does nothing once the scheduler is disposed.
/// </summary>
/// <param name="state">The scheduled item the timer was started for.</param>
private void Tick(object state)
{
    lock (_gate)
    {
        if (_disposed)
        {
            return;
        }

        var dueItem = (ScheduledItem)state;

        // Only enqueue the item if it was still queued when removal was attempted.
        if (_queue.Remove(dueItem))
        {
            _readyList.Enqueue(dueItem);
        }

        _evt.Release();
    }
}
#endregion
}
}