AsyncObservableMethodBuilder.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. using System.Runtime.ExceptionServices;
  8. using System.Security;
  9. using System.Threading.Tasks;
  10. namespace System.Runtime.CompilerServices
  11. {
  12. /// <summary>
  13. /// Represents a builder for asynchronous methods that return a task-like <see cref="IAsyncObservable{T}"/>.
  14. /// </summary>
  15. /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
  16. public struct AsyncObservableMethodBuilder<T>
  17. {
  18. /// <summary>
  19. /// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous
  20. /// method whose return type is a task-like <see cref="IAsyncObservable{T}"/>.
  21. /// </summary>
  22. private IAsyncStateMachine _stateMachine;
  23. /// <summary>
  24. /// The underlying observable sequence representing the result produced by the asynchronous method.
  25. /// </summary>
  26. private TaskObservable _inner;
  27. /// <summary>
  28. /// Creates an instance of the <see cref="AsyncObservableMethodBuilder{T}"/> struct.
  29. /// </summary>
  30. /// <returns>A new instance of the struct.</returns>
  31. public static AsyncObservableMethodBuilder<T> Create() => default;
  32. /// <summary>
  33. /// Begins running the builder with the associated state machine.
  34. /// </summary>
  35. /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
  36. /// <param name="stateMachine">The state machine instance, passed by reference.</param>
  37. /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
  38. public void Start<TStateMachine>(ref TStateMachine stateMachine)
  39. where TStateMachine : IAsyncStateMachine
  40. {
  41. if (stateMachine == null)
  42. throw new ArgumentNullException(nameof(stateMachine));
  43. stateMachine.MoveNext();
  44. }
  45. /// <summary>
  46. /// Associates the builder with the specified state machine.
  47. /// </summary>
  48. /// <param name="stateMachine">The state machine instance to associate with the builder.</param>
  49. /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
  50. /// <exception cref="InvalidOperationException">The state machine was previously set.</exception>
  51. public void SetStateMachine(IAsyncStateMachine stateMachine)
  52. {
  53. if (_stateMachine != null)
  54. throw new InvalidOperationException();
  55. _stateMachine = stateMachine ?? throw new ArgumentNullException(nameof(stateMachine));
  56. }
  57. /// <summary>
  58. /// Marks the observable as successfully completed.
  59. /// </summary>
  60. /// <param name="result">The result to use to complete the observable sequence.</param>
  61. /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
  62. public void SetResult(T result)
  63. {
  64. if (_inner == null)
  65. {
  66. _inner = new TaskObservable(result);
  67. }
  68. else
  69. {
  70. _inner.SetResult(result);
  71. }
  72. }
  73. /// <summary>
  74. /// Marks the observable as failed and binds the specified exception to the observable sequence.
  75. /// </summary>
  76. /// <param name="exception">The exception to bind to the observable sequence.</param>
  77. /// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
  78. /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
  79. public void SetException(Exception exception)
  80. {
  81. if (exception == null)
  82. throw new ArgumentNullException(nameof(exception));
  83. if (_inner == null)
  84. {
  85. _inner = new TaskObservable(exception);
  86. }
  87. else
  88. {
  89. _inner.SetException(exception);
  90. }
  91. }
  92. /// <summary>
  93. /// Gets the observable sequence for this builder.
  94. /// </summary>
  95. public IAsyncObservable<T> Task => _inner ??= new TaskObservable();
  96. /// <summary>
  97. /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
  98. /// </summary>
  99. /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
  100. /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
  101. /// <param name="awaiter">The awaiter.</param>
  102. /// <param name="stateMachine">The state machine.</param>
  103. public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
  104. where TAwaiter : INotifyCompletion
  105. where TStateMachine : IAsyncStateMachine
  106. {
  107. try
  108. {
  109. if (_stateMachine == null)
  110. {
  111. var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.
  112. _stateMachine = stateMachine;
  113. _stateMachine.SetStateMachine(_stateMachine);
  114. }
  115. // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
  116. awaiter.OnCompleted(_stateMachine.MoveNext);
  117. }
  118. catch (Exception ex)
  119. {
  120. // NB: Prevent reentrancy into the async state machine when an exception would be observed
  121. // by the caller. This could cause concurrent execution of the async method. Instead,
  122. // rethrow the exception elsewhere.
  123. Rethrow(ex);
  124. }
  125. }
  126. /// <summary>
  127. /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
  128. /// </summary>
  129. /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
  130. /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
  131. /// <param name="awaiter">The awaiter.</param>
  132. /// <param name="stateMachine">The state machine.</param>
  133. [SecuritySafeCritical]
  134. public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
  135. where TAwaiter : ICriticalNotifyCompletion
  136. where TStateMachine : IAsyncStateMachine
  137. {
  138. try
  139. {
  140. if (_stateMachine == null)
  141. {
  142. var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.
  143. _stateMachine = stateMachine;
  144. _stateMachine.SetStateMachine(_stateMachine);
  145. }
  146. // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
  147. awaiter.UnsafeOnCompleted(_stateMachine.MoveNext);
  148. }
  149. catch (Exception ex)
  150. {
  151. // NB: Prevent reentrancy into the async state machine when an exception would be observed
  152. // by the caller. This could cause concurrent execution of the async method. Instead,
  153. // rethrow the exception elsewhere.
  154. Rethrow(ex);
  155. }
  156. }
  157. /// <summary>
  158. /// Rethrows an exception that was thrown from an awaiter's OnCompleted methods.
  159. /// </summary>
  160. /// <param name="exception">The exception to rethrow.</param>
  161. private static void Rethrow(Exception exception)
  162. {
  163. TaskPoolAsyncScheduler.Default.ScheduleAsync(_ =>
  164. {
  165. ExceptionDispatchInfo.Capture(exception).Throw();
  166. return default;
  167. });
  168. }
  169. /// <summary>
  170. /// Implementation of the IObservable&lt;T&gt; interface compatible with async method return types.
  171. /// </summary>
  172. /// <remarks>
  173. /// This class implements a "task-like" type that can be used as the return type of an asynchronous
  174. /// method in C# 7.0 and beyond. For example:
  175. /// <code>
  176. /// async Observable&lt;int&gt; RxAsync()
  177. /// {
  178. /// var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
  179. /// return res * 2;
  180. /// }
  181. /// </code>
  182. /// </remarks>
  183. internal sealed class TaskObservable : IAsyncObservable<T>, INotifyCompletion
  184. {
  185. /// <summary>
  186. /// The underlying observable sequence to subscribe to in case the asynchronous method did not
  187. /// finish synchronously.
  188. /// </summary>
  189. private readonly AsyncAsyncSubject<T> _subject;
  190. /// <summary>
  191. /// The result returned by the asynchronous method in case the method finished synchronously.
  192. /// </summary>
  193. private readonly T _result;
  194. /// <summary>
  195. /// The exception thrown by the asynchronous method in case the method finished synchronously.
  196. /// </summary>
  197. private readonly Exception _exception;
  198. /// <summary>
  199. /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that has not finished yet.
  200. /// </summary>
  201. public TaskObservable()
  202. {
  203. _subject = new SequentialAsyncAsyncSubject<T>();
  204. }
  205. /// <summary>
  206. /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously returned
  207. /// the specified <paramref name="result"/> value.
  208. /// </summary>
  209. /// <param name="result">The result returned by the asynchronous method.</param>
  210. public TaskObservable(T result)
  211. {
  212. _result = result;
  213. }
  214. /// <summary>
  215. /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously threw
  216. /// the specified <paramref name="exception"/>.
  217. /// </summary>
  218. /// <param name="exception">The exception thrown by the asynchronous method.</param>
  219. public TaskObservable(Exception exception)
  220. {
  221. _exception = exception;
  222. }
  223. /// <summary>
  224. /// Marks the observable as successfully completed.
  225. /// </summary>
  226. /// <param name="result">The result to use to complete the observable sequence.</param>
  227. /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
  228. public async void SetResult(T result)
  229. {
  230. if (IsCompleted)
  231. throw new InvalidOperationException();
  232. // REVIEW: Async void method.
  233. await _subject.OnNextAsync(result).ConfigureAwait(false);
  234. await _subject.OnCompletedAsync().ConfigureAwait(false);
  235. }
  236. /// <summary>
  237. /// Marks the observable as failed and binds the specified exception to the observable sequence.
  238. /// </summary>
  239. /// <param name="exception">The exception to bind to the observable sequence.</param>
  240. /// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
  241. /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
  242. public void SetException(Exception exception)
  243. {
  244. if (IsCompleted)
  245. throw new InvalidOperationException();
  246. _subject.OnErrorAsync(exception);
  247. }
  248. /// <summary>
  249. /// Subscribes the given observer to the observable sequence.
  250. /// </summary>
  251. /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
  252. /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
  253. /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  254. public ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer)
  255. {
  256. if (_subject != null)
  257. {
  258. return _subject.SubscribeAsync(observer);
  259. }
  260. async ValueTask<IAsyncDisposable> CoreAsync()
  261. {
  262. if (_exception != null)
  263. {
  264. await observer.OnErrorAsync(_exception).ConfigureAwait(false);
  265. }
  266. else
  267. {
  268. await observer.OnNextAsync(_result).ConfigureAwait(false);
  269. }
  270. return AsyncDisposable.Nop;
  271. }
  272. return CoreAsync();
  273. }
  274. /// <summary>
  275. /// Gets an awaiter that can be used to await the eventual completion of the observable sequence.
  276. /// </summary>
  277. /// <returns>An awaiter that can be used to await the eventual completion of the observable sequence.</returns>
  278. public AsyncAsyncSubject<T> GetAwaiter() => _subject;
  279. /// <summary>
  280. /// Gets a Boolean indicating whether the observable sequence has completed.
  281. /// </summary>
  282. public bool IsCompleted => _subject?.IsCompleted ?? true;
  283. /// <summary>
  284. /// Gets the result produced by the observable sequence.
  285. /// </summary>
  286. /// <returns>The result produced by the observable sequence.</returns>
  287. public T GetResult()
  288. {
  289. if (_subject != null)
  290. {
  291. return _subject.GetResult();
  292. }
  293. if (_exception != null)
  294. {
  295. ExceptionDispatchInfo.Capture(_exception).Throw();
  296. }
  297. return _result;
  298. }
  299. /// <summary>
  300. /// Attaches the specified <paramref name="continuation"/> to the observable sequence.
  301. /// </summary>
  302. /// <param name="continuation">The continuation to attach.</param>
  303. public void OnCompleted(Action continuation)
  304. {
  305. if (_subject != null)
  306. {
  307. _subject.OnCompleted(continuation);
  308. }
  309. else
  310. {
  311. continuation();
  312. }
  313. }
  314. }
  315. }
  316. }