// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. namespace System.Reactive.Concurrency
  7. {
  8. /// <summary>
  9. /// Represents an object that schedules each unit of work on a separate thread.
  10. /// </summary>
  11. public sealed class NewThreadScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
  12. {
  // Lazily constructed shared instance; its Value is what the Default property returns.
  internal static readonly Lazy<NewThreadScheduler> s_instance =
      new Lazy<NewThreadScheduler>(() => new NewThreadScheduler());

  // Factory invoked to create the thread on which scheduled work runs.
  private readonly Func<ThreadStart, Thread> _threadFactory;
  /// <summary>
  /// Initializes a scheduler that runs each unit of work on its own thread,
  /// creating those threads with the plain <see cref="Thread"/> constructor.
  /// </summary>
  public NewThreadScheduler()
      : this(start => new Thread(start))
  {
  }
  /// <summary>
  /// Gets the shared scheduler instance, which creates its threads with the default Thread constructor.
  /// </summary>
  public static NewThreadScheduler Default => s_instance.Value;
  #if !NO_THREAD
  /// <summary>
  /// Initializes a scheduler that runs each unit of work on a separate thread obtained from the given factory.
  /// </summary>
  /// <param name="threadFactory">Function that creates the thread for a given thread entry point.</param>
  /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception>
  public NewThreadScheduler(Func<ThreadStart, Thread> threadFactory)
  #else
  private NewThreadScheduler(Func<ThreadStart, Thread> threadFactory)
  #endif
  {
  #if !NO_THREAD
      // The null guard only exists when this constructor is public. In the NO_THREAD
      // configuration it is private, and the only caller visible in this file is the
      // parameterless constructor, which passes a non-null lambda.
      if (threadFactory == null)
      {
          throw new ArgumentNullException(nameof(threadFactory));
      }
  #endif

      _threadFactory = threadFactory;
  }
  /// <summary>
  /// Schedules an action to run once the given relative due time has elapsed.
  /// </summary>
  /// <typeparam name="TState">The type of the state handed to the scheduled action.</typeparam>
  /// <param name="state">State handed to the action when it runs.</param>
  /// <param name="action">The action to run.</param>
  /// <param name="dueTime">Relative time after which the action should run.</param>
  /// <returns>A disposable that cancels the scheduled action (best effort).</returns>
  /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
  {
      if (action == null)
      {
          throw new ArgumentNullException(nameof(action));
      }

      // Every call gets its own event loop built on the configured thread factory.
      // ExitIfEmpty is switched on before any work is handed to the loop.
      var eventLoop = new EventLoopScheduler(_threadFactory)
      {
          ExitIfEmpty = true
      };

      return eventLoop.Schedule(state, dueTime, action);
  }
  /// <summary>
  /// Starts a long-running piece of work on a newly created thread. The work is expected to poll for cancellation.
  /// </summary>
  /// <typeparam name="TState">The type of the state handed to the scheduled action.</typeparam>
  /// <param name="state">State handed to the action when it runs.</param>
  /// <param name="action">The action to run; it receives the state and a cancelable it can poll.</param>
  /// <returns>A disposable that requests cancellation of the scheduled action (best effort).</returns>
  /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
  {
      if (action == null)
      {
          throw new ArgumentNullException(nameof(action));
      }

      var cancel = new BooleanDisposable();

      //
      // The thread body deliberately does not test cancel.IsDisposed before calling
      // the action. The ISchedulerLongRunning contract requires that the scheduled
      // work itself gets the chance to observe the cancellation request.
      //
      ThreadStart body = () => action(state, cancel);

      _threadFactory(body).Start();

      return cancel;
  }
  /// <summary>
  /// Schedules recurring work on a newly created thread. The thread sleeps between iterations and wakes up at each periodic due time.
  /// </summary>
  /// <typeparam name="TState">The type of the state threaded through the iterations.</typeparam>
  /// <param name="state">State handed to the action on the first iteration.</param>
  /// <param name="period">Interval between iterations.</param>
  /// <param name="action">The action to run on every iteration; its return value becomes the state for the next one.</param>
  /// <returns>A disposable that cancels the recurring work (best effort).</returns>
  /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
  public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
  {
      // The period is validated before the action, so a call with both arguments
      // invalid reports the out-of-range period.
      if (period < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(period));
      }

      if (action == null)
      {
          throw new ArgumentNullException(nameof(action));
      }

      var worker = new Periodic<TState>(state, period, action);

      _threadFactory(worker.Run).Start();

      return worker;
  }
  /// <summary>
  /// Drives a recurring action: <see cref="Run"/> loops on the calling thread, invoking the
  /// action once per period, until <see cref="Dispose"/> is called.
  /// </summary>
  /// <typeparam name="TState">The type of the state threaded through the iterations.</typeparam>
  private sealed class Periodic<TState> : IDisposable
  {
      private readonly IStopwatch _stopwatch;
      private readonly TimeSpan _period;
      private readonly Func<TState, TState> _action;

      // Monitor used to sleep between iterations and to wake the loop on disposal.
      private readonly object _cancel = new object();

      // Set by Dispose; volatile because it is written and read on different threads.
      private volatile bool _done;

      private TState _state;

      // Due time of the next iteration, measured on _stopwatch.
      private TimeSpan _next;

      /// <summary>
      /// Captures the work to run and starts the stopwatch used to measure due times.
      /// </summary>
      /// <param name="state">State handed to the action on the first iteration.</param>
      /// <param name="period">Interval between iterations; also the due time of the first one.</param>
      /// <param name="action">The action to run on every iteration; its result becomes the next state.</param>
      public Periodic(TState state, TimeSpan period, Func<TState, TState> action)
      {
          _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();

          _period = period;
          _action = action;

          _state = state;
          _next = period;
      }

      /// <summary>
      /// Thread entry point: waits until each due time, then invokes the action, until disposed.
      /// </summary>
      public void Run()
      {
          while (!_done)
          {
              // Time left until the next due time, passed through Scheduler.Normalize
              // (presumably clamping negative values to zero - confirm against its definition).
              var timeout = Scheduler.Normalize(_next - _stopwatch.Elapsed);

              lock (_cancel)
              {
                  // Re-check under the lock: Dispose may have set the flag and pulsed after
                  // the loop condition was evaluated but before we started waiting. That
                  // pulse would be lost and the action would run once more after disposal.
                  if (_done)
                  {
                      return;
                  }

                  // A pulse can only come from Dispose, so a signaled wait means "stop".
                  if (Monitor.Wait(_cancel, timeout))
                  {
                      return;
                  }

                  // Wait reports a timeout even if Dispose pulsed right after the timeout
                  // elapsed, so check the flag once more before running the action.
                  if (_done)
                  {
                      return;
                  }
              }

              _state = _action(_state);
              _next += _period;
          }
      }

      /// <summary>
      /// Stops the periodic loop (best effort): an iteration that is already running completes,
      /// but no further iteration is started after the loop observes the request.
      /// </summary>
      public void Dispose()
      {
          _done = true;

          lock (_cancel)
          {
              // Wake the loop if it is currently sleeping until the next due time.
              Monitor.Pulse(_cancel);
          }
      }
  }
  #if !NO_STOPWATCH
  /// <summary>
  /// Creates and starts a new stopwatch.
  /// </summary>
  /// <returns>A new stopwatch object, started at the time of the request.</returns>
  /// <remarks>
  /// This override is not strictly required. The base implementation asks the enlightenment
  /// module for the CAL, which would lead back to System.Reactive.PlatformServices, where this
  /// code already runs. Overriding here simply short-circuits that extra roundtrip.
  /// </remarks>
  public override IStopwatch StartStopwatch() => new StopwatchImpl();
  #endif
  166. }
  167. }