// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Represents an object that schedules units of work on a designated thread.
    /// </summary>
    public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable
    {
        #region Fields

        /// <summary>
        /// Counter for diagnostic purposes, to name the threads.
        /// </summary>
        private static int _counter;

        /// <summary>
        /// Thread factory function.
        /// </summary>
        private readonly Func<ThreadStart, Thread> _threadFactory;

        /// <summary>
        /// Stopwatch for timing free of absolute time dependencies.
        /// </summary>
        private readonly IStopwatch _stopwatch;

        /// <summary>
        /// Thread used by the event loop to run work items on. No work should be run on any other thread.
        /// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled.
        /// </summary>
        private Thread _thread;

        /// <summary>
        /// Gate to protect data structures, including the work queue and the ready list.
        /// </summary>
        private readonly object _gate;

        /// <summary>
        /// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer
        /// expires and moves on to the next item in the queue.
        /// </summary>
        private readonly SemaphoreSlim _evt;

        /// <summary>
        /// Queue holding work items. Protected by the gate.
        /// </summary>
        private readonly SchedulerQueue<TimeSpan> _queue;

        /// <summary>
        /// Queue holding items that are ready to be run as soon as possible. Protected by the gate.
        /// </summary>
        private readonly Queue<ScheduledItem<TimeSpan>> _readyList;

        /// <summary>
        /// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next
        /// item is still the same. If not, a new timer needs to be started (see below).
        /// </summary>
        private ScheduledItem<TimeSpan> _nextItem;

        /// <summary>
        /// Disposable that always holds the timer to dispatch the first element in the queue.
        /// </summary>
        private IDisposable _nextTimer;

        /// <summary>
        /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
        /// wake up the event loop thread, which will subsequently abandon all work.
        /// </summary>
        private bool _disposed;

        #endregion

        #region Constructors

        /// <summary>
        /// Creates an object that schedules units of work on a designated thread.
        /// </summary>
        public EventLoopScheduler()
            : this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref _counter), IsBackground = true })
        {
        }

