TaskAsyncObservableExtensions.cs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Linq;
  7. using System.Reactive.Subjects;
  8. namespace System.Threading.Tasks
  9. {
  10. // TODO: Add ToTask.
  /// <summary>
  /// Extension methods that expose <see cref="Task"/> and <see cref="Task{TResult}"/> instances as
  /// asynchronous observable sequences, or deliver a task's outcome to an asynchronous observer.
  /// </summary>
  public static class TaskAsyncObservableExtensions
  {
      /// <summary>
      /// Wraps a task in an asynchronous observable sequence; each subscription forwards to
      /// <see cref="AcceptAsync(Task, IAsyncObserver{Unit})"/>.
      /// </summary>
      /// <param name="task">The task whose outcome is reported to subscribers.</param>
      /// <returns>A sequence of <see cref="Unit"/> created through <c>AsyncObservable.Create</c>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
      public static IAsyncObservable<Unit> ToAsyncObservable(this Task task)
      {
          if (task == null)
          {
              throw new ArgumentNullException(nameof(task));
          }

          return AsyncObservable.Create<Unit>(observer => task.AcceptAsync(observer));
      }

      /// <summary>
      /// Wraps a task in an asynchronous observable sequence; each subscription forwards to
      /// <see cref="AcceptAsync(Task, IAsyncObserver{Unit}, IAsyncScheduler)"/> with the given scheduler.
      /// </summary>
      /// <param name="task">The task whose outcome is reported to subscribers.</param>
      /// <param name="scheduler">The scheduler used to deliver the notifications.</param>
      /// <returns>A sequence of <see cref="Unit"/> created through <c>AsyncObservable.Create</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="task"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<Unit> ToAsyncObservable(this Task task, IAsyncScheduler scheduler)
      {
          if (task == null)
          {
              throw new ArgumentNullException(nameof(task));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          return AsyncObservable.Create<Unit>(observer => task.AcceptAsync(observer, scheduler));
      }

      /// <summary>
      /// Wraps a task in an asynchronous observable sequence; each subscription forwards to
      /// <see cref="AcceptAsync{TResult}(Task{TResult}, IAsyncObserver{TResult})"/>.
      /// </summary>
      /// <typeparam name="TResult">The type of the task's result.</typeparam>
      /// <param name="task">The task whose outcome is reported to subscribers.</param>
      /// <returns>A sequence of <typeparamref name="TResult"/> created through <c>AsyncObservable.Create</c>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
      public static IAsyncObservable<TResult> ToAsyncObservable<TResult>(this Task<TResult> task)
      {
          if (task == null)
          {
              throw new ArgumentNullException(nameof(task));
          }

          return AsyncObservable.Create<TResult>(observer => task.AcceptAsync(observer));
      }

      /// <summary>
      /// Wraps a task in an asynchronous observable sequence; each subscription forwards to
      /// <see cref="AcceptAsync{TResult}(Task{TResult}, IAsyncObserver{TResult}, IAsyncScheduler)"/>
      /// with the given scheduler.
      /// </summary>
      /// <typeparam name="TResult">The type of the task's result.</typeparam>
      /// <param name="task">The task whose outcome is reported to subscribers.</param>
      /// <param name="scheduler">The scheduler used to deliver the notifications.</param>
      /// <returns>A sequence of <typeparamref name="TResult"/> created through <c>AsyncObservable.Create</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="task"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TResult> ToAsyncObservable<TResult>(this Task<TResult> task, IAsyncScheduler scheduler)
      {
          if (task == null)
          {
              throw new ArgumentNullException(nameof(task));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          return AsyncObservable.Create<TResult>(observer => task.AcceptAsync(observer, scheduler));
      }

      /// <summary>
      /// Delivers the outcome of a task to an observer using <c>ImmediateAsyncScheduler.Instance</c>.
      /// </summary>
      /// <param name="task">The task to observe.</param>
      /// <param name="observer">The observer that receives the task's outcome.</param>
      /// <returns>The handle returned by the scheduler-taking overload.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="task"/> or <paramref name="observer"/> is <c>null</c>.
      /// </exception>
      public static ValueTask<IAsyncDisposable> AcceptAsync(this Task task, IAsyncObserver<Unit> observer) => AcceptAsync(task, observer, ImmediateAsyncScheduler.Instance);

      /// <summary>
      /// Delivers the outcome of a task to an observer on the given scheduler: a successful task
      /// produces <c>OnNextAsync(Unit.Default)</c> followed by <c>OnCompletedAsync()</c>, a faulted
      /// task produces <c>OnErrorAsync</c> with the inner exception of the task's exception, and a
      /// canceled task produces <c>OnErrorAsync</c> with a <see cref="TaskCanceledException"/>.
      /// </summary>
      /// <param name="task">The task to observe.</param>
      /// <param name="observer">The observer that receives the task's outcome.</param>
      /// <param name="scheduler">The scheduler used to deliver the notifications.</param>
      /// <returns>
      /// For an already completed task, the handle of the scheduled delivery; otherwise the
      /// observer's subscription to the subject that relays the outcome once the task finishes.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="task"/>, <paramref name="observer"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static ValueTask<IAsyncDisposable> AcceptAsync(this Task task, IAsyncObserver<Unit> observer, IAsyncScheduler scheduler)
      {
          if (task == null)
          {
              throw new ArgumentNullException(nameof(task));
          }

          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          return AcceptCoreAsync<Unit>(task, observer, scheduler, () => Unit.Default);
      }

      /// <summary>
      /// Delivers the outcome of a task to an observer using <c>ImmediateAsyncScheduler.Instance</c>.
      /// </summary>
      /// <typeparam name="TResult">The type of the task's result.</typeparam>
      /// <param name="task">The task to observe.</param>
      /// <param name="observer">The observer that receives the task's outcome.</param>
      /// <returns>The handle returned by the scheduler-taking overload.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="task"/> or <paramref name="observer"/> is <c>null</c>.
      /// </exception>
      public static ValueTask<IAsyncDisposable> AcceptAsync<TResult>(this Task<TResult> task, IAsyncObserver<TResult> observer) => AcceptAsync(task, observer, ImmediateAsyncScheduler.Instance);

      /// <summary>
      /// Delivers the outcome of a task to an observer on the given scheduler: a successful task
      /// produces <c>OnNextAsync</c> with the task's result followed by <c>OnCompletedAsync()</c>, a
      /// faulted task produces <c>OnErrorAsync</c> with the inner exception of the task's exception,
      /// and a canceled task produces <c>OnErrorAsync</c> with a <see cref="TaskCanceledException"/>.
      /// </summary>
      /// <typeparam name="TResult">The type of the task's result.</typeparam>
      /// <param name="task">The task to observe.</param>
      /// <param name="observer">The observer that receives the task's outcome.</param>
      /// <param name="scheduler">The scheduler used to deliver the notifications.</param>
      /// <returns>
      /// For an already completed task, the handle of the scheduled delivery; otherwise the
      /// observer's subscription to the subject that relays the outcome once the task finishes.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="task"/>, <paramref name="observer"/> or <paramref name="scheduler"/> is <c>null</c>.
      /// </exception>
      public static ValueTask<IAsyncDisposable> AcceptAsync<TResult>(this Task<TResult> task, IAsyncObserver<TResult> observer, IAsyncScheduler scheduler)
      {
          if (task == null)
          {
              throw new ArgumentNullException(nameof(task));
          }

          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          // Reading Result is safe here: the selector is only invoked once the task ran to completion.
          return AcceptCoreAsync<TResult>(task, observer, scheduler, () => task.Result);
      }

      // Shared implementation of both scheduler-taking AcceptAsync overloads. Arguments are validated
      // by the public entry points; getResult supplies the value passed to OnNextAsync on success.
      private static ValueTask<IAsyncDisposable> AcceptCoreAsync<T>(Task task, IAsyncObserver<T> observer, IAsyncScheduler scheduler, Func<T> getResult)
      {
          // Schedules the translation of the (completed) task's status into notifications on 'target'.
          ValueTask<IAsyncDisposable> DeliverAsync(IAsyncObserver<T> target)
          {
              return scheduler.ScheduleAsync(async ct =>
              {
                  if (ct.IsCancellationRequested)
                  {
                      return;
                  }

                  switch (task.Status)
                  {
                      case TaskStatus.RanToCompletion:
                          await target.OnNextAsync(getResult()).RendezVous(scheduler, ct);
                          await target.OnCompletedAsync().RendezVous(scheduler, ct);
                          break;
                      case TaskStatus.Faulted:
                          await target.OnErrorAsync(task.Exception.InnerException).RendezVous(scheduler, ct);
                          break;
                      case TaskStatus.Canceled:
                          await target.OnErrorAsync(new TaskCanceledException(task)).RendezVous(scheduler, ct);
                          break;
                  }
              });
          }

          // Outcome already known: deliver straight to the observer.
          if (task.IsCompleted)
          {
              return DeliverAsync(observer);
          }

          // With the immediate scheduler there is nothing to gain from queuing the continuation,
          // so let it run inline on the thread that completes the task.
          var options = scheduler == ImmediateAsyncScheduler.Instance
              ? TaskContinuationOptions.ExecuteSynchronously
              : TaskContinuationOptions.None;

          var subject = new SequentialAsyncAsyncSubject<T>();

          // The continuation must feed the subject, not the observer: the observer is subscribed to
          // the subject below, so this is what makes the returned subscription meaningful (disposing
          // it detaches the observer before the task finishes). Previously the outcome was pushed to
          // the observer directly and the subject never received anything.
          // NOTE(review): the handle of the scheduled delivery is not tracked, matching the original;
          // presumably the subject replays the outcome if the task finishes before the subscription
          // below is established (async-subject semantics) - confirm against the subject implementation.
          task.ContinueWith(_ => DeliverAsync(subject), options);

          return subject.SubscribeAsync(observer);
      }
  }
  146. }