EventLoopScheduler.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive.Concurrency
  8. {
  9. /// <summary>
  10. /// Represents an object that schedules units of work on a designated thread.
  11. /// </summary>
  12. public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable
  13. {
  14. #region Fields
  15. /// <summary>
  16. /// Counter for diagnostic purposes, to name the threads.
  17. /// </summary>
  18. private static int _counter;
  19. /// <summary>
  20. /// Thread factory function.
  21. /// </summary>
  22. private readonly Func<ThreadStart, Thread> _threadFactory;
  23. /// <summary>
  24. /// Stopwatch for timing free of absolute time dependencies.
  25. /// </summary>
  26. private readonly IStopwatch _stopwatch;
  27. /// <summary>
  28. /// Thread used by the event loop to run work items on. No work should be run on any other thread.
  29. /// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled.
  30. /// </summary>
  31. private Thread _thread;
  32. /// <summary>
  33. /// Gate to protect data structures, including the work queue and the ready list.
  34. /// </summary>
  35. private readonly object _gate;
  36. /// <summary>
  37. /// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer
  38. /// expires and moves on to the next item in the queue.
  39. /// </summary>
  40. private readonly SemaphoreSlim _evt;
  41. /// <summary>
  42. /// Queue holding work items. Protected by the gate.
  43. /// </summary>
  44. private readonly SchedulerQueue<TimeSpan> _queue;
  45. /// <summary>
  46. /// Queue holding items that are ready to be run as soon as possible. Protected by the gate.
  47. /// </summary>
  48. private readonly Queue<ScheduledItem<TimeSpan>> _readyList;
  49. /// <summary>
  50. /// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next
  51. /// item is still the same. If not, a new timer needs to be started (see below).
  52. /// </summary>
  53. private ScheduledItem<TimeSpan> _nextItem;
  54. /// <summary>
  55. /// Disposable that always holds the timer to dispatch the first element in the queue.
  56. /// </summary>
  57. private IDisposable _nextTimer;
  58. /// <summary>
  59. /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
  60. /// wake up the event loop thread, which will subsequently abandon all work.
  61. /// </summary>
  62. private bool _disposed;
  63. #endregion
  64. #region Constructors
  65. /// <summary>
  66. /// Creates an object that schedules units of work on a designated thread.
  67. /// </summary>
  68. public EventLoopScheduler()
  69. : this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref _counter), IsBackground = true })
  70. {
  71. }
  72. #if !NO_THREAD
  73. /// <summary>
  74. /// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options.
  75. /// </summary>
  76. /// <param name="threadFactory">Factory function for thread creation.</param>
  77. /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is <c>null</c>.</exception>
  78. public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
  79. {
  80. #else
  81. internal EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
  82. {
  83. #endif
  84. _threadFactory = threadFactory ?? throw new ArgumentNullException(nameof(threadFactory));
  85. _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();
  86. _gate = new object();
  87. _evt = new SemaphoreSlim(0);
  88. _queue = new SchedulerQueue<TimeSpan>();
  89. _readyList = new Queue<ScheduledItem<TimeSpan>>();
  90. ExitIfEmpty = false;
  91. }
  92. #endregion
  93. #region Properties
  94. /// <summary>
  95. /// Indicates whether the event loop thread is allowed to quit when no work is left. If new work
  96. /// is scheduled afterwards, a new event loop thread is created. This property is used by the
  97. /// NewThreadScheduler which uses an event loop for its recursive invocations.
  98. /// </summary>
  99. internal bool ExitIfEmpty
  100. {
  101. get;
  102. set;
  103. }
  104. #endregion
  105. #region Public methods
  106. /// <summary>
  107. /// Schedules an action to be executed after dueTime.
  108. /// </summary>
  109. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  110. /// <param name="state">State passed to the action to be executed.</param>
  111. /// <param name="action">Action to be executed.</param>
  112. /// <param name="dueTime">Relative time after which to execute the action.</param>
  113. /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  114. /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
  115. /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
  116. public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
  117. {
  118. if (action == null)
  119. {
  120. throw new ArgumentNullException(nameof(action));
  121. }
  122. var due = _stopwatch.Elapsed + dueTime;
  123. var si = new ScheduledItem<TimeSpan, TState>(this, state, action, due);
  124. lock (_gate)
  125. {
  126. if (_disposed)
  127. {
  128. throw new ObjectDisposedException("");
  129. }
  130. if (dueTime <= TimeSpan.Zero)
  131. {
  132. _readyList.Enqueue(si);
  133. _evt.Release();
  134. }
  135. else
  136. {
  137. _queue.Enqueue(si);
  138. _evt.Release();
  139. }
  140. EnsureThread();
  141. }
  142. return si;
  143. }
  144. /// <summary>
  145. /// Schedules a periodic piece of work on the designated thread.
  146. /// </summary>
  147. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  148. /// <param name="state">Initial state passed to the action upon the first iteration.</param>
  149. /// <param name="period">Period for running the work periodically.</param>
  150. /// <param name="action">Action to be executed, potentially updating the state.</param>
  151. /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
  152. /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
  153. /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
  154. /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
  155. public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
  156. {
  157. if (period < TimeSpan.Zero)
  158. {
  159. throw new ArgumentOutOfRangeException(nameof(period));
  160. }
  161. if (action == null)
  162. {
  163. throw new ArgumentNullException(nameof(action));
  164. }
  165. return new PeriodicallyScheduledWorkItem<TState>(this, state, period, action);
  166. }
  167. private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
  168. {
  169. private readonly TimeSpan _period;
  170. private readonly Func<TState, TState> _action;
  171. private readonly EventLoopScheduler _scheduler;
  172. private readonly AsyncLock _gate = new AsyncLock();
  173. private TState _state;
  174. private TimeSpan _next;
  175. private IDisposable _task;
  176. public PeriodicallyScheduledWorkItem(EventLoopScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
  177. {
  178. _state = state;
  179. _period = period;
  180. _action = action;
  181. _scheduler = scheduler;
  182. _next = scheduler._stopwatch.Elapsed + period;
  183. Disposable.TrySetSingle(ref _task, scheduler.Schedule(this, _next - scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));
  184. }
  185. private IDisposable Tick(IScheduler self)
  186. {
  187. _next += _period;
  188. Disposable.TrySetMultiple(ref _task, self.Schedule(this, _next - _scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));
  189. _gate.Wait(
  190. this,
  191. closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state));
  192. return Disposable.Empty;
  193. }
  194. public void Dispose()
  195. {
  196. Disposable.TryDispose(ref _task);
  197. _gate.Dispose();
  198. }
  199. }
  200. /// <summary>
  201. /// Starts a new stopwatch object.
  202. /// </summary>
  203. /// <returns>New stopwatch object; started at the time of the request.</returns>
  204. public override IStopwatch StartStopwatch()
  205. {
  206. //
  207. // Strictly speaking, this explicit override is not necessary because the base implementation calls into
  208. // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
  209. // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
  210. //
  211. return new StopwatchImpl();
  212. }
  213. /// <summary>
  214. /// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned.
  215. /// </summary>
  216. public void Dispose()
  217. {
  218. lock (_gate)
  219. {
  220. if (!_disposed)
  221. {
  222. _disposed = true;
  223. Disposable.TryDispose(ref _nextTimer);
  224. _evt.Release();
  225. }
  226. }
  227. }
  228. #endregion
  229. #region Private implementation
  230. /// <summary>
  231. /// Ensures there is an event loop thread running. Should be called under the gate.
  232. /// </summary>
  233. private void EnsureThread()
  234. {
  235. if (_thread == null)
  236. {
  237. _thread = _threadFactory(Run);
  238. _thread.Start();
  239. }
  240. }
  241. /// <summary>
  242. /// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event
  243. /// which gets set by calls to Schedule, the next item timer, or calls to Dispose.
  244. /// </summary>
  245. private void Run()
  246. {
  247. while (true)
  248. {
  249. _evt.Wait();
  250. var ready = default(ScheduledItem<TimeSpan>[]);
  251. lock (_gate)
  252. {
  253. //
  254. // Bug fix that ensures the number of calls to Release never greatly exceeds the number of calls to Wait.
  255. // See work item #37: https://rx.codeplex.com/workitem/37
  256. //
  257. while (_evt.CurrentCount > 0)
  258. {
  259. _evt.Wait();
  260. }
  261. //
  262. // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the
  263. // loop immediately. Subsequent calls to Schedule won't ever create a new thread.
  264. //
  265. if (_disposed)
  266. {
  267. _evt.Dispose();
  268. return;
  269. }
  270. while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed)
  271. {
  272. var item = _queue.Dequeue();
  273. _readyList.Enqueue(item);
  274. }
  275. if (_queue.Count > 0)
  276. {
  277. var next = _queue.Peek();
  278. if (next != _nextItem)
  279. {
  280. _nextItem = next;
  281. var due = next.DueTime - _stopwatch.Elapsed;
  282. Disposable.TrySetSerial(ref _nextTimer, ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due));
  283. }
  284. }
  285. if (_readyList.Count > 0)
  286. {
  287. ready = _readyList.ToArray();
  288. _readyList.Clear();
  289. }
  290. }
  291. if (ready != null)
  292. {
  293. foreach (var item in ready)
  294. {
  295. if (!item.IsCanceled)
  296. {
  297. item.Invoke();
  298. }
  299. }
  300. }
  301. if (ExitIfEmpty)
  302. {
  303. lock (_gate)
  304. {
  305. if (_readyList.Count == 0 && _queue.Count == 0)
  306. {
  307. _thread = null;
  308. return;
  309. }
  310. }
  311. }
  312. }
  313. }
  314. private void Tick(object state)
  315. {
  316. lock (_gate)
  317. {
  318. if (!_disposed)
  319. {
  320. var item = (ScheduledItem<TimeSpan>)state;
  321. if (item == _nextItem)
  322. {
  323. _nextItem = null;
  324. }
  325. if (_queue.Remove(item))
  326. {
  327. _readyList.Enqueue(item);
  328. }
  329. _evt.Release();
  330. }
  331. }
  332. }
  333. #endregion
  334. }
  335. }