AsyncScheduler.cs 11 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Runtime.CompilerServices;
  5. using System.Runtime.ExceptionServices;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Reactive.Concurrency
  9. {
  10. public static class AsyncScheduler
  11. {
  12. public static IAwaitable RendezVous(this IAsyncScheduler scheduler)
  13. {
  14. if (scheduler == null)
  15. throw new ArgumentNullException(nameof(scheduler));
  16. return new RendezVousAwaitable(scheduler, CancellationToken.None);
  17. }
  18. public static IAwaitable RendezVous(this IAsyncScheduler scheduler, CancellationToken token)
  19. {
  20. if (scheduler == null)
  21. throw new ArgumentNullException(nameof(scheduler));
  22. token.ThrowIfCancellationRequested();
  23. return new RendezVousAwaitable(scheduler, token);
  24. }
  25. public static IAwaitable RendezVous(this Task task, IAsyncScheduler scheduler) => RendezVous(task, scheduler, CancellationToken.None);
  26. public static IAwaitable RendezVous(this Task task, IAsyncScheduler scheduler, CancellationToken token)
  27. {
  28. if (task == null)
  29. throw new ArgumentNullException(nameof(task));
  30. if (scheduler == null)
  31. throw new ArgumentNullException(nameof(scheduler));
  32. return new TaskAwaitable(task, scheduler, token);
  33. }
  34. public static IAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler scheduler) => RendezVous(task, scheduler, CancellationToken.None);
  35. public static IAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler scheduler, CancellationToken token)
  36. {
  37. if (task == null)
  38. throw new ArgumentNullException(nameof(task));
  39. if (scheduler == null)
  40. throw new ArgumentNullException(nameof(scheduler));
  41. return new TaskAwaitable<T>(task, scheduler, token);
  42. }
  43. public static async Task Delay(this IAsyncScheduler scheduler, TimeSpan dueTime, CancellationToken token = default(CancellationToken))
  44. {
  45. if (scheduler == null)
  46. throw new ArgumentNullException(nameof(scheduler));
  47. var tcs = new TaskCompletionSource<bool>();
  48. var task = await scheduler.ScheduleAsync(ct =>
  49. {
  50. if (ct.IsCancellationRequested)
  51. {
  52. tcs.TrySetCanceled(ct);
  53. }
  54. else
  55. {
  56. tcs.SetResult(true);
  57. }
  58. return Task.CompletedTask;
  59. }, dueTime);
  60. using (token.Register(() => task.DisposeAsync()))
  61. {
  62. await tcs.Task;
  63. }
  64. }
  65. public static async Task Delay(this IAsyncScheduler scheduler, DateTimeOffset dueTime, CancellationToken token = default(CancellationToken))
  66. {
  67. if (scheduler == null)
  68. throw new ArgumentNullException(nameof(scheduler));
  69. var tcs = new TaskCompletionSource<bool>();
  70. var task = await scheduler.ScheduleAsync(ct =>
  71. {
  72. if (ct.IsCancellationRequested)
  73. {
  74. tcs.TrySetCanceled(ct);
  75. }
  76. else
  77. {
  78. tcs.SetResult(true);
  79. }
  80. return Task.CompletedTask;
  81. }, dueTime);
  82. using (token.Register(() => task.DisposeAsync()))
  83. {
  84. await tcs.Task;
  85. }
  86. }
  87. public static async Task ExecuteAsync(this IAsyncScheduler scheduler, Func<CancellationToken, Task> action, CancellationToken token = default(CancellationToken))
  88. {
  89. var tcs = new TaskCompletionSource<object>();
  90. var d = await scheduler.ScheduleAsync(async ct =>
  91. {
  92. try
  93. {
  94. ct.ThrowIfCancellationRequested();
  95. await action(ct).RendezVous(scheduler, ct);
  96. }
  97. catch (OperationCanceledException ex) when (ex.CancellationToken == ct)
  98. {
  99. tcs.TrySetCanceled(ct);
  100. }
  101. catch (Exception ex)
  102. {
  103. tcs.TrySetException(ex);
  104. }
  105. finally
  106. {
  107. tcs.TrySetResult(null);
  108. }
  109. });
  110. using (token.Register(() =>
  111. {
  112. try
  113. {
  114. d.DisposeAsync();
  115. }
  116. finally
  117. {
  118. tcs.TrySetCanceled(token);
  119. }
  120. }))
  121. {
  122. await tcs.Task.ConfigureAwait(false);
  123. }
  124. }
  125. public static async Task<TResult> ExecuteAsync<TResult>(this IAsyncScheduler scheduler, Func<CancellationToken, Task<TResult>> action, CancellationToken token = default(CancellationToken))
  126. {
  127. var tcs = new TaskCompletionSource<TResult>();
  128. var d = await scheduler.ScheduleAsync(async ct =>
  129. {
  130. var res = default(TResult);
  131. try
  132. {
  133. ct.ThrowIfCancellationRequested();
  134. res = await action(ct).RendezVous(scheduler, ct);
  135. }
  136. catch (OperationCanceledException ex) when (ex.CancellationToken == ct)
  137. {
  138. tcs.TrySetCanceled(ct);
  139. }
  140. catch (Exception ex)
  141. {
  142. tcs.TrySetException(ex);
  143. }
  144. finally
  145. {
  146. tcs.TrySetResult(res);
  147. }
  148. });
  149. using (token.Register(() =>
  150. {
  151. try
  152. {
  153. d.DisposeAsync();
  154. }
  155. finally
  156. {
  157. tcs.TrySetCanceled(token);
  158. }
  159. }))
  160. {
  161. return await tcs.Task.ConfigureAwait(false);
  162. }
  163. }
  164. private sealed class RendezVousAwaitable : IAwaitable, IAwaiter // PERF: Can we avoid these allocations?
  165. {
  166. private readonly IAsyncScheduler _scheduler;
  167. private readonly CancellationToken _token;
  168. private bool _done;
  169. private ExceptionDispatchInfo _error;
  170. public RendezVousAwaitable(IAsyncScheduler scheduler, CancellationToken token)
  171. {
  172. _scheduler = scheduler;
  173. _token = token;
  174. }
  175. public bool IsCompleted => _done;
  176. public IAwaiter GetAwaiter() => this;
  177. public void GetResult()
  178. {
  179. if (!_done)
  180. {
  181. throw new InvalidOperationException(); // REVIEW: No support for blocking.
  182. }
  183. if (_error != null)
  184. {
  185. _error.Throw();
  186. }
  187. }
  188. public void OnCompleted(Action continuation)
  189. {
  190. var t = _scheduler.ExecuteAsync(ct =>
  191. {
  192. try
  193. {
  194. continuation();
  195. }
  196. catch (Exception ex)
  197. {
  198. _error = ExceptionDispatchInfo.Capture(ex);
  199. }
  200. finally
  201. {
  202. _done = true;
  203. }
  204. return Task.CompletedTask;
  205. }, _token);
  206. }
  207. }
  208. private sealed class TaskAwaitable : IAwaitable, IAwaiter
  209. {
  210. private readonly ConfiguredTaskAwaitable.ConfiguredTaskAwaiter _task;
  211. private readonly IAsyncScheduler _scheduler;
  212. private readonly CancellationToken _token;
  213. public TaskAwaitable(Task task, IAsyncScheduler scheduler, CancellationToken token)
  214. {
  215. _task = task.ConfigureAwait(false).GetAwaiter();
  216. _scheduler = scheduler;
  217. _token = token;
  218. }
  219. public bool IsCompleted => _task.IsCompleted;
  220. public IAwaiter GetAwaiter() => this;
  221. public void GetResult()
  222. {
  223. _token.ThrowIfCancellationRequested();
  224. _task.GetResult();
  225. }
  226. public void OnCompleted(Action continuation)
  227. {
  228. var cancel = default(IDisposable);
  229. if (_token.CanBeCanceled)
  230. {
  231. cancel = _token.Register(() =>
  232. {
  233. Interlocked.Exchange(ref continuation, null)?.Invoke();
  234. });
  235. }
  236. try
  237. {
  238. _task.OnCompleted(() =>
  239. {
  240. var t = _scheduler.ExecuteAsync(ct =>
  241. {
  242. cancel?.Dispose();
  243. Interlocked.Exchange(ref continuation, null)?.Invoke();
  244. return Task.CompletedTask;
  245. }, _token);
  246. });
  247. }
  248. catch
  249. {
  250. cancel?.Dispose();
  251. throw;
  252. }
  253. }
  254. }
  255. private sealed class TaskAwaitable<T> : IAwaitable<T>, IAwaiter<T>
  256. {
  257. private readonly ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter _task;
  258. private readonly IAsyncScheduler _scheduler;
  259. private readonly CancellationToken _token;
  260. public TaskAwaitable(Task<T> task, IAsyncScheduler scheduler, CancellationToken token)
  261. {
  262. _task = task.ConfigureAwait(false).GetAwaiter();
  263. _scheduler = scheduler;
  264. _token = token;
  265. }
  266. public bool IsCompleted => _task.IsCompleted;
  267. public IAwaiter<T> GetAwaiter() => this;
  268. public T GetResult()
  269. {
  270. _token.ThrowIfCancellationRequested();
  271. return _task.GetResult();
  272. }
  273. public void OnCompleted(Action continuation)
  274. {
  275. var cancel = default(IDisposable);
  276. if (_token.CanBeCanceled)
  277. {
  278. cancel = _token.Register(() =>
  279. {
  280. Interlocked.Exchange(ref continuation, null)?.Invoke();
  281. });
  282. }
  283. try
  284. {
  285. _task.OnCompleted(() =>
  286. {
  287. var t = _scheduler.ExecuteAsync(ct =>
  288. {
  289. cancel?.Dispose();
  290. Interlocked.Exchange(ref continuation, null)?.Invoke();
  291. return Task.CompletedTask;
  292. }, _token);
  293. });
  294. }
  295. catch
  296. {
  297. cancel?.Dispose();
  298. throw;
  299. }
  300. }
  301. }
  302. }
  303. }