TaskObservableExtensions.cs 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Linq;
  7. using System.Reactive.Linq.ObservableImpl;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace System.Reactive.Threading.Tasks
  11. {
  12. /// <summary>
  13. /// Provides a set of static methods for converting tasks to observable sequences.
  14. /// </summary>
  15. public static class TaskObservableExtensions
  16. {
  17. private sealed class SlowTaskObservable : IObservable<Unit>
  18. {
  19. private readonly Task _task;
  20. private readonly IScheduler? _scheduler;
  21. private readonly bool _ignoreExceptionsAfterUnsubscribe;
  22. public SlowTaskObservable(Task task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
  23. {
  24. _task = task;
  25. _scheduler = scheduler;
  26. _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
  27. }
  28. public IDisposable Subscribe(IObserver<Unit> observer)
  29. {
  30. if (observer == null)
  31. {
  32. throw new ArgumentNullException(nameof(observer));
  33. }
  34. var cts = new CancellationDisposable();
  35. var options = GetTaskContinuationOptions(_scheduler);
  36. if (_scheduler == null)
  37. {
  38. _task.ContinueWith(static (t, subjectObject) => t.EmitTaskResult((IObserver<Unit>)subjectObject!), observer, cts.Token, options, TaskScheduler.Current);
  39. }
  40. else
  41. {
  42. _task.ContinueWithState(
  43. static (task, tuple) => tuple.scheduler.ScheduleAction(
  44. (task, tuple.observer),
  45. static tuple2 => tuple2.task.EmitTaskResult(tuple2.observer)),
  46. (scheduler: _scheduler, observer),
  47. cts.Token,
  48. options);
  49. }
  50. if (_ignoreExceptionsAfterUnsubscribe)
  51. {
  52. _task.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted);
  53. }
  54. return cts;
  55. }
  56. }
  57. private sealed class SlowTaskObservable<TResult> : IObservable<TResult>
  58. {
  59. private readonly Task<TResult> _task;
  60. private readonly IScheduler? _scheduler;
  61. private readonly bool _ignoreExceptionsAfterUnsubscribe;
  62. public SlowTaskObservable(Task<TResult> task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
  63. {
  64. _task = task;
  65. _scheduler = scheduler;
  66. _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
  67. }
  68. public IDisposable Subscribe(IObserver<TResult> observer)
  69. {
  70. if (observer == null)
  71. {
  72. throw new ArgumentNullException(nameof(observer));
  73. }
  74. var cts = new CancellationDisposable();
  75. var options = GetTaskContinuationOptions(_scheduler);
  76. if (_scheduler == null)
  77. {
  78. _task.ContinueWith(static (t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject!), observer, cts.Token, options, TaskScheduler.Current);
  79. }
  80. else
  81. {
  82. _task.ContinueWithState(
  83. static (task, tuple) => tuple.scheduler.ScheduleAction(
  84. (task, tuple.observer),
  85. static tuple2 => tuple2.task.EmitTaskResult(tuple2.observer)),
  86. (scheduler: _scheduler, observer),
  87. cts.Token,
  88. options);
  89. }
  90. if (_ignoreExceptionsAfterUnsubscribe)
  91. {
  92. _task.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted);
  93. }
  94. return cts;
  95. }
  96. }
  97. /// <summary>
  98. /// Returns an observable sequence that signals when the task completes.
  99. /// </summary>
  100. /// <param name="task">Task to convert to an observable sequence.</param>
  101. /// <returns>An observable sequence that produces a unit value when the task completes, or propagates the exception produced by the task.</returns>
  102. /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
  103. /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
  104. public static IObservable<Unit> ToObservable(this Task task)
  105. {
  106. return ToObservable(task, ignoreExceptionsAfterUnsubscribe: false);
  107. }
  108. /// <summary>
  109. /// Returns an observable sequence that signals when the task completes.
  110. /// </summary>
  111. /// <param name="task">Task to convert to an observable sequence.</param>
  112. /// <param name="ignoreExceptionsAfterUnsubscribe">
  113. /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
  114. /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
  115. /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
  116. /// </param>
  117. /// <returns>An observable sequence that produces a unit value when the task completes, or propagates the exception produced by the task.</returns>
  118. /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
  119. /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
  120. public static IObservable<Unit> ToObservable(this Task task, bool ignoreExceptionsAfterUnsubscribe)
  121. {
  122. if (task == null)
  123. {
  124. throw new ArgumentNullException(nameof(task));
  125. }
  126. return ToObservableImpl(task, scheduler: null, ignoreExceptionsAfterUnsubscribe);
  127. }
  128. /// <summary>
  129. /// Returns an observable sequence that signals when the task completes.
  130. /// </summary>
  131. /// <param name="task">Task to convert to an observable sequence.</param>
  132. /// <param name="scheduler">Scheduler on which to notify observers about completion, cancellation or failure.</param>
  133. /// <returns>An observable sequence that produces a unit value when the task completes, or propagates the exception produced by the task.</returns>
  134. /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  135. /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
  136. public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler)
  137. {
  138. return ToObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe: false);
  139. }
  140. /// <summary>
  141. /// Returns an observable sequence that signals when the task completes.
  142. /// </summary>
  143. /// <param name="task">Task to convert to an observable sequence.</param>
  144. /// <param name="scheduler">Scheduler on which to notify observers about completion, cancellation or failure.</param>
  145. /// <param name="ignoreExceptionsAfterUnsubscribe">
  146. /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
  147. /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
  148. /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
  149. /// </param>
  150. /// <returns>An observable sequence that produces a unit value when the task completes, or propagates the exception produced by the task.</returns>
  151. /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  152. /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
  153. public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
  154. {
  155. if (task == null)
  156. {
  157. throw new ArgumentNullException(nameof(task));
  158. }
  159. if (scheduler == null)
  160. {
  161. throw new ArgumentNullException(nameof(scheduler));
  162. }
  163. return ToObservableImpl(task, scheduler, ignoreExceptionsAfterUnsubscribe);
  164. }
  165. private static IObservable<Unit> ToObservableImpl(Task task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
  166. {
  167. if (task.IsCompleted)
  168. {
  169. scheduler ??= ImmediateScheduler.Instance;
  170. return task.Status switch
  171. {
  172. TaskStatus.Faulted => new Throw<Unit>(task.GetSingleException(), scheduler),
  173. TaskStatus.Canceled => new Throw<Unit>(new TaskCanceledException(task), scheduler),
  174. _ => new Return<Unit>(Unit.Default, scheduler)
  175. };
  176. }
  177. return new SlowTaskObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe);
  178. }
  179. private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
  180. {
  181. switch (task.Status)
  182. {
  183. case TaskStatus.RanToCompletion:
  184. subject.OnNext(Unit.Default);
  185. subject.OnCompleted();
  186. break;
  187. case TaskStatus.Faulted:
  188. subject.OnError(task.GetSingleException());
  189. break;
  190. case TaskStatus.Canceled:
  191. subject.OnError(new TaskCanceledException(task));
  192. break;
  193. }
  194. }
  195. internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer)
  196. {
  197. if (task.IsCompleted)
  198. {
  199. task.EmitTaskResult(observer);
  200. return Disposable.Empty;
  201. }
  202. var cts = new CancellationDisposable();
  203. task.ContinueWith(
  204. static (t, observerObject) => t.EmitTaskResult((IObserver<Unit>)observerObject!),
  205. observer,
  206. cts.Token,
  207. TaskContinuationOptions.ExecuteSynchronously,
  208. TaskScheduler.Current);
  209. return cts;
  210. }
  211. /// <summary>
  212. /// Returns an observable sequence that propagates the result of the task.
  213. /// </summary>
  214. /// <typeparam name="TResult">The type of the result produced by the task.</typeparam>
  215. /// <param name="task">Task to convert to an observable sequence.</param>
  216. /// <returns>An observable sequence that produces the task's result, or propagates the exception produced by the task.</returns>
  217. /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
  218. /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
  219. public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task)
  220. {
  221. return ToObservable(task, ignoreExceptionsAfterUnsubscribe: false);
  222. }
  223. /// <summary>
  224. /// Returns an observable sequence that propagates the result of the task.
  225. /// </summary>
  226. /// <typeparam name="TResult">The type of the result produced by the task.</typeparam>
  227. /// <param name="task">Task to convert to an observable sequence.</param>
  228. /// <param name="ignoreExceptionsAfterUnsubscribe">
  229. /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
  230. /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
  231. /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
  232. /// </param>
  233. /// <returns>An observable sequence that produces the task's result, or propagates the exception produced by the task.</returns>
  234. /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
  235. /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
  236. public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, bool ignoreExceptionsAfterUnsubscribe)
  237. {
  238. if (task == null)
  239. {
  240. throw new ArgumentNullException(nameof(task));
  241. }
  242. return ToObservableImpl(task, scheduler: null, ignoreExceptionsAfterUnsubscribe);
  243. }
  244. /// <summary>
  245. /// Returns an observable sequence that propagates the result of the task.
  246. /// </summary>
  247. /// <typeparam name="TResult">The type of the result produced by the task.</typeparam>
  248. /// <param name="task">Task to convert to an observable sequence.</param>
  249. /// <param name="scheduler">Scheduler on which to notify observers about completion, cancellation or failure.</param>
  250. /// <returns>An observable sequence that produces the task's result, or propagates the exception produced by the task.</returns>
  251. /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  252. /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
  253. public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler)
  254. {
  255. return ToObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe: false);
  256. }
  257. /// <summary>
  258. /// Returns an observable sequence that propagates the result of the task.
  259. /// </summary>
  260. /// <typeparam name="TResult">The type of the result produced by the task.</typeparam>
  261. /// <param name="task">Task to convert to an observable sequence.</param>
  262. /// <param name="scheduler">Scheduler on which to notify observers about completion, cancellation or failure.</param>
  263. /// <param name="ignoreExceptionsAfterUnsubscribe">
  264. /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
  265. /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
  266. /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
  267. /// </param>
  268. /// <returns>An observable sequence that produces the task's result, or propagates the exception produced by the task.</returns>
  269. /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  270. /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
  271. public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
  272. {
  273. if (task == null)
  274. {
  275. throw new ArgumentNullException(nameof(task));
  276. }
  277. if (scheduler == null)
  278. {
  279. throw new ArgumentNullException(nameof(scheduler));
  280. }
  281. return ToObservableImpl(task, scheduler, ignoreExceptionsAfterUnsubscribe);
  282. }
  283. private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
  284. {
  285. if (task.IsCompleted)
  286. {
  287. scheduler ??= ImmediateScheduler.Instance;
  288. return task.Status switch
  289. {
  290. TaskStatus.Faulted => new Throw<TResult>(task.GetSingleException(), scheduler),
  291. TaskStatus.Canceled => new Throw<TResult>(new TaskCanceledException(task), scheduler),
  292. _ => new Return<TResult>(task.Result, scheduler)
  293. };
  294. }
  295. return new SlowTaskObservable<TResult>(task, scheduler, ignoreExceptionsAfterUnsubscribe);
  296. }
  297. private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
  298. {
  299. switch (task.Status)
  300. {
  301. case TaskStatus.RanToCompletion:
  302. subject.OnNext(task.Result);
  303. subject.OnCompleted();
  304. break;
  305. case TaskStatus.Faulted:
  306. subject.OnError(task.GetSingleException());
  307. break;
  308. case TaskStatus.Canceled:
  309. subject.OnError(new TaskCanceledException(task));
  310. break;
  311. }
  312. }
  313. private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler? scheduler)
  314. {
  315. var options = TaskContinuationOptions.None;
  316. if (scheduler != null)
  317. {
  318. //
  319. // We explicitly don't special-case the immediate scheduler here. If the user asks for a
  320. // synchronous completion, we'll try our best. However, there's no guarantee due to the
  321. // internal stack probing in the TPL, which may cause asynchronous completion on a thread
  322. // pool thread in order to avoid stack overflows. Therefore we can only attempt to be more
  323. // efficient in the case where the user specified a scheduler, hence we know that the
  324. // continuation will trigger a scheduling operation. In case of the immediate scheduler,
  325. // it really becomes "immediate scheduling" wherever the TPL decided to run the continuation,
  326. // i.e. not necessarily where the task was completed from.
  327. //
  328. options |= TaskContinuationOptions.ExecuteSynchronously;
  329. }
  330. return options;
  331. }
  332. internal static IDisposable Subscribe<TResult>(this Task<TResult> task, IObserver<TResult> observer)
  333. {
  334. if (task.IsCompleted)
  335. {
  336. task.EmitTaskResult(observer);
  337. return Disposable.Empty;
  338. }
  339. var cts = new CancellationDisposable();
  340. task.ContinueWith(
  341. static (t, observerObject) => t.EmitTaskResult((IObserver<TResult>)observerObject!),
  342. observer,
  343. cts.Token,
  344. TaskContinuationOptions.ExecuteSynchronously,
  345. TaskScheduler.Current);
  346. return cts;
  347. }
  348. /// <summary>
  349. /// Returns a task that will receive the last value or the exception produced by the observable sequence.
  350. /// </summary>
  351. /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
  352. /// <param name="observable">Observable sequence to convert to a task.</param>
  353. /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
  354. /// <exception cref="ArgumentNullException"><paramref name="observable"/> is <c>null</c>.</exception>
  355. public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable)
  356. {
  357. if (observable == null)
  358. {
  359. throw new ArgumentNullException(nameof(observable));
  360. }
  361. return observable.ToTask(new CancellationToken(), state: null);
  362. }
  363. /// <summary>
  364. /// Returns a task that will receive the last value or the exception produced by the observable sequence.
  365. /// </summary>
  366. /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
  367. /// <param name="observable">Observable sequence to convert to a task.</param>
  368. /// <param name="scheduler">The scheduler used for overriding where the task completion signals will be issued.</param>
  369. /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
  370. /// <exception cref="ArgumentNullException"><paramref name="observable"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  371. public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, IScheduler scheduler)
  372. {
  373. return observable.ToTask().ContinueOnScheduler(scheduler);
  374. }
  375. /// <summary>
  376. /// Returns a task that will receive the last value or the exception produced by the observable sequence.
  377. /// </summary>
  378. /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
  379. /// <param name="observable">Observable sequence to convert to a task.</param>
  380. /// <param name="state">The state to use as the underlying task's AsyncState.</param>
  381. /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
  382. /// <exception cref="ArgumentNullException"><paramref name="observable"/> is <c>null</c>.</exception>
  383. public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object? state)
  384. {
  385. if (observable == null)
  386. {
  387. throw new ArgumentNullException(nameof(observable));
  388. }
  389. return observable.ToTask(new CancellationToken(), state);
  390. }
  391. /// <summary>
  392. /// Returns a task that will receive the last value or the exception produced by the observable sequence.
  393. /// </summary>
  394. /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
  395. /// <param name="observable">Observable sequence to convert to a task.</param>
  396. /// <param name="state">The state to use as the underlying task's AsyncState.</param>
  397. /// <param name="scheduler">The scheduler used for overriding where the task completion signals will be issued.</param>
  398. /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
  399. /// <exception cref="ArgumentNullException"><paramref name="observable"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  400. public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object? state, IScheduler scheduler)
  401. {
  402. return observable.ToTask(new CancellationToken(), state).ContinueOnScheduler(scheduler);
  403. }
  404. /// <summary>
  405. /// Returns a task that will receive the last value or the exception produced by the observable sequence.
  406. /// </summary>
  407. /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
  408. /// <param name="observable">Observable sequence to convert to a task.</param>
  409. /// <param name="cancellationToken">Cancellation token that can be used to cancel the task, causing unsubscription from the observable sequence.</param>
  410. /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
  411. /// <exception cref="ArgumentNullException"><paramref name="observable"/> is <c>null</c>.</exception>
  412. public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken)
  413. {
  414. if (observable == null)
  415. {
  416. throw new ArgumentNullException(nameof(observable));
  417. }
  418. return observable.ToTask(cancellationToken, state: null);
  419. }
  420. /// <summary>
  421. /// Returns a task that will receive the last value or the exception produced by the observable sequence.
  422. /// </summary>
  423. /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
  424. /// <param name="observable">Observable sequence to convert to a task.</param>
  425. /// <param name="cancellationToken">Cancellation token that can be used to cancel the task, causing unsubscription from the observable sequence.</param>
  426. /// <param name="scheduler">The scheduler used for overriding where the task completion signals will be issued.</param>
  427. /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
  428. /// <exception cref="ArgumentNullException"><paramref name="observable"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  429. public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, IScheduler scheduler)
  430. {
  431. return observable.ToTask(cancellationToken, state: null).ContinueOnScheduler(scheduler);
  432. }
  433. internal static Task<TResult> ContinueOnScheduler<TResult>(this Task<TResult> task, IScheduler scheduler)
  434. {
  435. if (scheduler == null)
  436. {
  437. throw new ArgumentNullException(nameof(scheduler));
  438. }
  439. var tcs = new TaskCompletionSource<TResult>(task.AsyncState);
  440. task.ContinueWith(
  441. static (t, o) =>
  442. {
  443. var (scheduler, tcs) = ((IScheduler, TaskCompletionSource<TResult>))o!;
  444. scheduler.ScheduleAction((t, tcs), static state =>
  445. {
  446. if (state.t.IsCanceled)
  447. {
  448. state.tcs.TrySetCanceled(new TaskCanceledException(state.t).CancellationToken);
  449. }
  450. else if (state.t.IsFaulted)
  451. {
  452. state.tcs.TrySetException(state.t.GetSingleException());
  453. }
  454. else
  455. {
  456. state.tcs.TrySetResult(state.t.Result);
  457. }
  458. });
  459. },
  460. (scheduler, tcs),
  461. TaskContinuationOptions.ExecuteSynchronously);
  462. return tcs.Task;
  463. }
  464. private sealed class ToTaskObserver<TResult> : SafeObserver<TResult>
  465. {
  466. private readonly CancellationToken _ct;
  467. private readonly TaskCompletionSource<TResult> _tcs;
  468. private readonly CancellationTokenRegistration _ctr;
  469. private bool _hasValue;
  470. private TResult? _lastValue;
  471. public ToTaskObserver(TaskCompletionSource<TResult> tcs, CancellationToken ct)
  472. {
  473. _ct = ct;
  474. _tcs = tcs;
  475. if (ct.CanBeCanceled)
  476. {
  477. _ctr = ct.Register(static @this => ((ToTaskObserver<TResult>)@this!).Cancel(), this);
  478. }
  479. }
  480. public override void OnNext(TResult value)
  481. {
  482. _hasValue = true;
  483. _lastValue = value;
  484. }
  485. public override void OnError(Exception error)
  486. {
  487. _tcs.TrySetException(error);
  488. _ctr.Dispose(); // no null-check needed (struct)
  489. Dispose();
  490. }
  491. public override void OnCompleted()
  492. {
  493. if (_hasValue)
  494. {
  495. _tcs.TrySetResult(_lastValue!);
  496. }
  497. else
  498. {
  499. try
  500. {
  501. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  502. }
  503. catch (Exception e)
  504. {
  505. _tcs.TrySetException(e);
  506. }
  507. }
  508. _ctr.Dispose(); // no null-check needed (struct)
  509. Dispose();
  510. }
  511. private void Cancel()
  512. {
  513. Dispose();
  514. _tcs.TrySetCanceled(_ct);
  515. }
  516. }
  517. /// <summary>
  518. /// Returns a task that will receive the last value or the exception produced by the observable sequence.
  519. /// </summary>
  520. /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
  521. /// <param name="observable">Observable sequence to convert to a task.</param>
  522. /// <param name="cancellationToken">Cancellation token that can be used to cancel the task, causing unsubscription from the observable sequence.</param>
  523. /// <param name="state">The state to use as the underlying task's <see cref="Task.AsyncState"/>.</param>
  524. /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
  525. /// <exception cref="ArgumentNullException"><paramref name="observable"/> is <c>null</c>.</exception>
  526. public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object? state)
  527. {
  528. if (observable == null)
  529. {
  530. throw new ArgumentNullException(nameof(observable));
  531. }
  532. var tcs = new TaskCompletionSource<TResult>(state);
  533. var taskCompletionObserver = new ToTaskObserver<TResult>(tcs, cancellationToken);
  534. //
  535. // Subtle race condition: if the source completes before we reach the line below, the SingleAssigmentDisposable
  536. // will already have been disposed. Upon assignment, the disposable resource being set will be disposed on the
  537. // spot, which may throw an exception.
  538. //
  539. try
  540. {
  541. //
  542. // [OK] Use of unsafe Subscribe: we're catching the exception here to set the TaskCompletionSource.
  543. //
  544. // Notice we could use a safe subscription to route errors through OnError, but we still need the
  545. // exception handling logic here for the reason explained above. We cannot afford to throw here
  546. // and as a result never set the TaskCompletionSource, so we tunnel everything through here.
  547. //
  548. taskCompletionObserver.SetResource(observable.Subscribe/*Unsafe*/(taskCompletionObserver));
  549. }
  550. catch (Exception ex)
  551. {
  552. tcs.TrySetException(ex);
  553. }
  554. return tcs.Task;
  555. }
  556. /// <summary>
  557. /// Returns a task that will receive the last value or the exception produced by the observable sequence.
  558. /// </summary>
  559. /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
  560. /// <param name="observable">Observable sequence to convert to a task.</param>
  561. /// <param name="cancellationToken">Cancellation token that can be used to cancel the task, causing unsubscription from the observable sequence.</param>
  562. /// <param name="state">The state to use as the underlying task's <see cref="Task.AsyncState"/>.</param>
  563. /// <param name="scheduler">The scheduler used for overriding where the task completion signals will be issued.</param>
  564. /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
  565. /// <exception cref="ArgumentNullException"><paramref name="observable"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
  566. public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object? state, IScheduler scheduler)
  567. {
  568. return observable.ToTask(cancellationToken, state).ContinueOnScheduler(scheduler);
  569. }
  570. }
  571. }