// TaskExt.cs

// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. namespace System.Threading.Tasks
  6. {
  7. static class TaskExt
  8. {
  9. public static Task<T> Return<T>(T value, CancellationToken cancellationToken)
  10. {
  11. var tcs = new TaskCompletionSource<T>();
  12. tcs.TrySetResult(value);
  13. return tcs.Task;
  14. }
  15. public static Task<T> Throw<T>(Exception exception, CancellationToken cancellationToken)
  16. {
  17. var tcs = new TaskCompletionSource<T>();
  18. tcs.TrySetException(exception);
  19. return tcs.Task;
  20. }
  21. public static void Handle<T, R>(this Task<T> task, TaskCompletionSource<R> tcs, Action<T> success)
  22. {
  23. if (task.IsFaulted)
  24. tcs.TrySetException(task.Exception);
  25. else if (task.IsCanceled)
  26. tcs.TrySetCanceled();
  27. else if (task.IsCompleted)
  28. success(task.Result);
  29. }
  30. public static void Handle<T, R>(this Task<T> task, TaskCompletionSource<R> tcs, Action<T> success, Action<AggregateException> error)
  31. {
  32. if (task.IsFaulted)
  33. error(task.Exception);
  34. else if (task.IsCanceled)
  35. tcs.TrySetCanceled();
  36. else if (task.IsCompleted)
  37. success(task.Result);
  38. }
  39. public static void Handle<T, R>(this Task<T> task, TaskCompletionSource<R> tcs, Action<T> success, Action<AggregateException> error, Action canceled)
  40. {
  41. if (task.IsFaulted)
  42. error(task.Exception);
  43. else if (task.IsCanceled)
  44. canceled();
  45. else if (task.IsCompleted)
  46. success(task.Result);
  47. }
  48. public static Task<bool> UsingEnumerator(this Task<bool> task, IDisposable disposable)
  49. {
  50. task.ContinueWith(t =>
  51. {
  52. if (t.IsFaulted)
  53. {
  54. var ignored = t.Exception; // don't remove!
  55. }
  56. if (t.IsFaulted || t.IsCanceled || !t.Result)
  57. disposable.Dispose();
  58. }, TaskContinuationOptions.ExecuteSynchronously);
  59. return task;
  60. }
  61. public static Task Then<T>(this Task<T> task, Action<Task<T>> continuation)
  62. {
  63. //
  64. // Central location to deal with continuations; allows for experimentation with flags.
  65. // Note that right now, we don't go for synchronous execution. Users can block on the
  66. // task returned from MoveNext, which can cause deadlocks (e.g. typical uses of GroupBy
  67. // involve some aggregate). We'd need deeper asynchrony to make this work with less
  68. // spawning of tasks.
  69. //
  70. return task.ContinueWith(continuation);
  71. }
  72. public static Task<R> Then<T, R>(this Task<T> task, Func<Task<T>, R> continuation)
  73. {
  74. //
  75. // See comment on Then<T> for rationale.
  76. //
  77. return task.ContinueWith(continuation);
  78. }
  79. public static Task<bool> UsingEnumeratorSync(this Task<bool> task, IDisposable disposable)
  80. {
  81. var tcs = new TaskCompletionSource<bool>();
  82. task.ContinueWith(t =>
  83. {
  84. if (t.IsFaulted || t.IsCanceled || !t.Result)
  85. disposable.Dispose(); // TODO: Check whether we need exception handling here!
  86. t.Handle(tcs, res => tcs.TrySetResult(res));
  87. }, TaskContinuationOptions.ExecuteSynchronously);
  88. return tcs.Task;
  89. }
  90. public static Task<R> Finally<R>(this Task<R> task, Action action)
  91. {
  92. task.ContinueWith(t =>
  93. {
  94. if (t.IsFaulted)
  95. {
  96. var ignored = t.Exception; // don't remove!
  97. }
  98. action();
  99. }, TaskContinuationOptions.ExecuteSynchronously);
  100. return task;
  101. }
  102. public static Task<V> Zip<T, U, V>(this Task<T> t1, Task<U> t2, Func<T, U, V> f)
  103. {
  104. var gate = new object();
  105. var tcs = new TaskCompletionSource<V>();
  106. var i = 2;
  107. var complete = new Action<Task>(t =>
  108. {
  109. if (Interlocked.Decrement(ref i) == 0)
  110. {
  111. var exs = new List<Exception>();
  112. if (t1.IsFaulted)
  113. exs.Add(t1.Exception);
  114. if (t2.IsFaulted)
  115. exs.Add(t2.Exception);
  116. if (exs.Count > 0)
  117. tcs.TrySetException(exs);
  118. else if (t1.IsCanceled || t2.IsCanceled)
  119. tcs.TrySetCanceled();
  120. else
  121. {
  122. var res = default(V);
  123. try
  124. {
  125. res = f(t1.Result, t2.Result);
  126. }
  127. catch (Exception ex)
  128. {
  129. tcs.TrySetException(ex);
  130. return;
  131. }
  132. tcs.TrySetResult(res);
  133. }
  134. }
  135. });
  136. t1.ContinueWith(complete);
  137. t2.ContinueWith(complete);
  138. return tcs.Task;
  139. }
  140. }
  141. }