#if !NO_THREAD
        /// <summary>
        /// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options.
        /// </summary>
        /// <param name="threadFactory">Factory function for thread creation.</param>
        /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is <c>null</c>.</exception>
        public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
        {
#else
        internal EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
        {
#endif
            _threadFactory = threadFactory ?? throw new ArgumentNullException(nameof(threadFactory));
            _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();

            _gate = new object();

            _evt = new SemaphoreSlim(0);
            _queue = new SchedulerQueue<TimeSpan>();
            _readyList = new Queue<ScheduledItem<TimeSpan>>();

            ExitIfEmpty = false;
        }

        #endregion

        #region Properties

        /// <summary>
        /// Indicates whether the event loop thread is allowed to quit when no work is left. If new work
        /// is scheduled afterwards, a new event loop thread is created. This property is used by the
        /// NewThreadScheduler which uses an event loop for its recursive invocations.
        /// </summary>
        internal bool ExitIfEmpty { get; set; }

        #endregion

        #region Public methods

        /// <summary>
        /// Schedules an action to be executed after dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // Due times are expressed relative to the scheduler's own stopwatch.
            var due = _stopwatch.Elapsed + dueTime;
            var si = new ScheduledItem<TimeSpan, TState>(this, state, action, due);

            lock (_gate)
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(nameof(EventLoopScheduler));
                }

                if (dueTime <= TimeSpan.Zero)
                {
                    // Already due: bypass the timed queue altogether.
                    _readyList.Enqueue(si);
                }
                else
                {
                    _queue.Enqueue(si);
                }

                // Either way, ask the event loop to re-evaluate its queues.
                _evt.Release();

                EnsureThread();
            }

            return si;
        }

        /// <summary>
        /// Schedules a periodic piece of work on the designated thread.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">Initial state passed to the action upon the first iteration.</param>
        /// <param name="period">Period for running the work periodically.</param>
        /// <param name="action">Action to be executed, potentially updating the state.</param>
        /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
        public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
        {
            if (period < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException(nameof(period));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return new PeriodicallyScheduledWorkItem<TState>(this, state, period, action);
        }

        /// <summary>
        /// Recurring work item that reschedules itself on the owning scheduler every period and
        /// threads the state returned by one invocation of the action into the next one.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the periodic action.</typeparam>
        private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
        {
            private readonly TimeSpan _period;
            private readonly Func<TState, TState> _action;
            private readonly EventLoopScheduler _scheduler;
            private readonly AsyncLock _gate = new AsyncLock();

            private TState _state;
            private TimeSpan _next;
            private IDisposable _task;

            /// <summary>
            /// Creates the work item and schedules its first tick one period from now.
            /// </summary>
            /// <param name="scheduler">Scheduler to run the periodic work on.</param>
            /// <param name="state">Initial state passed to the action.</param>
            /// <param name="period">Period between invocations of the action.</param>
            /// <param name="action">Action to be executed, returning the state for the next invocation.</param>
            public PeriodicallyScheduledWorkItem(EventLoopScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
            {
                _state = state;
                _period = period;
                _action = action;
                _scheduler = scheduler;
                _next = scheduler._stopwatch.Elapsed + period;

                Disposable.TrySetSingle(ref _task, scheduler.Schedule(this, _next - scheduler._stopwatch.Elapsed, (self, workItem) => workItem.Tick(self)));
            }

            /// <summary>
            /// Schedules the next tick and then runs the action for the current one.
            /// </summary>
            /// <param name="self">Scheduler used to schedule the next tick.</param>
            /// <returns>An empty disposable; cancellation goes through <see cref="Dispose"/>.</returns>
            private IDisposable Tick(IScheduler self)
            {
                // Advance the target time by a whole period rather than measuring from "now".
                _next += _period;

                // The next tick is scheduled before the action runs.
                Disposable.TrySetMultiple(ref _task, self.Schedule(this, _next - _scheduler._stopwatch.Elapsed, (nextSelf, workItem) => workItem.Tick(nextSelf)));

                // NOTE(review): AsyncLock presumably serializes overlapping invocations - confirm against AsyncLock.
                // The work item is passed as state so that the lambda does not capture 'this'.
                _gate.Wait(
                    this,
                    closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state));

                return Disposable.Empty;
            }

            /// <summary>
            /// Cancels the pending tick and disposes the lock guarding the action.
            /// </summary>
            public void Dispose()
            {
                Disposable.TryDispose(ref _task);
                _gate.Dispose();
            }
        }

        /// <summary>
        /// Starts a new stopwatch object.
        /// </summary>
        /// <returns>New stopwatch object; started at the time of the request.</returns>
        public override IStopwatch StartStopwatch()
        {
            //
            // Strictly speaking, this explicit override is not necessary because the base implementation calls into
            // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
            // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
            //
            return new StopwatchImpl();
        }

        /// <summary>
        /// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned.
        /// </summary>
        public void Dispose()
        {
            lock (_gate)
            {
                if (!_disposed)
                {
                    _disposed = true;
                    Disposable.TryDispose(ref _nextTimer);

                    // Wake up the event loop so it can observe the disposed flag and quit.
                    _evt.Release();
                }
            }
        }

        #endregion

        #region Private implementation

        /// <summary>
        /// Ensures there is an event loop thread running. Should be called under the gate.
        /// </summary>
        private void EnsureThread()
        {
            if (_thread == null)
            {
                _thread = _threadFactory(Run);
                _thread.Start();
            }
        }

        /// <summary>
        /// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event
        /// which gets set by calls to Schedule, the next item timer, or calls to Dispose.
        /// </summary>
        private void Run()
        {
            while (true)
            {
                _evt.Wait();

                var ready = default(ScheduledItem<TimeSpan>[]);

                lock (_gate)
                {
                    //
                    // Bug fix that ensures the number of calls to Release never greatly exceeds the number of calls to Wait.
                    // See work item #37: https://rx.codeplex.com/workitem/37
                    //
                    while (_evt.CurrentCount > 0)
                    {
                        _evt.Wait();
                    }

                    //
                    // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the
                    // loop immediately. Subsequent calls to Schedule won't ever create a new thread.
                    //
                    if (_disposed)
                    {
                        _evt.Dispose();
                        return;
                    }

                    // Move every item whose due time has passed from the timed queue to the ready list.
                    while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed)
                    {
                        var item = _queue.Dequeue();
                        _readyList.Enqueue(item);
                    }

                    // Only restart the timer when the head of the timed queue has changed.
                    if (_queue.Count > 0)
                    {
                        var next = _queue.Peek();
                        if (next != _nextItem)
                        {
                            _nextItem = next;

                            var due = next.DueTime - _stopwatch.Elapsed;
                            Disposable.TrySetSerial(ref _nextTimer, ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due));
                        }
                    }

                    // Snapshot the ready list so the items can be invoked outside the gate.
                    if (_readyList.Count > 0)
                    {
                        ready = _readyList.ToArray();
                        _readyList.Clear();
                    }
                }

                if (ready != null)
                {
                    foreach (var item in ready)
                    {
                        if (!item.IsCanceled)
                        {
                            item.Invoke();
                        }
                    }
                }

                if (ExitIfEmpty)
                {
                    lock (_gate)
                    {
                        if (_readyList.Count == 0 && _queue.Count == 0)
                        {
                            // Clearing the thread lets EnsureThread create a new one when work arrives later.
                            _thread = null;
                            return;
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Timer callback for the item at the head of the queue. Moves the item to the ready list if it is
        /// still queued and signals the event loop to re-evaluate its queues.
        /// </summary>
        /// <param name="state">The scheduled item the timer was started for.</param>
        private void Tick(object state)
        {
            lock (_gate)
            {
                if (!_disposed)
                {
                    var item = (ScheduledItem<TimeSpan>)state;

                    if (item == _nextItem)
                    {
                        _nextItem = null;
                    }

                    if (_queue.Remove(item))
                    {
                        _readyList.Enqueue(item);
                    }

                    _evt.Release();
                }
            }
        }

        #endregion
    }
}