TaskExt.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Runtime.CompilerServices;
  7. namespace System.Threading.Tasks
  8. {
  9. internal static class TaskExt
  10. {
  /// <summary>
  /// A task that never completes; the completion source backing it is discarded right after
  /// creation, so nothing can ever signal it.
  /// </summary>
  public static readonly Task<bool> Never = new TaskCompletionSource<bool>().Task;

  /// <summary>
  /// A completion source that has already been completed with the result <c>true</c>.
  /// </summary>
  public static readonly TaskCompletionSource<bool> True;

  /// <summary>
  /// Builds the pre-completed <see cref="True"/> completion source.
  /// </summary>
  static TaskExt()
  {
      // Complete the source first and publish it last, so the field only ever holds a completed source.
      var completed = new TaskCompletionSource<bool>();
      completed.SetResult(true);

      True = completed;
  }
  18. #if USE_FAIR_AND_CHEAPER_MERGE
  /// <summary>
  /// Creates a <see cref="WhenAnyValueTask{T}"/> over the specified tasks and starts awaiting them.
  /// </summary>
  /// <typeparam name="T">The result type of the tasks.</typeparam>
  /// <param name="tasks">The tasks to await.</param>
  /// <returns>The started <see cref="WhenAnyValueTask{T}"/> instance wrapping <paramref name="tasks"/>.</returns>
  public static WhenAnyValueTask<T> WhenAny<T>(ValueTask<T>[] tasks)
  {
      // Construction and start-up are separate steps; see WhenAnyValueTask<T>.Start.
      var merged = new WhenAnyValueTask<T>(tasks);

      merged.Start();

      return merged;
  }
  // REVIEW: Evaluate options to reduce locking and test performance. Right now, there's one lock
  //         protecting the queue and the completion delegate field. Care has been taken to limit
  //         the time under the lock, and the (sequential single) reader path has limited locking.
  //         Contention due to concurrent completion of tasks could be a concern.

  /// <summary>
  /// Awaitable object over a fixed-size array of tasks. Awaiting it yields the index of a completed task;
  /// indices are queued in the order their completion callbacks run.
  /// </summary>
  /// <typeparam name="T">The result type of the awaited tasks.</typeparam>
  internal sealed class WhenAnyValueTask<T>
  {
      /// <summary>
      /// The tasks to await. Entries in this array may be replaced using <see cref="Replace"/>.
      /// </summary>
      private readonly ValueTask<T>[] _tasks;

      /// <summary>
      /// Array of cached delegates passed to awaiters on tasks. These delegates have a closure containing the task index.
      /// </summary>
      private readonly Action[] _onReady;

      /// <summary>
      /// Queue of indexes of ready tasks. Awaiting the <see cref="WhenAnyValueTask{T}"/> object will consume this queue in order.
      /// </summary>
      /// <remarks>
      /// A lock on this field is taken when updating the queue or <see cref="_onCompleted"/>.
      /// </remarks>
      private readonly Queue<int> _ready;

      /// <summary>
      /// Callback of the current awaiter, if any.
      /// </summary>
      /// <remarks>
      /// Protected for reads and writes by a lock on <see cref="_ready"/>.
      /// </remarks>
      private Action _onCompleted;

      /// <summary>
      /// Creates a when any task around the specified tasks.
      /// </summary>
      /// <param name="tasks">Initial set of tasks to await.</param>
      public WhenAnyValueTask(ValueTask<T>[] tasks)
      {
          _tasks = tasks;

          var n = tasks.Length;

          _ready = new Queue<int>(n); // NB: Should never exceed this length, so we won't see dynamic realloc.
          _onReady = new Action[n];

          for (var i = 0; i < n; i++)
          {
              //
              // Cache these delegates, for they have closures (over `this` and `index`), and we need them
              // for each replacement of a task, to hook up the continuation.
              //

              int index = i;
              _onReady[index] = () => OnReady(index);
          }
      }

      /// <summary>
      /// Start awaiting the tasks. This is done separately from the constructor to avoid complexity dealing
      /// with handling concurrent callbacks to the current instance while the constructor is running.
      /// </summary>
      public void Start()
      {
          for (var i = 0; i < _tasks.Length; i++)
          {
              //
              // Register a callback with the task, which will enqueue the index of the completed task
              // for consumption by awaiters.
              //

              _tasks[i].ConfigureAwait(false).GetAwaiter().OnCompleted(_onReady[i]);
          }
      }

      /// <summary>
      /// Gets an awaiter to await completion of any of the awaited tasks, returning the index of the completed
      /// task. When sequentially awaiting the current instance, task indices are yielded in the order of
      /// completion. If all tasks have completed and been observed by awaiting the current instance, the awaiter
      /// never returns on a subsequent attempt to await the completion of any task. The caller is responsible
      /// for bookkeeping that avoids awaiting this instance more often than the number of pending tasks.
      /// </summary>
      /// <returns>Awaiter to await completion of any of the awaited tasks.</returns>
      /// <remarks>This class only supports a single active awaiter at any point in time.</remarks>
      public Awaiter GetAwaiter() => new Awaiter(this);

      /// <summary>
      /// Replaces the (completed) task at the specified <paramref name="index"/> and starts awaiting it.
      /// </summary>
      /// <param name="index">The index of the task to replace.</param>
      /// <param name="task">The new task to store and await at the specified index.</param>
      public void Replace(int index, in ValueTask<T> task)
      {
          Debug.Assert(_tasks[index].IsCompleted, "A task shouldn't be replaced before it has completed.");

          _tasks[index] = task;

          // Reuse the cached delegate for this slot, so no new closure is allocated per replacement.
          task.ConfigureAwait(false).GetAwaiter().OnCompleted(_onReady[index]);
      }

      /// <summary>
      /// Called when any task has completed (thus may run concurrently).
      /// </summary>
      /// <param name="index">The index of the completed task in <see cref="_tasks"/>.</param>
      private void OnReady(int index)
      {
          Action onCompleted = null;

          lock (_ready)
          {
              //
              // Store the index of the task that has completed. This will be picked up from GetResult.
              //

              _ready.Enqueue(index);

              //
              // If there's a current awaiter, we'll steal its continuation action and invoke it. By setting
              // the continuation action to null, we avoid waking up the same awaiter more than once. Any
              // task completions that occur while no awaiter is active will end up being enqueued in _ready.
              //

              if (_onCompleted != null)
              {
                  onCompleted = _onCompleted;
                  _onCompleted = null;
              }
          }

          // Invoke the stolen continuation outside of the lock to keep the time under the lock short.
          onCompleted?.Invoke();
      }

      /// <summary>
      /// Invoked by awaiters to check if any task has completed, in order to short-circuit the await operation.
      /// </summary>
      /// <returns><c>true</c> if any task has completed; otherwise, <c>false</c>.</returns>
      private bool IsCompleted()
      {
          // REVIEW: Evaluate options to reduce locking, so the single consuming awaiter has limited contention
          //         with the multiple concurrent completing enumerator tasks, e.g. using ConcurrentQueue<T>.

          lock (_ready)
          {
              return _ready.Count > 0;
          }
      }

      /// <summary>
      /// Gets the index of the earliest task that has completed, used by the awaiter. After stealing an index from
      /// the ready queue (by means of awaiting the current instance), the user may choose to replace the task at the
      /// returned index by a new task, using the <see cref="Replace"/> method.
      /// </summary>
      /// <returns>Index of the earliest task that has completed.</returns>
      private int GetResult()
      {
          lock (_ready)
          {
              return _ready.Dequeue();
          }
      }

      /// <summary>
      /// Register a continuation passed by an awaiter.
      /// </summary>
      /// <param name="action">The continuation action delegate to call when any task is ready.</param>
      private void OnCompleted(Action action)
      {
          bool shouldInvoke = false;

          lock (_ready)
          {
              //
              // Check if we have anything ready (which could happen in the short window between checking
              // for IsCompleted and calling OnCompleted). If so, we should invoke the action directly. Not
              // doing so would be a correctness issue where a task has completed, its index was enqueued,
              // but the continuation was never called (unless another task completes and calls the action
              // delegate, whose subsequent call to GetResult would pick up the lost index).
              //

              if (_ready.Count > 0)
              {
                  shouldInvoke = true;
              }
              else
              {
                  Debug.Assert(_onCompleted == null, "Only a single awaiter is allowed.");

                  _onCompleted = action;
              }
          }

          //
          // NB: We assume this case is rare enough (IsCompleted and OnCompleted happen right after one
          //     another, and an enqueue should have happened right in between to go from an empty to a
          //     non-empty queue), so we don't run the risk of triggering a stack overflow due to
          //     synchronous completion of the await operation (which may be in a loop that awaits the
          //     current instance again).
          //

          if (shouldInvoke)
          {
              action();
          }
      }

      /// <summary>
      /// Awaiter type used to await completion of any task.
      /// </summary>
      /// <remarks>
      /// Declared as a <c>readonly struct</c>: it only wraps the parent reference and forwards every call to it.
      /// </remarks>
      public readonly struct Awaiter : INotifyCompletion
      {
          private readonly WhenAnyValueTask<T> _parent;

          public Awaiter(WhenAnyValueTask<T> parent) => _parent = parent;

          public bool IsCompleted => _parent.IsCompleted();

          public int GetResult() => _parent.GetResult();

          public void OnCompleted(Action action) => _parent.OnCompleted(action);
      }
  }
  211. #endif
  212. }
  213. }