1
0

ThreadPoolScheduler.cs 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !LEGACY_WINRT && !NO_THREAD
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive.Concurrency
  8. {
  9. /// <summary>
  10. /// Represents an object that schedules units of work on the CLR thread pool.
  11. /// </summary>
  12. /// <seealso cref="Instance">Singleton instance of this type exposed through this static property.</seealso>
  13. public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
  14. {
  // Lazily created singleton that backs the Instance property.
  private static readonly Lazy<ThreadPoolScheduler> LazyInstance = new(static () => new ThreadPoolScheduler());

  // Lazily created scheduler that starts background threads; ScheduleLongRunning delegates to it.
  private static readonly Lazy<NewThreadScheduler> LazyNewBackgroundThread = new(
      static () => new NewThreadScheduler(static start => new Thread(start) { IsBackground = true }));

  /// <summary>
  /// Gets the singleton instance of the CLR thread pool scheduler.
  /// </summary>
  public static ThreadPoolScheduler Instance
  {
      get { return LazyInstance.Value; }
  }

  // Private: within this class the only instance created is the one exposed through Instance.
  private ThreadPoolScheduler()
  {
  }
  /// <summary>
  /// Queues an action on the CLR thread pool for execution.
  /// </summary>
  /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  /// <param name="state">State passed to the action to be executed.</param>
  /// <param name="action">Action to be executed.</param>
  /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
  public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
  {
      if (action is null)
      {
          throw new ArgumentNullException(nameof(action));
      }

      // The work item is both the thread pool payload and the handle returned to the caller.
      var item = new UserWorkItem<TState>(this, state, action);

      ThreadPool.QueueUserWorkItem(static boxed => ((UserWorkItem<TState>)boxed).Run(), item);

      return item;
  }
  /// <summary>
  /// Schedules an action to be executed after <paramref name="dueTime"/>, using a <see cref="Timer"/>.
  /// </summary>
  /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  /// <param name="state">State passed to the action to be executed.</param>
  /// <param name="action">Action to be executed.</param>
  /// <param name="dueTime">Relative time after which to execute the action.</param>
  /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
  public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
  {
      if (action is null)
      {
          throw new ArgumentNullException(nameof(action));
      }

      // A due time that normalizes to zero takes the immediate path and needs no timer.
      var delay = Scheduler.Normalize(dueTime);
      if (delay == TimeSpan.Zero)
      {
          return Schedule(state, action);
      }

      var item = new UserWorkItem<TState>(this, state, action);

      // One-shot timer: it fires once after the delay and never repeats (infinite period).
      var timer = new Timer(
          static boxed => ((UserWorkItem<TState>)boxed).Run(),
          item,
          delay,
          Timeout.InfiniteTimeSpan);

      // The timer is attached to the work item, which is the handle returned to the caller.
      item.CancelQueueDisposable = timer;

      return item;
  }
  /// <summary>
  /// Schedules a long-running task by creating a new thread. Cancellation happens through polling.
  /// </summary>
  /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  /// <param name="state">State passed to the action to be executed.</param>
  /// <param name="action">Action to be executed.</param>
  /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
  public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
  {
      if (action is null)
      {
          throw new ArgumentNullException(nameof(action));
      }

      // Long-running work is handed to the shared background-thread scheduler rather than the thread pool.
      var backgroundThreadScheduler = LazyNewBackgroundThread.Value;

      return backgroundThreadScheduler.ScheduleLongRunning(state, action);
  }
  /// <summary>
  /// Starts a new stopwatch object.
  /// </summary>
  /// <returns>New stopwatch object; started at the time of the request.</returns>
  public override IStopwatch StartStopwatch() =>
      //
      // This override is not strictly required: the base implementation asks the enlightenment module
      // for the CAL, which would lead back to System.Reactive.PlatformServices, where this code already
      // runs. Creating the stopwatch directly simply skips that roundtrip.
      //
      new StopwatchImpl();
  /// <summary>
  /// Schedules a periodic piece of work, using a <see cref="Timer"/> for non-zero periods.
  /// </summary>
  /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  /// <param name="state">Initial state passed to the action upon the first iteration.</param>
  /// <param name="period">Period for running the work periodically.</param>
  /// <param name="action">Action to be executed, potentially updating the state.</param>
  /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
  /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than zero.</exception>
  public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
  {
      // The period is validated before the action, so a negative period is reported first.
      if (period < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(period));
      }

      if (action is null)
      {
          throw new ArgumentNullException(nameof(action));
      }

      // The period is known to be non-negative here, so "greater than zero" means "not zero".
      if (period > TimeSpan.Zero)
      {
          return new PeriodicTimer<TState>(state, period, action);
      }

      // A zero period uses the timer that re-queues itself on the thread pool.
      return new FastPeriodicTimer<TState>(state, action);
  }
  /// <summary>
  /// Zero-period timer: each tick runs the action and then queues the next tick on the thread pool,
  /// until the timer is disposed.
  /// </summary>
  private sealed class FastPeriodicTimer<TState> : IDisposable
  {
      private TState _state;
      private Func<TState, TState> _action;
      private volatile bool _disposed;

      public FastPeriodicTimer(TState state, Func<TState, TState> action)
      {
          _state = state;
          _action = action;

          // Kick off the first tick. Switch to a method group once Roslyn caches the delegate for it.
          ThreadPool.QueueUserWorkItem(static self => Tick(self), this);
      }

      // Runs one iteration and re-queues itself; stops the chain once the timer has been disposed.
      private static void Tick(object state)
      {
          var self = (FastPeriodicTimer<TState>)state;

          if (self._disposed)
          {
              return;
          }

          self._state = self._action(self._state);
          ThreadPool.QueueUserWorkItem(static next => Tick(next), self);
      }

      /// <summary>
      /// Stops further ticks from being queued (best effort).
      /// </summary>
      public void Dispose()
      {
          _disposed = true;

          // NOTE(review): Stubs<TState>.I is presumably an identity function, so a tick that already
          // passed the disposed check would not call the user's action - confirm against Stubs.
          _action = Stubs<TState>.I;
      }
  }
  /// <summary>
  /// Periodic timer backed by a <see cref="Timer"/> that invokes the action once per period and
  /// feeds each result back in as the state for the next invocation.
  /// </summary>
  private sealed class PeriodicTimer<TState> : IDisposable
  {
      private TState _state;
      private Func<TState, TState> _action;
      private readonly AsyncLock _gate;
      private volatile Timer _timer;

      public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action)
      {
          _state = state;
          _action = action;

          _gate = new AsyncLock();

          //
          // The timer callback is a static lambda, so there is no delegate target to root anything.
          // Instead, this instance is passed to the timer as its callback state object and in turn
          // stores the Timer in a field, which ties the two objects' lifetimes together.
          //
          _timer = new Timer(static @this => ((PeriodicTimer<TState>)@this).Tick(), this, period, period);
      }

      // Timer callback: runs one iteration of the action through the gate.
      // NOTE(review): AsyncLock presumably serializes overlapping ticks - confirm against AsyncLock.
      private void Tick()
      {
          _gate.Wait(
              this,
              static @this =>
              {
                  // Static lambda: the instance arrives as the state argument, so nothing is captured.
                  @this._state = @this._action(@this._state);
              });
      }

      /// <summary>
      /// Stops the timer and disposes the gate (best effort). Later calls that observe the cleared
      /// timer field do nothing.
      /// </summary>
      public void Dispose()
      {
          var timer = _timer;
          if (timer != null)
          {
              // NOTE(review): Stubs<TState>.I is presumably an identity function, so a tick that is
              // still pending would not call the user's action - confirm against Stubs.
              _action = Stubs<TState>.I;
              _timer = null;

              timer.Dispose();
              _gate.Dispose();
          }
      }
  }
  191. }
  192. }
  193. #endif