TaskPoolScheduler.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_TPL
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Reactive.Concurrency
  9. {
  /// <summary>
  /// Represents an object that schedules units of work on the Task Parallel Library (TPL) task pool.
  /// </summary>
  /// <seealso cref="TaskPoolScheduler.Default">Instance of this type using the default TaskScheduler to schedule work on the TPL task pool.</seealso>
  public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
  {
      // Lazily created shared instance backing the Default property.
      private static readonly Lazy<TaskPoolScheduler> s_instance = new Lazy<TaskPoolScheduler>(() => new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)));

      // Factory used to start every task created by this scheduler.
      private readonly TaskFactory taskFactory;

      /// <summary>
      /// Creates an object that schedules units of work using the provided TaskFactory.
      /// </summary>
      /// <param name="taskFactory">Task factory used to create tasks to run units of work.</param>
      /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception>
      public TaskPoolScheduler(TaskFactory taskFactory)
      {
          if (taskFactory == null)
              throw new ArgumentNullException(nameof(taskFactory));

          this.taskFactory = taskFactory;
      }

      /// <summary>
      /// Gets an instance of this scheduler that uses the default TaskScheduler.
      /// </summary>
      public static TaskPoolScheduler Default => s_instance.Value;

      /// <summary>
      /// Schedules an action to be executed.
      /// </summary>
      /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
      /// <param name="state">State passed to the action to be executed.</param>
      /// <param name="action">Action to be executed.</param>
      /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
      public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
      {
          if (action == null)
              throw new ArgumentNullException(nameof(action));

          //
          // The returned disposable first holds the cancellation handle whose token is
          // passed to StartNew; once the action runs, it is re-pointed at the disposable
          // returned by the action itself.
          //
          var d = new SerialDisposable();
          var cancelable = new CancellationDisposable();
          d.Disposable = cancelable;

          taskFactory.StartNew(() =>
          {
              //
              // BREAKING CHANGE v2.0 > v1.x - No longer escalating exceptions using a throwing
              //                               helper thread.
              //
              // Our manual escalation based on the creation of a throwing thread was merely to
              // expedite the process of throwing the exception that would otherwise occur on the
              // finalizer thread at a later point during the app's lifetime.
              //
              // However, it also prevented applications from observing the exception through
              // the TaskScheduler.UnobservedTaskException static event. Also, starting from .NET
              // 4.5, the default behavior of the task pool is not to take down the application
              // when an exception goes unobserved (done as part of the async/await work). It'd
              // be weird for Rx not to follow the platform defaults.
              //
              // General implementation guidelines for schedulers (in order of importance):
              //
              //    1. Always thunk through to the underlying infrastructure with a wrapper that's as tiny as possible.
              //    2. Global exception notification/handling mechanisms shouldn't be bypassed.
              //    3. Escalation behavior for exceptions is left to the underlying infrastructure.
              //
              // The Catch extension method for IScheduler (added earlier) allows to re-route
              // exceptions at stage 2. If the exception isn't handled at the Rx level, it
              // propagates by means of a rethrow, falling back to behavior in 3.
              //
              d.Disposable = action(this, state);
          }, cancelable.Token);

          return d;
      }

      /// <summary>
      /// Schedules an action to be executed after dueTime.
      /// </summary>
      /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
      /// <param name="state">State passed to the action to be executed.</param>
      /// <param name="action">Action to be executed.</param>
      /// <param name="dueTime">Relative time after which to execute the action.</param>
      /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
      public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          if (action == null)
              throw new ArgumentNullException(nameof(action));

          // A normalized due time of zero is treated as an immediate schedule request.
          var dt = Scheduler.Normalize(dueTime);
          if (dt.Ticks == 0)
              return Schedule(state, action);

#if !NO_TASK_DELAY
          var d = new MultipleAssignmentDisposable();
          var ct = new CancellationDisposable();
          d.Disposable = ct;

          //
          // The continuation only runs when the delay ran to completion (i.e. was not
          // cancelled through ct), and re-checks d before invoking the action. The
          // normalized due time is used here, matching the fallback branch below.
          //
          TaskHelpers.Delay(dt, ct.Token).ContinueWith(_ =>
          {
              if (!d.IsDisposed)
                  d.Disposable = action(this, state);
          }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler);

          return d;
#else
          return DefaultScheduler.Instance.Schedule(state, dt, (_, state1) => Schedule(state1, action));
#endif
      }

      /// <summary>
      /// Schedules a long-running task by creating a new task using TaskCreationOptions.LongRunning. Cancellation happens through polling.
      /// </summary>
      /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
      /// <param name="state">State passed to the action to be executed.</param>
      /// <param name="action">Action to be executed.</param>
      /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
      public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
      {
          // Fail at the call site, as documented, rather than inside the started task.
          if (action == null)
              throw new ArgumentNullException(nameof(action));

          var d = new BooleanDisposable();

          taskFactory.StartNew(() =>
          {
              //
              // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
              // requires us to ensure the scheduled work gets an opportunity to observe
              // the cancellation request.
              //
              action(state, d);
          }, TaskCreationOptions.LongRunning);

          return d;
      }

#if !NO_STOPWATCH
      /// <summary>
      /// Gets a new stopwatch object.
      /// </summary>
      /// <returns>New stopwatch object; started at the time of the request.</returns>
      public override IStopwatch StartStopwatch()
      {
          //
          // Strictly speaking, this explicit override is not necessary because the base implementation calls into
          // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
          // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
          //
          return new StopwatchImpl();
      }
#endif

      /// <summary>
      /// Schedules a periodic piece of work by running a platform-specific timer to create tasks periodically.
      /// </summary>
      /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
      /// <param name="state">Initial state passed to the action upon the first iteration.</param>
      /// <param name="period">Period for running the work periodically.</param>
      /// <param name="action">Action to be executed, potentially updating the state.</param>
      /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than TimeSpan.Zero.</exception>
      public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
      {
          if (period < TimeSpan.Zero)
              throw new ArgumentOutOfRangeException(nameof(period));
          if (action == null)
              throw new ArgumentNullException(nameof(action));

#if !NO_TASK_DELAY
          var cancel = new CancellationDisposable();

          var state1 = state;
          var gate = new AsyncLock();

          //
          // NOTE(review): a zero period passes the guard above. If TaskHelpers.Delay hands
          // back an already-completed task for TimeSpan.Zero, the synchronous continuation
          // below may re-enter moveNext without yielding - confirm how period == 0 is used.
          //
          var moveNext = default(Action);
          moveNext = () =>
          {
              TaskHelpers.Delay(period, cancel.Token).ContinueWith(
                  _ =>
                  {
                      //
                      // Re-arm the next delay before running the current iteration, so the
                      // time spent in the action is not added to the period. The gate is
                      // presumably what keeps iterations from overlapping on state1.
                      //
                      moveNext();

                      gate.Wait(() =>
                      {
                          state1 = action(state1);
                      });
                  },
                  CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler
              );
          };

          moveNext();

          return StableCompositeDisposable.Create(cancel, gate);
#else
          var state1 = state;
          var gate = new AsyncLock();

          // Each timer tick starts a task; the gate is used around the state update.
          var timer = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(() =>
          {
              taskFactory.StartNew(() =>
              {
                  gate.Wait(() =>
                  {
                      state1 = action(state1);
                  });
              });
          }, period);

          return StableCompositeDisposable.Create(timer, gate);
#endif
      }
  }
  205. }
  206. #endif