EventLoopScheduler.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. #if NO_SEMAPHORE
  8. using System.Reactive.Threading;
  9. #endif
  10. namespace System.Reactive.Concurrency
  11. {
  12. /// <summary>
  13. /// Represents an object that schedules units of work on a designated thread.
  14. /// </summary>
  15. public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDisposable
  16. {
  17. #region Fields
  18. /// <summary>
  19. /// Counter for diagnostic purposes, to name the threads.
  20. /// </summary>
  21. private static int s_counter;
  22. /// <summary>
  23. /// Thread factory function.
  24. /// </summary>
  25. private readonly Func<ThreadStart, Thread> _threadFactory;
  26. /// <summary>
  27. /// Stopwatch for timing free of absolute time dependencies.
  28. /// </summary>
  29. private IStopwatch _stopwatch;
  30. /// <summary>
  31. /// Thread used by the event loop to run work items on. No work should be run on any other thread.
  32. /// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled.
  33. /// </summary>
  34. private Thread _thread;
  35. /// <summary>
  36. /// Gate to protect data structures, including the work queue and the ready list.
  37. /// </summary>
  38. private readonly object _gate;
  39. /// <summary>
  40. /// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer
  41. /// expires and moves on to the next item in the queue.
  42. /// </summary>
  43. #if !NO_CDS
  44. private readonly SemaphoreSlim _evt;
  45. #else
  46. private readonly Semaphore _evt;
  47. #endif
  48. /// <summary>
  49. /// Queue holding work items. Protected by the gate.
  50. /// </summary>
  51. private readonly SchedulerQueue<TimeSpan> _queue;
  52. /// <summary>
  53. /// Queue holding items that are ready to be run as soon as possible. Protected by the gate.
  54. /// </summary>
  55. private readonly Queue<ScheduledItem<TimeSpan>> _readyList;
  56. /// <summary>
  57. /// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next
  58. /// item is still the same. If not, a new timer needs to be started (see below).
  59. /// </summary>
  60. private ScheduledItem<TimeSpan> _nextItem;
  61. /// <summary>
  62. /// Disposable that always holds the timer to dispatch the first element in the queue.
  63. /// </summary>
  64. private readonly SerialDisposable _nextTimer;
  65. /// <summary>
  66. /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
  67. /// wake up the event loop thread, which will subsequently abandon all work.
  68. /// </summary>
  69. private bool _disposed;
  70. #endregion
  71. #region Constructors
  72. /// <summary>
  73. /// Creates an object that schedules units of work on a designated thread.
  74. /// </summary>
  75. public EventLoopScheduler()
  76. : this(a => new Thread(a) { Name = "Event Loop " + Interlocked.Increment(ref s_counter), IsBackground = true })
  77. {
  78. }
  79. #if !NO_THREAD
  80. /// <summary>
  81. /// Creates an object that schedules units of work on a designated thread, using the specified factory to control thread creation options.
  82. /// </summary>
  83. /// <param name="threadFactory">Factory function for thread creation.</param>
  84. /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception>
  85. public EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
  86. {
  87. if (threadFactory == null)
  88. throw new ArgumentNullException(nameof(threadFactory));
  89. #else
  90. internal EventLoopScheduler(Func<ThreadStart, Thread> threadFactory)
  91. {
  92. #endif
  93. _threadFactory = threadFactory;
  94. _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();
  95. _gate = new object();
  96. #if !NO_CDS
  97. _evt = new SemaphoreSlim(0);
  98. #else
  99. _evt = new Semaphore(0, int.MaxValue);
  100. #endif
  101. _queue = new SchedulerQueue<TimeSpan>();
  102. _readyList = new Queue<ScheduledItem<TimeSpan>>();
  103. _nextTimer = new SerialDisposable();
  104. ExitIfEmpty = false;
  105. }
  106. #endregion
  107. #region Properties
  108. /// <summary>
  109. /// Indicates whether the event loop thread is allowed to quit when no work is left. If new work
  110. /// is scheduled afterwards, a new event loop thread is created. This property is used by the
  111. /// NewThreadScheduler which uses an event loop for its recursive invocations.
  112. /// </summary>
  113. internal bool ExitIfEmpty
  114. {
  115. get;
  116. set;
  117. }
  118. #endregion
  119. #region Public methods
  120. /// <summary>
  121. /// Schedules an action to be executed after dueTime.
  122. /// </summary>
  123. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  124. /// <param name="state">State passed to the action to be executed.</param>
  125. /// <param name="action">Action to be executed.</param>
  126. /// <param name="dueTime">Relative time after which to execute the action.</param>
  127. /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  128. /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  129. /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
  130. public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
  131. {
  132. if (action == null)
  133. throw new ArgumentNullException(nameof(action));
  134. var due = _stopwatch.Elapsed + dueTime;
  135. var si = new ScheduledItem<TimeSpan, TState>(this, state, action, due);
  136. lock (_gate)
  137. {
  138. if (_disposed)
  139. throw new ObjectDisposedException("");
  140. if (dueTime <= TimeSpan.Zero)
  141. {
  142. _readyList.Enqueue(si);
  143. _evt.Release();
  144. }
  145. else
  146. {
  147. _queue.Enqueue(si);
  148. _evt.Release();
  149. }
  150. EnsureThread();
  151. }
  152. return Disposable.Create(si.Cancel);
  153. }
  154. /// <summary>
  155. /// Schedules a periodic piece of work on the designated thread.
  156. /// </summary>
  157. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  158. /// <param name="state">Initial state passed to the action upon the first iteration.</param>
  159. /// <param name="period">Period for running the work periodically.</param>
  160. /// <param name="action">Action to be executed, potentially updating the state.</param>
  161. /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
  162. /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  163. /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
  164. /// <exception cref="ObjectDisposedException">The scheduler has been disposed and doesn't accept new work.</exception>
  165. public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
  166. {
  167. if (period < TimeSpan.Zero)
  168. throw new ArgumentOutOfRangeException(nameof(period));
  169. if (action == null)
  170. throw new ArgumentNullException(nameof(action));
  171. var start = _stopwatch.Elapsed;
  172. var next = start + period;
  173. var state1 = state;
  174. var d = new MultipleAssignmentDisposable();
  175. var gate = new AsyncLock();
  176. var tick = default(Func<IScheduler, object, IDisposable>);
  177. tick = (self_, _) =>
  178. {
  179. next += period;
  180. d.Disposable = self_.Schedule(null, next - _stopwatch.Elapsed, tick);
  181. gate.Wait(() =>
  182. {
  183. state1 = action(state1);
  184. });
  185. return Disposable.Empty;
  186. };
  187. d.Disposable = Schedule(null, next - _stopwatch.Elapsed, tick);
  188. return StableCompositeDisposable.Create(d, gate);
  189. }
  190. #if !NO_STOPWATCH
  191. /// <summary>
  192. /// Starts a new stopwatch object.
  193. /// </summary>
  194. /// <returns>New stopwatch object; started at the time of the request.</returns>
  195. public override IStopwatch StartStopwatch()
  196. {
  197. //
  198. // Strictly speaking, this explicit override is not necessary because the base implementation calls into
  199. // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
  200. // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
  201. //
  202. return new StopwatchImpl();
  203. }
  204. #endif
  205. /// <summary>
  206. /// Ends the thread associated with this scheduler. All remaining work in the scheduler queue is abandoned.
  207. /// </summary>
  208. public void Dispose()
  209. {
  210. lock (_gate)
  211. {
  212. if (!_disposed)
  213. {
  214. _disposed = true;
  215. _nextTimer.Dispose();
  216. _evt.Release();
  217. }
  218. }
  219. }
  220. #endregion
  221. #region Private implementation
  222. /// <summary>
  223. /// Ensures there is an event loop thread running. Should be called under the gate.
  224. /// </summary>
  225. private void EnsureThread()
  226. {
  227. if (_thread == null)
  228. {
  229. _thread = _threadFactory(Run);
  230. _thread.Start();
  231. }
  232. }
  233. /// <summary>
  234. /// Event loop scheduled on the designated event loop thread. The loop is suspended/resumed using the event
  235. /// which gets set by calls to Schedule, the next item timer, or calls to Dispose.
  236. /// </summary>
  237. private void Run()
  238. {
  239. while (true)
  240. {
  241. #if !NO_CDS
  242. _evt.Wait();
  243. #else
  244. _evt.WaitOne();
  245. #endif
  246. var ready = default(ScheduledItem<TimeSpan>[]);
  247. lock (_gate)
  248. {
  249. //
  250. // Bug fix that ensures the number of calls to Release never greatly exceeds the number of calls to Wait.
  251. // See work item #37: https://rx.codeplex.com/workitem/37
  252. //
  253. #if !NO_CDS
  254. while (_evt.CurrentCount > 0) _evt.Wait();
  255. #else
  256. while (_evt.WaitOne(TimeSpan.Zero)) { }
  257. #endif
  258. //
  259. // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the
  260. // loop immediately. Subsequent calls to Schedule won't ever create a new thread.
  261. //
  262. if (_disposed)
  263. {
  264. ((IDisposable)_evt).Dispose();
  265. return;
  266. }
  267. while (_queue.Count > 0 && _queue.Peek().DueTime <= _stopwatch.Elapsed)
  268. {
  269. var item = _queue.Dequeue();
  270. _readyList.Enqueue(item);
  271. }
  272. if (_queue.Count > 0)
  273. {
  274. var next = _queue.Peek();
  275. if (next != _nextItem)
  276. {
  277. _nextItem = next;
  278. var due = next.DueTime - _stopwatch.Elapsed;
  279. _nextTimer.Disposable = ConcurrencyAbstractionLayer.Current.StartTimer(Tick, next, due);
  280. }
  281. }
  282. if (_readyList.Count > 0)
  283. {
  284. ready = _readyList.ToArray();
  285. _readyList.Clear();
  286. }
  287. }
  288. if (ready != null)
  289. {
  290. foreach (var item in ready)
  291. {
  292. if (!item.IsCanceled)
  293. item.Invoke();
  294. }
  295. }
  296. if (ExitIfEmpty)
  297. {
  298. lock (_gate)
  299. {
  300. if (_readyList.Count == 0 && _queue.Count == 0)
  301. {
  302. _thread = null;
  303. return;
  304. }
  305. }
  306. }
  307. }
  308. }
  309. private void Tick(object state)
  310. {
  311. lock (_gate)
  312. {
  313. if (!_disposed)
  314. {
  315. var item = (ScheduledItem<TimeSpan>)state;
  316. if (_queue.Remove(item))
  317. {
  318. _readyList.Enqueue(item);
  319. }
  320. _evt.Release();
  321. }
  322. }
  323. }
  324. #endregion
  325. }
  326. }