NewThreadScheduler.cs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Disposables;
  3. using System.Threading;
  4. namespace System.Reactive.Concurrency
  5. {
  6. /// <summary>
  7. /// Represents an object that schedules each unit of work on a separate thread.
  8. /// </summary>
  9. public sealed class NewThreadScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
  10. {
  11. internal static readonly Lazy<NewThreadScheduler> s_instance = new Lazy<NewThreadScheduler>(() => new NewThreadScheduler());
  12. private readonly Func<ThreadStart, Thread> _threadFactory;
  13. /// <summary>
  14. /// Creates an object that schedules each unit of work on a separate thread.
  15. /// </summary>
  16. public NewThreadScheduler()
  17. : this(action => new Thread(action))
  18. {
  19. }
  20. /// <summary>
  21. /// Gets an instance of this scheduler that uses the default Thread constructor.
  22. /// </summary>
  23. public static NewThreadScheduler Default
  24. {
  25. get
  26. {
  27. return s_instance.Value;
  28. }
  29. }
  30. #if !NO_THREAD
  31. /// <summary>
  32. /// Creates an object that schedules each unit of work on a separate thread.
  33. /// </summary>
  34. /// <param name="threadFactory">Factory function for thread creation.</param>
  35. /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is null.</exception>
  36. public NewThreadScheduler(Func<ThreadStart, Thread> threadFactory)
  37. {
  38. if (threadFactory == null)
  39. throw new ArgumentNullException("threadFactory");
  40. #else
  41. private NewThreadScheduler(Func<ThreadStart, Thread> threadFactory)
  42. {
  43. #endif
  44. _threadFactory = threadFactory;
  45. }
  46. /// <summary>
  47. /// Schedules an action to be executed after dueTime.
  48. /// </summary>
  49. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  50. /// <param name="state">State passed to the action to be executed.</param>
  51. /// <param name="action">Action to be executed.</param>
  52. /// <param name="dueTime">Relative time after which to execute the action.</param>
  53. /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  54. /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  55. public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
  56. {
  57. if (action == null)
  58. throw new ArgumentNullException("action");
  59. var scheduler = new EventLoopScheduler(_threadFactory);
  60. scheduler.ExitIfEmpty = true;
  61. return scheduler.Schedule(state, dueTime, action);
  62. }
  63. /// <summary>
  64. /// Schedules a long-running task by creating a new thread. Cancellation happens through polling.
  65. /// </summary>
  66. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  67. /// <param name="state">State passed to the action to be executed.</param>
  68. /// <param name="action">Action to be executed.</param>
  69. /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
  70. /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  71. public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
  72. {
  73. if (action == null)
  74. throw new ArgumentNullException("action");
  75. var d = new BooleanDisposable();
  76. var thread = _threadFactory(() =>
  77. {
  78. //
  79. // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
  80. // requires us to ensure the scheduled work gets an opportunity to observe
  81. // the cancellation request.
  82. //
  83. action(state, d);
  84. });
  85. thread.Start();
  86. return d;
  87. }
  88. /// <summary>
  89. /// Schedules a periodic piece of work by creating a new thread that goes to sleep when work has been dispatched and wakes up again at the next periodic due time.
  90. /// </summary>
  91. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  92. /// <param name="state">Initial state passed to the action upon the first iteration.</param>
  93. /// <param name="period">Period for running the work periodically.</param>
  94. /// <param name="action">Action to be executed, potentially updating the state.</param>
  95. /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
  96. /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
  97. /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
  98. public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
  99. {
  100. if (period < TimeSpan.Zero)
  101. throw new ArgumentOutOfRangeException("period");
  102. if (action == null)
  103. throw new ArgumentNullException("action");
  104. var periodic = new Periodic<TState>(state, period, action);
  105. var thread = _threadFactory(periodic.Run);
  106. thread.Start();
  107. return periodic;
  108. }
  109. class Periodic<TState> : IDisposable
  110. {
  111. private readonly IStopwatch _stopwatch;
  112. private readonly TimeSpan _period;
  113. private readonly Func<TState, TState> _action;
  114. private readonly object _cancel = new object();
  115. private volatile bool _done;
  116. private TState _state;
  117. private TimeSpan _next;
  118. public Periodic(TState state, TimeSpan period, Func<TState, TState> action)
  119. {
  120. _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();
  121. _period = period;
  122. _action = action;
  123. _state = state;
  124. _next = period;
  125. }
  126. public void Run()
  127. {
  128. while (!_done)
  129. {
  130. var timeout = Scheduler.Normalize(_next - _stopwatch.Elapsed);
  131. lock (_cancel)
  132. {
  133. if (Monitor.Wait(_cancel, timeout))
  134. return;
  135. }
  136. _state = _action(_state);
  137. _next += _period;
  138. }
  139. }
  140. public void Dispose()
  141. {
  142. _done = true;
  143. lock (_cancel)
  144. {
  145. Monitor.Pulse(_cancel);
  146. }
  147. }
  148. }
  149. #if !NO_STOPWATCH
  150. /// <summary>
  151. /// Starts a new stopwatch object.
  152. /// </summary>
  153. /// <returns>New stopwatch object; started at the time of the request.</returns>
  154. public override IStopwatch StartStopwatch()
  155. {
  156. //
  157. // Strictly speaking, this explicit override is not necessary because the base implementation calls into
  158. // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
  159. // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
  160. //
  161. return new StopwatchImpl();
  162. }
  163. #endif
  164. }
  165. }