// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
///
/// Represents an object that schedules units of work on a designated thread.
///
public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable
{
#region Fields
///
/// Counter for diagnostic purposes, to name the threads.
///
private static int _counter;
///
/// Thread factory function.
///
private readonly Func _threadFactory;
///
/// Stopwatch for timing free of absolute time dependencies.
///
private readonly IStopwatch _stopwatch;
///
/// Thread used by the event loop to run work items on. No work should be run on any other thread.
/// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled.
///
private Thread _thread;
///
/// Gate to protect data structures, including the work queue and the ready list.
///
private readonly object _gate;
///
/// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer
/// expires and moves on to the next item in the queue.
///
private readonly SemaphoreSlim _evt;
///
/// Queue holding work items. Protected by the gate.
///
private readonly SchedulerQueue _queue;
///
/// Queue holding items that are ready to be run as soon as possible. Protected by the gate.
///
private readonly Queue> _readyList;
///
/// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next
/// item is still the same. If not, a new timer needs to be started (see below).
///
private ScheduledItem _nextItem;
///
/// Disposable that always holds the timer to dispatch the first element in the queue.
///
private IDisposable _nextTimer;
///
/// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
/// wake up the event loop thread, which will subsequently abandon all work.
///
private bool _disposed;
#endregion
#region Constructors
///
/// Creates an object that schedules units of work on a designated thread.
///
public EventLoopScheduler()
: this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref _counter), IsBackground = true })
{
}
#if !NO_THREAD
///
/// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options.
///
/// Factory function for thread creation.
/// is null.
public EventLoopScheduler(Func threadFactory)
{
#else
internal EventLoopScheduler(Func threadFactory)
{
#endif
_threadFactory = threadFactory ?? throw new ArgumentNullException(nameof(threadFactory));
_stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();
_gate = new object();
_evt = new SemaphoreSlim(0);
_queue = new SchedulerQueue();
_readyList = new Queue>();
ExitIfEmpty = false;
}
#endregion
#region Properties
///
/// Indicates whether the event loop thread is allowed to quit when no work is left. If new work
/// is scheduled afterwards, a new event loop thread is created. This property is used by the
/// NewThreadScheduler which uses an event loop for its recursive invocations.
///
internal bool ExitIfEmpty
{
get;
set;
}
#endregion
#region Public methods
///
/// Schedules an action to be executed after dueTime.
///
/// The type of the state passed to the scheduled action.
/// State passed to the action to be executed.
/// Action to be executed.
/// Relative time after which to execute the action.
/// The disposable object used to cancel the scheduled action (best effort).
/// is null.
/// The scheduler has been disposed and doesn't accept new work.
public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
{
if (action == null)
{
throw new ArgumentNullException(nameof(action));
}
var due = _stopwatch.Elapsed + dueTime;
var si = new ScheduledItem(this, state, action, due);
lock (_gate)
{
if (_disposed)
{
throw new ObjectDisposedException("");
}
if (dueTime <= TimeSpan.Zero)
{
_readyList.Enqueue(si);
_evt.Release();
}
else
{
_queue.Enqueue(si);
_evt.Release();
}
EnsureThread();
}
return si;
}
///
/// Schedules a periodic piece of work on the designated thread.
///
/// The type of the state passed to the scheduled action.
/// Initial state passed to the action upon the first iteration.
/// Period for running the work periodically.
/// Action to be executed, potentially updating the state.
/// The disposable object used to cancel the scheduled recurring action (best effort).
/// is null.
/// is less than .
/// The scheduler has been disposed and doesn't accept new work.
public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action)
{
if (period < TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(period));
}
if (action == null)
{
throw new ArgumentNullException(nameof(action));
}
return new PeriodicallyScheduledWorkItem(this, state, period, action);
}
private sealed class PeriodicallyScheduledWorkItem : IDisposable
{
private readonly TimeSpan _period;
private readonly Func _action;
private readonly EventLoopScheduler _scheduler;
private readonly AsyncLock _gate = new AsyncLock();
private TState _state;
private TimeSpan _next;
private IDisposable _task;
public PeriodicallyScheduledWorkItem(EventLoopScheduler scheduler, TState state, TimeSpan period, Func action)
{
_state = state;
_period = period;
_action = action;
_scheduler = scheduler;
_next = scheduler._stopwatch.Elapsed + period;
Disposable.TrySetSingle(ref _task, scheduler.Schedule(this, _next - scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));
}
private IDisposable Tick(IScheduler self)
{
_next += _period;
Disposable.TrySetMultiple(ref _task, self.Schedule(this, _next - _scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));
_gate.Wait(
this,
closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state));
return Disposable.Empty;
}
public void Dispose()
{
Disposable.TryDispose(ref _task);
_gate.Dispose();
}
}
///
/// Starts a new stopwatch object.
///
/// New stopwatch object; started at the time of the request.
public override IStopwatch StartStopwatch()
{
//
// Strictly speaking, this explicit override is not necessary because the base implementation calls into
// the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
// where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
//
return new StopwatchImpl();
}
///
/// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned.
///
public void Dispose()
{
lock (_gate)
{
if (!_disposed)
{
_disposed = true;
Disposable.TryDispose(ref _nextTimer);
_evt.Release();
}
}
}
#endregion
#region Private implementation
///
/// Ensures there is an event loop thread running. Should be called under the gate.
///
private void EnsureThread()
{
if (_thread == null)
{
_thread = _threadFactory(Run);
_thread.Start();
}
}
///
/// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event
/// which gets set by calls to Schedule, the next item timer, or calls to Dispose.
///
private void Run()
{
while (true)
{
_evt.Wait();
var ready = default(ScheduledItem[]);
lock (_gate)
{
//
// Bug fix that ensures the number of calls to Release never greatly exceeds the number of calls to Wait.
// See work item #37: https://rx.codeplex.com/workitem/37
//
while (_evt.CurrentCount > 0)
{
_evt.Wait();
}
//
// The event could have been set by a call to Dispose. This takes priority over anything else. We quit the
// loop immediately. Subsequent calls to Schedule won't ever create a new thread.
//
if (_disposed)
{
_evt.Dispose();
return;
}
while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed)
{
var item = _queue.Dequeue();
_readyList.Enqueue(item);
}
if (_queue.Count > 0)
{
var next = _queue.Peek();
if (next != _nextItem)
{
_nextItem = next;
var due = next.DueTime - _stopwatch.Elapsed;
Disposable.TrySetSerial(ref _nextTimer, ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due));
}
}
if (_readyList.Count > 0)
{
ready = _readyList.ToArray();
_readyList.Clear();
}
}
if (ready != null)
{
foreach (var item in ready)
{
if (!item.IsCanceled)
{
item.Invoke();
}
}
}
if (ExitIfEmpty)
{
lock (_gate)
{
if (_readyList.Count == 0 && _queue.Count == 0)
{
_thread = null;
return;
}
}
}
}
}
private void Tick(object state)
{
lock (_gate)
{
if (!_disposed)
{
var item = (ScheduledItem)state;
if (item == _nextItem)
{
_nextItem = null;
}
if (_queue.Remove(item))
{
_readyList.Enqueue(item);
}
_evt.Release();
}
}
}
#endregion
}
}