// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Reactive.Disposables; using System.Threading; #if NO_SEMAPHORE using System.Reactive.Threading; #endif namespace System.Reactive.Concurrency { /// /// Represents an object that schedules units of work on a designated thread. /// public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable { #region Fields /// /// Counter for diagnostic purposes, to name the threads. /// private static int s_counter; /// /// Thread factory function. /// private readonly Func _threadFactory; /// /// Stopwatch for timing free of absolute time dependencies. /// private IStopwatch _stopwatch; /// /// Thread used by the event loop to run work items on. No work should be run on any other thread. /// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled. /// private Thread _thread; /// /// Gate to protect data structures, including the work queue and the ready list. /// private readonly object _gate; /// /// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer /// expires and moves on to the next item in the queue. /// #if !NO_CDS private readonly SemaphoreSlim _evt; #else private readonly Semaphore _evt; #endif /// /// Queue holding work items. Protected by the gate. /// private readonly SchedulerQueue _queue; /// /// Queue holding items that are ready to be run as soon as possible. Protected by the gate. /// private readonly Queue> _readyList; /// /// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next /// item is still the same. If not, a new timer needs to be started (see below). /// private ScheduledItem _nextItem; /// /// Disposable that always holds the timer to dispatch the first element in the queue. /// private readonly SerialDisposable _nextTimer; /// /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to /// wake up the event loop thread, which will subsequently abandon all work. /// private bool _disposed; #endregion #region Constructors /// /// Creates an object that schedules units of work on a designated thread. /// public EventLoopScheduler() : this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref s_counter), IsBackground = true }) { } #if !NO_THREAD /// /// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options. /// /// Factory function for thread creation. /// is null. public EventLoopScheduler(Func threadFactory) { if (threadFactory == null) throw new ArgumentNullException(nameof(threadFactory)); #else internal EventLoopScheduler(Func threadFactory) { #endif _threadFactory = threadFactory; _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch(); _gate = new object(); #if !NO_CDS _evt = new SemaphoreSlim(0); #else _evt = new Semaphore(0, int.MaxValue); #endif _queue = new SchedulerQueue(); _readyList = new Queue>(); _nextTimer = new SerialDisposable(); ExitIfEmpty = false; } #endregion #region Properties /// /// Indicates whether the event loop thread is allowed to quit when no work is left. If new work /// is scheduled afterwards, a new event loop thread is created. This property is used by the /// NewThreadScheduler which uses an event loop for its recursive invocations. /// internal bool ExitIfEmpty { get; set; } #endregion #region Public methods /// /// Schedules an action to be executed after dueTime. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). /// is null. /// The scheduler has been disposed and doesn't accept new work. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) throw new ArgumentNullException(nameof(action)); var due = _stopwatch.Elapsed + dueTime; var si = new ScheduledItem(this, state, action, due); lock (_gate) { if (_disposed) throw new ObjectDisposedException(""); if (dueTime <= TimeSpan.Zero) { _readyList.Enqueue(si); _evt.Release(); } else { _queue.Enqueue(si); _evt.Release(); } EnsureThread(); } return Disposable.Create(si.Cancel); } /// /// Schedules a periodic piece of work on the designated thread. /// /// The type of the state passed to the scheduled action. /// Initial state passed to the action upon the first iteration. /// Period for running the work periodically. /// Action to be executed, potentially updating the state. /// The disposable object used to cancel the scheduled recurring action (best effort). /// is null. /// is less than TimeSpan.Zero. /// The scheduler has been disposed and doesn't accept new work. public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action) { if (period < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(period)); if (action == null) throw new ArgumentNullException(nameof(action)); var start = _stopwatch.Elapsed; var next = start + period; var state1 = state; var d = new MultipleAssignmentDisposable(); var gate = new AsyncLock(); var tick = default(Func); tick = (self_, _) => { next += period; d.Disposable = self_.Schedule(null, next - _stopwatch.Elapsed, tick); gate.Wait(() => { state1 = action(state1); }); return Disposable.Empty; }; d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick); return StableCompositeDisposable.Create(d, gate); } /// /// Starts a new stopwatch object. /// /// New stopwatch object; started at the time of the request. public override IStopwatch StartStopwatch() { // // Strictly speaking, this explicit override is not necessary because the base implementation calls into // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip. // return new StopwatchImpl(); } /// /// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned. /// public void Dispose() { lock (_gate) { if (!_disposed) { _disposed = true; _nextTimer.Dispose(); _evt.Release(); } } } #endregion #region Private implementation /// /// Ensures there is an event loop thread running. Should be called under the gate. /// private void EnsureThread() { if (_thread == null) { _thread = _threadFactory(Run); _thread.Start(); } } /// /// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event /// which gets set by calls to Schedule, the next item timer, or calls to Dispose. /// private void Run() { while (true) { #if !NO_CDS _evt.Wait(); #else _evt.WaitOne(); #endif var ready = default(ScheduledItem[]); lock (_gate) { // // Bug fix that ensures the number of calls to Release never greatly exceeds the number of calls to Wait. // See work item #37: https://rx.codeplex.com/workitem/37 // #if !NO_CDS while (_evt.CurrentCount > 0) _evt.Wait(); #else while (_evt.WaitOne(TimeSpan.Zero)) { } #endif // // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the // loop immediately. Subsequent calls to Schedule won't ever create a new thread. // if (_disposed) { ((IDisposable)_evt).Dispose(); return; } while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed) { var item = _queue.Dequeue(); _readyList.Enqueue(item); } if (_queue.Count > 0) { var next = _queue.Peek(); if (next != _nextItem) { _nextItem = next; var due = next.DueTime - _stopwatch.Elapsed; _nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due); } } if (_readyList.Count > 0) { ready = _readyList.ToArray(); _readyList.Clear(); } } if (ready != null) { foreach (var item in ready) { if (!item.IsCanceled) item.Invoke(); } } if (ExitIfEmpty) { lock (_gate) { if (_readyList.Count == 0 && _queue.Count == 0) { _thread = null; return; } } } } } private void Tick(object state) { lock (_gate) { if (!_disposed) { var item = (ScheduledItem)state; if (_queue.Remove(item)) { _readyList.Enqueue(item); } _evt.Release(); } } } #endregion } }