AsyncObservableMethodBuilder.cs 15 KB


// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. using System.Runtime.ExceptionServices;
  8. using System.Security;
  9. using System.Threading.Tasks;
  10. namespace System.Runtime.CompilerServices
  11. {
  12. /// <summary>
  13. /// Represents a builder for asynchronous methods that return a task-like <see cref="IAsyncObservable{T}"/>.
  14. /// </summary>
  15. /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
  16. public struct AsyncObservableMethodBuilder<T>
  17. {
  /// <summary>
  /// State machine generated by the compiler for the asynchronous method whose return type is a
  /// task-like <see cref="IAsyncObservable{T}"/>. Stays <c>null</c> until it is assigned by
  /// <see cref="SetStateMachine"/> or by one of the await methods of this builder.
  /// </summary>
  private IAsyncStateMachine _stateMachine;

  /// <summary>
  /// Observable sequence that carries the outcome of the asynchronous method. It is created on demand
  /// by <see cref="Task"/>, <see cref="SetResult"/>, or <see cref="SetException"/>.
  /// </summary>
  private TaskObservable _inner;

  /// <summary>
  /// Creates an instance of the <see cref="AsyncObservableMethodBuilder{T}"/> struct.
  /// </summary>
  /// <returns>A default-initialized builder with no state machine and no observable attached.</returns>
  public static AsyncObservableMethodBuilder<T> Create()
  {
      return default;
  }
  /// <summary>
  /// Starts the builder by running the first step of the associated state machine.
  /// </summary>
  /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
  /// <param name="stateMachine">The state machine instance, passed by reference.</param>
  /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
  public void Start<TStateMachine>(ref TStateMachine stateMachine)
      where TStateMachine : IAsyncStateMachine
  {
      if (stateMachine == null)
      {
          throw new ArgumentNullException(nameof(stateMachine));
      }

      // The first step is invoked directly on the calling thread.
      stateMachine.MoveNext();
  }
  /// <summary>
  /// Binds the given state machine to this builder. A builder accepts a state machine only once.
  /// </summary>
  /// <param name="stateMachine">The state machine instance to associate with the builder.</param>
  /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
  /// <exception cref="InvalidOperationException">The state machine was previously set.</exception>
  public void SetStateMachine(IAsyncStateMachine stateMachine)
  {
      // The "already set" check runs before argument validation, so it wins when both conditions hold.
      if (_stateMachine != null)
      {
          throw new InvalidOperationException();
      }

      if (stateMachine == null)
      {
          throw new ArgumentNullException(nameof(stateMachine));
      }

      _stateMachine = stateMachine;
  }
  /// <summary>
  /// Completes the observable successfully with the given result.
  /// </summary>
  /// <param name="result">The result to use to complete the observable sequence.</param>
  /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
  public void SetResult(T result)
  {
      var existing = _inner;

      if (existing != null)
      {
          // An observable was already created, so the result is forwarded to it.
          existing.SetResult(result);
          return;
      }

      // No observable exists yet, so one is created that already holds the result.
      _inner = new TaskObservable(result);
  }
  /// <summary>
  /// Completes the observable with a failure, binding the given exception to the sequence.
  /// </summary>
  /// <param name="exception">The exception to bind to the observable sequence.</param>
  /// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
  /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
  public void SetException(Exception exception)
  {
      if (exception == null)
      {
          throw new ArgumentNullException(nameof(exception));
      }

      var existing = _inner;

      if (existing != null)
      {
          // An observable was already created, so the failure is forwarded to it.
          existing.SetException(exception);
          return;
      }

      // No observable exists yet, so one is created that already holds the exception.
      _inner = new TaskObservable(exception);
  }
  /// <summary>
  /// Gets the observable sequence for this builder, creating a pending one on first access if the
  /// builder does not hold an observable yet.
  /// </summary>
  public IAsyncObservable<T> Task
  {
      get
      {
          if (_inner == null)
          {
              _inner = new TaskObservable();
          }

          return _inner;
      }
  }
  /// <summary>
  /// Registers the state machine's next step as the continuation of the specified awaiter.
  /// </summary>
  /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
  /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
  /// <param name="awaiter">The awaiter.</param>
  /// <param name="stateMachine">The state machine.</param>
  public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
      where TAwaiter : INotifyCompletion
      where TStateMachine : IAsyncStateMachine
  {
      try
      {
          if (_stateMachine == null)
          {
              // NB: Ensure we have the observable backed by an async subject ready. This must happen
              //     before the state machine is stored below, so the statement order matters.
              _ = Task;

              _stateMachine = stateMachine;
              _stateMachine.SetStateMachine(_stateMachine);
          }

          // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
          awaiter.OnCompleted(_stateMachine.MoveNext);
      }
      catch (Exception error)
      {
          // NB: Prevent reentrancy into the async state machine when an exception would be observed
          //     by the caller. This could cause concurrent execution of the async method. Instead,
          //     rethrow the exception elsewhere.
          Rethrow(error);
      }
  }
  /// <summary>
  /// Registers the state machine's next step as the continuation of the specified awaiter, using the
  /// awaiter's <see cref="ICriticalNotifyCompletion.UnsafeOnCompleted"/> method.
  /// </summary>
  /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
  /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
  /// <param name="awaiter">The awaiter.</param>
  /// <param name="stateMachine">The state machine.</param>
  [SecuritySafeCritical]
  public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
      where TAwaiter : ICriticalNotifyCompletion
      where TStateMachine : IAsyncStateMachine
  {
      try
      {
          if (_stateMachine == null)
          {
              // NB: Ensure we have the observable backed by an async subject ready. This must happen
              //     before the state machine is stored below, so the statement order matters.
              _ = Task;

              _stateMachine = stateMachine;
              _stateMachine.SetStateMachine(_stateMachine);
          }

          // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
          awaiter.UnsafeOnCompleted(_stateMachine.MoveNext);
      }
      catch (Exception error)
      {
          // NB: Prevent reentrancy into the async state machine when an exception would be observed
          //     by the caller. This could cause concurrent execution of the async method. Instead,
          //     rethrow the exception elsewhere.
          Rethrow(error);
      }
  }
  /// <summary>
  /// Rethrows an exception that was thrown from an awaiter's OnCompleted methods by scheduling the
  /// throw on <see cref="TaskPoolAsyncScheduler.Default"/> instead of throwing on the current call stack.
  /// </summary>
  /// <param name="exception">The exception to rethrow.</param>
  private static void Rethrow(Exception exception)
  {
      // The value returned by ScheduleAsync is intentionally left unused, exactly as before.
      TaskPoolAsyncScheduler.Default.ScheduleAsync(cancellationToken =>
      {
          // Going through ExceptionDispatchInfo rethrows the original exception object.
          ExceptionDispatchInfo.Capture(exception).Throw();
          return default;
      });
  }
  /// <summary>
  /// Implementation of the <see cref="IAsyncObservable{T}"/> interface compatible with async method return types.
  /// </summary>
  /// <remarks>
  /// This class implements a "task-like" type that can be used as the return type of an asynchronous
  /// method in C# 7.0 and beyond. For example:
  /// <code>
  /// async Observable&lt;int&gt; RxAsync()
  /// {
  ///     var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
  ///     return res * 2;
  /// }
  /// </code>
  /// An instance is in one of two shapes. It is either backed by a subject, when the asynchronous
  /// method has not finished yet, or it holds a result or exception directly, when the method finished
  /// synchronously. In the second shape the subject field is <c>null</c>.
  /// </remarks>
  internal sealed class TaskObservable : IAsyncObservable<T>, INotifyCompletion
  {
      /// <summary>
      /// The underlying observable sequence to subscribe to in case the asynchronous method did not
      /// finish synchronously; <c>null</c> otherwise.
      /// </summary>
      private readonly AsyncAsyncSubject<T> _subject;

      /// <summary>
      /// The result returned by the asynchronous method in case the method finished synchronously.
      /// </summary>
      private readonly T _result;

      /// <summary>
      /// The exception thrown by the asynchronous method in case the method finished synchronously.
      /// </summary>
      private readonly Exception _exception;

      /// <summary>
      /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that has not finished yet.
      /// </summary>
      public TaskObservable()
      {
          _subject = new SequentialAsyncAsyncSubject<T>();
      }

      /// <summary>
      /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously returned
      /// the specified <paramref name="result"/> value.
      /// </summary>
      /// <param name="result">The result returned by the asynchronous method.</param>
      public TaskObservable(T result)
      {
          _result = result;
      }

      /// <summary>
      /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously threw
      /// the specified <paramref name="exception"/>.
      /// </summary>
      /// <param name="exception">The exception thrown by the asynchronous method.</param>
      public TaskObservable(Exception exception)
      {
          _exception = exception;
      }

      /// <summary>
      /// Marks the observable as successfully completed.
      /// </summary>
      /// <param name="result">The result to use to complete the observable sequence.</param>
      /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
      public void SetResult(T result)
      {
          // The guard runs outside of any async method so the documented exception is thrown
          // to the caller instead of being raised from inside an async void method.
          if (IsCompleted)
          {
              throw new InvalidOperationException();
          }

          PublishResult(result);
      }

      /// <summary>
      /// Marks the observable as failed and binds the specified exception to the observable sequence.
      /// </summary>
      /// <param name="exception">The exception to bind to the observable sequence.</param>
      /// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
      /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
      public void SetException(Exception exception)
      {
          if (exception == null)
          {
              throw new ArgumentNullException(nameof(exception));
          }

          if (IsCompleted)
          {
              throw new InvalidOperationException();
          }

          PublishError(exception);
      }

      /// <summary>
      /// Subscribes the given observer to the observable sequence.
      /// </summary>
      /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
      /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      public ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (_subject != null)
          {
              return _subject.SubscribeAsync(observer);
          }

          return CoreAsync();

          // Replays the synchronously obtained outcome to the observer.
          async ValueTask<IAsyncDisposable> CoreAsync()
          {
              if (_exception != null)
              {
                  await observer.OnErrorAsync(_exception).ConfigureAwait(false);
              }
              else
              {
                  await observer.OnNextAsync(_result).ConfigureAwait(false);

                  // Terminate the sequence, mirroring what SetResult sends through the subject
                  // (a value followed by completion). Without this the observer never sees the end.
                  await observer.OnCompletedAsync().ConfigureAwait(false);
              }

              return AsyncDisposable.Nop;
          }
      }

      /// <summary>
      /// Gets an awaiter that can be used to await the eventual completion of the observable sequence.
      /// </summary>
      /// <returns>An awaiter that can be used to await the eventual completion of the observable sequence.</returns>
      // NOTE(review): this returns the subject, which is null for instances created through the result or
      // exception constructors - confirm that such instances are never awaited through this method.
      public AsyncAsyncSubject<T> GetAwaiter() => _subject;

      /// <summary>
      /// Gets a Boolean indicating whether the observable sequence has completed. Instances without a
      /// subject (created from a result or an exception) always report <c>true</c>.
      /// </summary>
      public bool IsCompleted => _subject?.IsCompleted ?? true;

      /// <summary>
      /// Gets the result produced by the observable sequence.
      /// </summary>
      /// <returns>The result produced by the observable sequence.</returns>
      public T GetResult()
      {
          if (_subject != null)
          {
              return _subject.GetResult();
          }

          if (_exception != null)
          {
              // Rethrow the stored exception object itself rather than wrapping it.
              ExceptionDispatchInfo.Capture(_exception).Throw();
          }

          return _result;
      }

      /// <summary>
      /// Attaches the specified <paramref name="continuation"/> to the observable sequence.
      /// </summary>
      /// <param name="continuation">The continuation to attach.</param>
      public void OnCompleted(Action continuation)
      {
          if (_subject != null)
          {
              _subject.OnCompleted(continuation);
          }
          else
          {
              // No subject means the outcome is already known, so the continuation runs immediately.
              continuation();
          }
      }

      /// <summary>
      /// Pushes the result followed by the completion signal into the subject.
      /// </summary>
      /// <param name="result">The value to publish.</param>
      // REVIEW: Async void method. The builder contract requires a void-returning SetResult, so a
      //         failure while notifying the subject has no caller to report to.
      private async void PublishResult(T result)
      {
          await _subject.OnNextAsync(result).ConfigureAwait(false);
          await _subject.OnCompletedAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Pushes the error signal into the subject and observes the outcome of that operation.
      /// </summary>
      /// <param name="exception">The error to publish.</param>
      // REVIEW: Async void method, for the same reason as PublishResult. Awaiting here ensures the
      //         operation returned by OnErrorAsync is consumed instead of being dropped.
      private async void PublishError(Exception exception)
      {
          await _subject.OnErrorAsync(exception).ConfigureAwait(false);
      }
  }
  316. }
  317. }