Scheduler.Async.cs 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Concurrency
  8. {
  9. public static partial class Scheduler
  10. {
  11. private sealed class AsyncInvocation<TState> : IDisposable
  12. {
  13. private readonly CancellationTokenSource _cts = new CancellationTokenSource();
  14. private IDisposable _run;
  15. public IDisposable Run(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
  16. {
  17. if (_cts.IsCancellationRequested)
  18. return Disposable.Empty;
  19. action(new CancelableScheduler(self, _cts.Token), s, _cts.Token).ContinueWith(
  20. (t, @thisObject) =>
  21. {
  22. if (!t.IsCanceled)
  23. {
  24. var @this = (AsyncInvocation<TState>)@thisObject;
  25. t.Exception?.Handle(e => e is OperationCanceledException);
  26. Disposable.SetSingle(ref @this._run, t.Result);
  27. }
  28. },
  29. this,
  30. TaskContinuationOptions.ExecuteSynchronously);
  31. return this;
  32. }
  33. public void Dispose()
  34. {
  35. _cts.Cancel();
  36. Disposable.TryDispose(ref _run);
  37. }
  38. }
  39. /// <summary>
  40. /// Yields execution of the current work item on the scheduler to another work item on the scheduler.
  41. /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation).
  42. /// </summary>
  43. /// <param name="scheduler">Scheduler to yield work on.</param>
  44. /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
  45. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  46. public static SchedulerOperation Yield(this IScheduler scheduler)
  47. {
  48. if (scheduler == null)
  49. {
  50. throw new ArgumentNullException(nameof(scheduler));
  51. }
  52. return new SchedulerOperation(a => scheduler.Schedule(a), scheduler.GetCancellationToken());
  53. }
  54. /// <summary>
  55. /// Yields execution of the current work item on the scheduler to another work item on the scheduler.
  56. /// The caller should await the result of calling Yield to schedule the remainder of the current work item (known as the continuation).
  57. /// </summary>
  58. /// <param name="scheduler">Scheduler to yield work on.</param>
  59. /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param>
  60. /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
  61. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  62. public static SchedulerOperation Yield(this IScheduler scheduler, CancellationToken cancellationToken)
  63. {
  64. if (scheduler == null)
  65. {
  66. throw new ArgumentNullException(nameof(scheduler));
  67. }
  68. return new SchedulerOperation(a => scheduler.Schedule(a), cancellationToken);
  69. }
  70. /// <summary>
  71. /// Suspends execution of the current work item on the scheduler for the specified duration.
  72. /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration.
  73. /// </summary>
  74. /// <param name="scheduler">Scheduler to yield work on.</param>
  75. /// <param name="dueTime">Time when the continuation should run.</param>
  76. /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
  77. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  78. public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime)
  79. {
  80. if (scheduler == null)
  81. {
  82. throw new ArgumentNullException(nameof(scheduler));
  83. }
  84. return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken());
  85. }
  86. /// <summary>
  87. /// Suspends execution of the current work item on the scheduler for the specified duration.
  88. /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) after the specified duration.
  89. /// </summary>
  90. /// <param name="scheduler">Scheduler to yield work on.</param>
  91. /// <param name="dueTime">Time when the continuation should run.</param>
  92. /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param>
  93. /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
  94. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  95. public static SchedulerOperation Sleep(this IScheduler scheduler, TimeSpan dueTime, CancellationToken cancellationToken)
  96. {
  97. if (scheduler == null)
  98. {
  99. throw new ArgumentNullException(nameof(scheduler));
  100. }
  101. return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken);
  102. }
  103. /// <summary>
  104. /// Suspends execution of the current work item on the scheduler until the specified due time.
  105. /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time.
  106. /// </summary>
  107. /// <param name="scheduler">Scheduler to yield work on.</param>
  108. /// <param name="dueTime">Time when the continuation should run.</param>
  109. /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
  110. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  111. public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime)
  112. {
  113. if (scheduler == null)
  114. {
  115. throw new ArgumentNullException(nameof(scheduler));
  116. }
  117. return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), scheduler.GetCancellationToken());
  118. }
  119. /// <summary>
  120. /// Suspends execution of the current work item on the scheduler until the specified due time.
  121. /// The caller should await the result of calling Sleep to schedule the remainder of the current work item (known as the continuation) at the specified due time.
  122. /// </summary>
  123. /// <param name="scheduler">Scheduler to yield work on.</param>
  124. /// <param name="dueTime">Time when the continuation should run.</param>
  125. /// <param name="cancellationToken">Cancellation token to cancel the continuation to run.</param>
  126. /// <returns>Scheduler operation object to await in order to schedule the continuation.</returns>
  127. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
  128. public static SchedulerOperation Sleep(this IScheduler scheduler, DateTimeOffset dueTime, CancellationToken cancellationToken)
  129. {
  130. if (scheduler == null)
  131. {
  132. throw new ArgumentNullException(nameof(scheduler));
  133. }
  134. return new SchedulerOperation(a => scheduler.Schedule(dueTime, a), cancellationToken);
  135. }
  136. /// <summary>
  137. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  138. /// </summary>
  139. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  140. /// <param name="scheduler">Scheduler to schedule work on.</param>
  141. /// <param name="state">State to pass to the asynchronous method.</param>
  142. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  143. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  144. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  145. public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
  146. {
  147. if (scheduler == null)
  148. {
  149. throw new ArgumentNullException(nameof(scheduler));
  150. }
  151. if (action == null)
  152. {
  153. throw new ArgumentNullException(nameof(action));
  154. }
  155. return ScheduleAsync_<TState>(scheduler, state, action);
  156. }
  157. /// <summary>
  158. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  159. /// </summary>
  160. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  161. /// <param name="scheduler">Scheduler to schedule work on.</param>
  162. /// <param name="state">State to pass to the asynchronous method.</param>
  163. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  164. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  165. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  166. public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
  167. {
  168. if (scheduler == null)
  169. {
  170. throw new ArgumentNullException(nameof(scheduler));
  171. }
  172. if (action == null)
  173. {
  174. throw new ArgumentNullException(nameof(action));
  175. }
  176. return ScheduleAsync_<TState>(scheduler, state, action);
  177. }
  178. /// <summary>
  179. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  180. /// </summary>
  181. /// <param name="scheduler">Scheduler to schedule work on.</param>
  182. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  183. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  184. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  185. public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task> action)
  186. {
  187. if (scheduler == null)
  188. {
  189. throw new ArgumentNullException(nameof(scheduler));
  190. }
  191. if (action == null)
  192. {
  193. throw new ArgumentNullException(nameof(action));
  194. }
  195. return ScheduleAsync_(scheduler, action, (self, closureAction, ct) => closureAction(self, ct));
  196. }
  197. /// <summary>
  198. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  199. /// </summary>
  200. /// <param name="scheduler">Scheduler to schedule work on.</param>
  201. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  202. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  203. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  204. public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
  205. {
  206. if (scheduler == null)
  207. {
  208. throw new ArgumentNullException(nameof(scheduler));
  209. }
  210. if (action == null)
  211. {
  212. throw new ArgumentNullException(nameof(action));
  213. }
  214. return ScheduleAsync_(scheduler, action, (self, closureAction, ct) => closureAction(self, ct));
  215. }
  216. /// <summary>
  217. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  218. /// </summary>
  219. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  220. /// <param name="scheduler">Scheduler to schedule work on.</param>
  221. /// <param name="state">State to pass to the asynchronous method.</param>
  222. /// <param name="dueTime">Relative time after which to execute the action.</param>
  223. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  224. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  225. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  226. public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
  227. {
  228. if (scheduler == null)
  229. {
  230. throw new ArgumentNullException(nameof(scheduler));
  231. }
  232. if (action == null)
  233. {
  234. throw new ArgumentNullException(nameof(action));
  235. }
  236. return ScheduleAsync_(scheduler, state, dueTime, action);
  237. }
  238. /// <summary>
  239. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  240. /// </summary>
  241. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  242. /// <param name="scheduler">Scheduler to schedule work on.</param>
  243. /// <param name="state">State to pass to the asynchronous method.</param>
  244. /// <param name="dueTime">Relative time after which to execute the action.</param>
  245. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  246. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  247. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  248. public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
  249. {
  250. if (scheduler == null)
  251. {
  252. throw new ArgumentNullException(nameof(scheduler));
  253. }
  254. if (action == null)
  255. {
  256. throw new ArgumentNullException(nameof(action));
  257. }
  258. return ScheduleAsync_(scheduler, state, dueTime, action);
  259. }
  260. /// <summary>
  261. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  262. /// </summary>
  263. /// <param name="scheduler">Scheduler to schedule work on.</param>
  264. /// <param name="dueTime">Relative time after which to execute the action.</param>
  265. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  266. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  267. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  268. public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task> action)
  269. {
  270. if (scheduler == null)
  271. {
  272. throw new ArgumentNullException(nameof(scheduler));
  273. }
  274. if (action == null)
  275. {
  276. throw new ArgumentNullException(nameof(action));
  277. }
  278. return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
  279. }
  280. /// <summary>
  281. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  282. /// </summary>
  283. /// <param name="scheduler">Scheduler to schedule work on.</param>
  284. /// <param name="dueTime">Relative time after which to execute the action.</param>
  285. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  286. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  287. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  288. public static IDisposable ScheduleAsync(this IScheduler scheduler, TimeSpan dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
  289. {
  290. if (scheduler == null)
  291. {
  292. throw new ArgumentNullException(nameof(scheduler));
  293. }
  294. if (action == null)
  295. {
  296. throw new ArgumentNullException(nameof(action));
  297. }
  298. return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
  299. }
  300. /// <summary>
  301. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  302. /// </summary>
  303. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  304. /// <param name="scheduler">Scheduler to schedule work on.</param>
  305. /// <param name="state">State to pass to the asynchronous method.</param>
  306. /// <param name="dueTime">Absolute time at which to execute the action.</param>
  307. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  308. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  309. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  310. public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
  311. {
  312. if (scheduler == null)
  313. {
  314. throw new ArgumentNullException(nameof(scheduler));
  315. }
  316. if (action == null)
  317. {
  318. throw new ArgumentNullException(nameof(action));
  319. }
  320. return ScheduleAsync_(scheduler, state, dueTime, action);
  321. }
  322. /// <summary>
  323. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  324. /// </summary>
  325. /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
  326. /// <param name="scheduler">Scheduler to schedule work on.</param>
  327. /// <param name="state">State to pass to the asynchronous method.</param>
  328. /// <param name="dueTime">Absolute time at which to execute the action.</param>
  329. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  330. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  331. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  332. public static IDisposable ScheduleAsync<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
  333. {
  334. if (scheduler == null)
  335. {
  336. throw new ArgumentNullException(nameof(scheduler));
  337. }
  338. if (action == null)
  339. {
  340. throw new ArgumentNullException(nameof(action));
  341. }
  342. return ScheduleAsync_(scheduler, state, dueTime, action);
  343. }
  344. /// <summary>
  345. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  346. /// </summary>
  347. /// <param name="scheduler">Scheduler to schedule work on.</param>
  348. /// <param name="dueTime">Absolute time at which to execute the action.</param>
  349. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  350. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  351. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  352. public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task> action)
  353. {
  354. if (scheduler == null)
  355. {
  356. throw new ArgumentNullException(nameof(scheduler));
  357. }
  358. if (action == null)
  359. {
  360. throw new ArgumentNullException(nameof(action));
  361. }
  362. return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
  363. }
  364. /// <summary>
  365. /// Schedules work using an asynchronous method, allowing for cooperative scheduling in an imperative coding style.
  366. /// </summary>
  367. /// <param name="scheduler">Scheduler to schedule work on.</param>
  368. /// <param name="dueTime">Absolute time at which to execute the action.</param>
  369. /// <param name="action">Asynchronous method to run the work, using Yield and Sleep operations for cooperative scheduling and injection of cancellation points.</param>
  370. /// <returns>Disposable object that allows to cancel outstanding work on cooperative cancellation points or through the cancellation token passed to the asynchronous method.</returns>
  371. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
  372. public static IDisposable ScheduleAsync(this IScheduler scheduler, DateTimeOffset dueTime, Func<IScheduler, CancellationToken, Task<IDisposable>> action)
  373. {
  374. if (scheduler == null)
  375. {
  376. throw new ArgumentNullException(nameof(scheduler));
  377. }
  378. if (action == null)
  379. {
  380. throw new ArgumentNullException(nameof(action));
  381. }
  382. return ScheduleAsync_(scheduler, action, dueTime, (self, closureAction, ct) => closureAction(self, ct));
  383. }
  384. private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task> action)
  385. {
  386. return scheduler.Schedule((state, action), (self, t) => InvokeAsync(self, t.state, t.action));
  387. }
  388. private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
  389. {
  390. return scheduler.Schedule((state, action), (self, t) => InvokeAsync(self, t.state, t.action));
  391. }
  392. private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
  393. {
  394. return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
  395. }
  396. private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
  397. {
  398. return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
  399. }
  400. private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task> action)
  401. {
  402. return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
  403. }
  404. private static IDisposable ScheduleAsync_<TState>(IScheduler scheduler, TState state, DateTimeOffset dueTime, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
  405. {
  406. return scheduler.Schedule((state, action), dueTime, (self, t) => InvokeAsync(self, t.state, t.action));
  407. }
  408. private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
  409. {
  410. return new AsyncInvocation<TState>().Run(self, s, action);
  411. }
  412. private static IDisposable InvokeAsync<TState>(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task> action)
  413. {
  414. return InvokeAsync(self, (action, state: s), (self_, t, ct) => t.action(self_, t.state, ct).ContinueWith(_ => Disposable.Empty));
  415. }
  416. private static CancellationToken GetCancellationToken(this IScheduler scheduler)
  417. {
  418. return scheduler is CancelableScheduler cs ? cs.Token : CancellationToken.None;
  419. }
  420. private sealed class CancelableScheduler : IScheduler
  421. {
  422. private readonly IScheduler _scheduler;
  423. private readonly CancellationToken _cancellationToken;
  424. public CancelableScheduler(IScheduler scheduler, CancellationToken cancellationToken)
  425. {
  426. _scheduler = scheduler;
  427. _cancellationToken = cancellationToken;
  428. }
  429. public CancellationToken Token
  430. {
  431. get { return _cancellationToken; }
  432. }
  433. public DateTimeOffset Now => _scheduler.Now;
  434. public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
  435. {
  436. return _scheduler.Schedule(state, action);
  437. }
  438. public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
  439. {
  440. return _scheduler.Schedule(state, dueTime, action);
  441. }
  442. public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
  443. {
  444. return _scheduler.Schedule(state, dueTime, action);
  445. }
  446. }
  447. }
  448. }