FromAsyncTest.cs 21 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using Microsoft.Reactive.Testing;
  12. using Microsoft.VisualStudio.TestTools.UnitTesting;
  13. using Tests.System.Reactive;
  14. using Assert = Xunit.Assert;
  15. namespace ReactiveTests.Tests
  16. {
  17. [TestClass]
  18. public class FromAsyncTest : ReactiveTest
  19. {
  // A task that has already completed with the result 42; used by the argument-checking tests.
  private readonly Task<int> _doneTask;

  /// <summary>
  /// Creates the test fixture and prepares the pre-completed task.
  /// </summary>
  public FromAsyncTest()
  {
      _doneTask = Task.FromResult(42);
  }
  27. #region Func
  /// <summary>
  /// Asserts that the <c>FromAsync</c> overloads taking a task-returning function throw
  /// <see cref="ArgumentNullException"/> when the function or the scheduler is null.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_ArgumentChecking()
  {
      var immediate = Scheduler.Immediate;
      Func<Task<int>> nullFunc = null;
      Func<CancellationToken, Task<int>> nullCancellableFunc = null;
      IScheduler nullScheduler = null;

      // Overloads without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(nullFunc));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(nullCancellableFunc));

      // Parameterless function + scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(nullFunc, immediate));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(() => _doneTask, nullScheduler));

      // Cancellable function + scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(nullCancellableFunc, immediate));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => _doneTask, nullScheduler));
  }
  /// <summary>
  /// Checks that every subscription invokes the task factory again and yields the task's result.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Success()
  {
      const int expected = 42;
      var invocations = 0;

      var source = Observable.FromAsync(() =>
      {
          invocations++;
          return Task.Factory.StartNew(() => expected);
      });

      // First subscription: the factory has run once.
      Assert.Equal(expected, source.Single());
      Assert.Equal(1, invocations);

      // Second subscription: the factory has run a second time.
      Assert.Equal(expected, source.Single());
      Assert.Equal(2, invocations);
  }
  /// <summary>
  /// Checks that an exception thrown by the task factory itself (before any task exists)
  /// is surfaced to the subscriber.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Throw_Synchronous()
  {
      var ex = new Exception();
      Func<Task<int>> throwingFactory = () => throw ex;

      var source = Observable.FromAsync(throwingFactory);

      ReactiveAssert.Throws(ex, () => source.Single());
  }
  /// <summary>
  /// Checks that an exception thrown inside the started task is surfaced to the subscriber.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Throw_Asynchronous()
  {
      var ex = new Exception();
      Func<int> faultingBody = () => throw ex;

      var source = Observable.FromAsync(() => Task.Factory.StartNew(faultingBody));

      ReactiveAssert.Throws(ex, () => source.Single());
  }
  /// <summary>
  /// Same as the parameterless success test, but through the overload whose factory receives a
  /// cancellation token: each subscription re-runs the factory and yields the task's result.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_Success()
  {
      const int expected = 42;
      var invocations = 0;

      var source = Observable.FromAsync(_ =>
      {
          invocations++;
          return Task.Factory.StartNew(() => expected);
      });

      // First subscription.
      Assert.Equal(expected, source.Single());
      Assert.Equal(1, invocations);

      // Second subscription.
      Assert.Equal(expected, source.Single());
      Assert.Equal(2, invocations);
  }
  /// <summary>
  /// Checks that an exception thrown by the token-accepting task factory itself
  /// is surfaced to the subscriber.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_Throw_Synchronous()
  {
      var ex = new Exception();
      Func<CancellationToken, Task<int>> throwingFactory = _ => throw ex;

      var source = Observable.FromAsync(throwingFactory);

      ReactiveAssert.Throws(ex, () => source.Single());
  }
  /// <summary>
  /// Checks that an exception thrown inside the task started by a token-accepting factory
  /// is surfaced to the subscriber.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_Throw_Asynchronous()
  {
      var ex = new Exception();
      Func<int> faultingBody = () => throw ex;

      var source = Observable.FromAsync(_ => Task.Factory.StartNew(faultingBody));

      ReactiveAssert.Throws(ex, () => source.Single());
  }
  /// <summary>
  /// Checks that disposing the subscription cancels the token handed to the task factory.
  /// </summary>
  /// <remarks>
  /// The task body loops forever on <c>ct.ThrowIfCancellationRequested()</c>, so it can only
  /// leave the loop (and reach its <c>finally</c>) once the token has been cancelled.
  /// </remarks>
  [TestMethod]
  public void FromAsync_FuncWithCancel_Cancel()
  {
      // Both wait handles are disposed when the test exits, whether it passes or fails.
      using var started = new ManualResetEvent(false);
      using var finished = new ManualResetEvent(false);
      var t = default(Task<int>);

      var xs = Observable.FromAsync(ct =>
          t = Task.Factory.StartNew<int>(() =>
          {
              try
              {
                  // Tell the test thread that the task body is running.
                  started.Set();

                  while (true)
                  {
                      ct.ThrowIfCancellationRequested();
                  }
              }
              finally
              {
                  // Tell the test thread that the loop has been left.
                  finished.Set();
              }
          })
      );

      var d = xs.Subscribe(_ => { });

      started.WaitOne();
      d.Dispose();
      finished.WaitOne();

      // Wait for the task to reach a final state without burning a core in an empty loop.
      SpinWait.SpinUntil(() => t.IsCompleted);
  }
  /// <summary>
  /// Parameterless factory, default options: a task error raised after the subscription
  /// was disposed is expected to be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_UnsubscribeThenError_ErrorReportedAsUnobserved() =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(taskFactory),
          observation => observation.AssertExceptionReportedAsUnobserved());
  /// <summary>
  /// Token-accepting factory, default options: a task error raised after the subscription
  /// was disposed is expected to be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_UnsubscribeThenError_ErrorReportedAsUnobserved() =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(_ => taskFactory()),
          observation => observation.AssertExceptionReportedAsUnobserved());
  /// <summary>
  /// Parameterless factory with an explicit scheduler: a task error raised after the
  /// subscription was disposed is expected to be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved() =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(taskFactory, TaskPoolScheduler.Default),
          observation => observation.AssertExceptionReportedAsUnobserved());
  /// <summary>
  /// Token-accepting factory with an explicit scheduler: a task error raised after the
  /// subscription was disposed is expected to be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved() =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(_ => taskFactory(), TaskPoolScheduler.Default),
          observation => observation.AssertExceptionReportedAsUnobserved());
  /// <summary>
  /// Parameterless factory with <c>ignoreExceptionsAfterUnsubscribe: true</c> and no scheduler:
  /// a task error raised after the subscription was disposed must not be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved() =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(
              taskFactory,
              new TaskObservationOptions(scheduler: null, ignoreExceptionsAfterUnsubscribe: true)),
          observation => observation.AssertExceptionNotReportedAsUnobserved());
  /// <summary>
  /// Token-accepting factory with <c>ignoreExceptionsAfterUnsubscribe: true</c> and no scheduler:
  /// a task error raised after the subscription was disposed must not be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved() =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(
              _ => taskFactory(),
              new TaskObservationOptions(scheduler: null, ignoreExceptionsAfterUnsubscribe: true)),
          observation => observation.AssertExceptionNotReportedAsUnobserved());
  /// <summary>
  /// Parameterless factory with <c>ignoreExceptionsAfterUnsubscribe: true</c> and the task-pool
  /// scheduler: a task error raised after the subscription was disposed must not be reported
  /// as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved() =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(
              taskFactory,
              new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
          observation => observation.AssertExceptionNotReportedAsUnobserved());
  /// <summary>
  /// Token-accepting factory with <c>ignoreExceptionsAfterUnsubscribe: true</c> and the task-pool
  /// scheduler: a task error raised after the subscription was disposed must not be reported
  /// as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved() =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(
              _ => taskFactory(),
              new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
          observation => observation.AssertExceptionNotReportedAsUnobserved());
  219. #if DESKTOPCLR
  /// <summary>
  /// Subscribes to a parameterless-factory <c>FromAsync</c> observable created with
  /// <c>Scheduler.Immediate</c>, completes the task, and asserts that the value 42 was
  /// delivered and that the callback recorded the test thread's managed thread id.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Scheduler1()
  {
      var delivered = new ManualResetEvent(false);
      var observedValue = 0;
      var observedThreadId = 0;
      var completion = new TaskCompletionSource<int>();

      var source = Observable.FromAsync(() => completion.Task, Scheduler.Immediate);
      source.Subscribe(value =>
      {
          observedValue = value;
          observedThreadId = Environment.CurrentManagedThreadId;
          delivered.Set();
      });

      // Complete the task only after subscribing, then wait for the notification.
      completion.SetResult(42);
      delivered.WaitOne();

      Assert.Equal(42, observedValue);
      Assert.Equal(Environment.CurrentManagedThreadId, observedThreadId);
  }
  /// <summary>
  /// Subscribes to a token-accepting-factory <c>FromAsync</c> observable created with
  /// <c>Scheduler.Immediate</c>, completes the task, and asserts that the value 42 was
  /// delivered and that the callback recorded the test thread's managed thread id.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Scheduler2()
  {
      var delivered = new ManualResetEvent(false);
      var observedValue = 0;
      var observedThreadId = 0;
      var completion = new TaskCompletionSource<int>();

      var source = Observable.FromAsync(_ => completion.Task, Scheduler.Immediate);
      source.Subscribe(value =>
      {
          observedValue = value;
          observedThreadId = Environment.CurrentManagedThreadId;
          delivered.Set();
      });

      // Complete the task only after subscribing, then wait for the notification.
      completion.SetResult(42);
      delivered.WaitOne();

      Assert.Equal(42, observedValue);
      Assert.Equal(Environment.CurrentManagedThreadId, observedThreadId);
  }
  258. #endif
  259. #endregion
  260. #region Action
  /// <summary>
  /// Asserts that the <c>FromAsync</c> overloads taking a function that returns a non-generic
  /// <see cref="Task"/> throw <see cref="ArgumentNullException"/> when the function or the
  /// scheduler is null.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_ArgumentChecking()
  {
      var s = Scheduler.Immediate;

      // Overloads without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task>)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>)));

      // Parameterless function + scheduler. The null-scheduler case mirrors the check the
      // Func<Task<int>> argument test performs for its parameterless overload.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task>), s));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(() => (Task)_doneTask, default(IScheduler)));

      // Cancellable function + scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>), s));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => (Task)_doneTask, default(IScheduler)));
  }
  /// <summary>
  /// Checks that, for a factory returning a non-generic task, every subscription invokes the
  /// factory again and yields <c>Unit.Default</c>.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Success()
  {
      var invocations = 0;

      var source = Observable.FromAsync(() =>
      {
          invocations++;
          return Task.Factory.StartNew(() => { });
      });

      // First subscription.
      Assert.Equal(Unit.Default, source.Single());
      Assert.Equal(1, invocations);

      // Second subscription.
      Assert.Equal(Unit.Default, source.Single());
      Assert.Equal(2, invocations);
  }
  /// <summary>
  /// Checks that an exception thrown by a <c>Func&lt;Task&gt;</c> factory itself
  /// is surfaced to the subscriber.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Throw_Synchronous()
  {
      var ex = new Exception();
      Func<Task> throwingFactory = () => throw ex;

      var source = Observable.FromAsync(throwingFactory);

      ReactiveAssert.Throws(ex, () => source.Single());
  }
  /// <summary>
  /// Checks that an exception thrown inside the started non-generic task
  /// is surfaced to the subscriber.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Throw_Asynchronous()
  {
      var ex = new Exception();
      Action faultingBody = () => throw ex;

      var source = Observable.FromAsync(() => Task.Factory.StartNew(faultingBody));

      ReactiveAssert.Throws(ex, () => source.Single());
  }
  /// <summary>
  /// Checks that, for a token-accepting factory returning a non-generic task, every
  /// subscription invokes the factory again and yields <c>Unit.Default</c>.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_Success()
  {
      var invocations = 0;

      var source = Observable.FromAsync(_ =>
      {
          invocations++;

          // The subscription's token is deliberately not forwarded: this task should always
          // run and complete.
          return Task.Factory.StartNew(() => { }, CancellationToken.None);
      });

      // First subscription.
      Assert.Equal(Unit.Default, source.Single());
      Assert.Equal(1, invocations);

      // Second subscription.
      Assert.Equal(Unit.Default, source.Single());
      Assert.Equal(2, invocations);
  }
  /// <summary>
  /// Checks that an exception thrown by a <c>Func&lt;CancellationToken, Task&gt;</c> factory
  /// itself is surfaced to the subscriber.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_Throw_Synchronous()
  {
      var ex = new Exception();
      Func<CancellationToken, Task> throwingFactory = _ => throw ex;

      var source = Observable.FromAsync(throwingFactory);

      ReactiveAssert.Throws(ex, () => source.Single());
  }
  /// <summary>
  /// Checks that an exception thrown inside the non-generic task started by a token-accepting
  /// factory is surfaced to the subscriber.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_Throw_Asynchronous()
  {
      var ex = new Exception();
      Action faultingBody = () => throw ex;

      // The subscription's token is deliberately not forwarded: this task should always
      // run and fail.
      var source = Observable.FromAsync(_ => Task.Factory.StartNew(faultingBody, CancellationToken.None));

      ReactiveAssert.Throws(ex, () => source.Single());
  }
  /// <summary>
  /// Checks that disposing the subscription cancels the token handed to a factory that returns
  /// a non-generic task.
  /// </summary>
  /// <remarks>
  /// The task body loops forever on <c>ct.ThrowIfCancellationRequested()</c>, so it can only
  /// leave the loop (and reach its <c>finally</c>) once the token has been cancelled.
  /// </remarks>
  [TestMethod]
  public void FromAsync_ActionWithCancel_Cancel()
  {
      // Both wait handles are disposed when the test exits, whether it passes or fails.
      using var started = new ManualResetEvent(false);
      using var finished = new ManualResetEvent(false);
      var t = default(Task);

      var xs = Observable.FromAsync(ct =>
          t = Task.Factory.StartNew(() =>
          {
              try
              {
                  // Tell the test thread that the task body is running.
                  started.Set();

                  while (true)
                  {
                      ct.ThrowIfCancellationRequested();
                  }
              }
              finally
              {
                  // Tell the test thread that the loop has been left.
                  finished.Set();
              }
          },
          CancellationToken.None) // Not forwarding ct because we are testing the case where the task is already running by the time cancellation is detected
      );

      var d = xs.Subscribe(_ => { });

      started.WaitOne();
      d.Dispose();
      finished.WaitOne();

      // Wait for the task to reach a final state without burning a core in an empty loop.
      SpinWait.SpinUntil(() => t.IsCompleted);
  }
  /// <summary>
  /// Parameterless <c>Func&lt;Task&gt;</c> factory, default options: a task error raised after
  /// the subscription was disposed is expected to be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_UnsubscribeThenError_ErrorReportedAsUnobserved() =>
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(taskFactory),
          observation => observation.AssertExceptionReportedAsUnobserved());
  /// <summary>
  /// Token-accepting factory returning a non-generic task, default options: a task error raised
  /// after the subscription was disposed is expected to be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_UnsubscribeThenError_ErrorReportedAsUnobserved() =>
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(_ => taskFactory()),
          observation => observation.AssertExceptionReportedAsUnobserved());
  /// <summary>
  /// Parameterless <c>Func&lt;Task&gt;</c> factory with an explicit scheduler: a task error
  /// raised after the subscription was disposed is expected to be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved() =>
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(taskFactory, TaskPoolScheduler.Default),
          observation => observation.AssertExceptionReportedAsUnobserved());
  /// <summary>
  /// Token-accepting factory returning a non-generic task, with an explicit scheduler: a task
  /// error raised after the subscription was disposed is expected to be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved() =>
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(_ => taskFactory(), TaskPoolScheduler.Default),
          observation => observation.AssertExceptionReportedAsUnobserved());
  /// <summary>
  /// Parameterless <c>Func&lt;Task&gt;</c> factory with
  /// <c>ignoreExceptionsAfterUnsubscribe: true</c> and no scheduler: a task error raised after
  /// the subscription was disposed must not be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved() =>
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(
              taskFactory,
              new TaskObservationOptions(scheduler: null, ignoreExceptionsAfterUnsubscribe: true)),
          observation => observation.AssertExceptionNotReportedAsUnobserved());
  /// <summary>
  /// Token-accepting factory returning a non-generic task, with
  /// <c>ignoreExceptionsAfterUnsubscribe: true</c> and no scheduler: a task error raised after
  /// the subscription was disposed must not be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved() =>
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(
              _ => taskFactory(),
              new TaskObservationOptions(scheduler: null, ignoreExceptionsAfterUnsubscribe: true)),
          observation => observation.AssertExceptionNotReportedAsUnobserved());
  /// <summary>
  /// Parameterless <c>Func&lt;Task&gt;</c> factory with
  /// <c>ignoreExceptionsAfterUnsubscribe: true</c> and the task-pool scheduler: a task error
  /// raised after the subscription was disposed must not be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved() =>
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(
              taskFactory,
              new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
          observation => observation.AssertExceptionNotReportedAsUnobserved());
  /// <summary>
  /// Token-accepting factory returning a non-generic task, with
  /// <c>ignoreExceptionsAfterUnsubscribe: true</c> and the task-pool scheduler: a task error
  /// raised after the subscription was disposed must not be reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved() =>
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          taskFactory => Observable.FromAsync(
              _ => taskFactory(),
              new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
          observation => observation.AssertExceptionNotReportedAsUnobserved());
  450. #if DESKTOPCLR
  /// <summary>
  /// Subscribes to a parameterless <c>Func&lt;Task&gt;</c> <c>FromAsync</c> observable created
  /// with <c>Scheduler.Immediate</c>, completes the task, and asserts that the callback
  /// recorded the test thread's managed thread id.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Scheduler1()
  {
      var delivered = new ManualResetEvent(false);
      var observedThreadId = 0;
      var completion = new TaskCompletionSource<int>();

      var source = Observable.FromAsync(() => (Task)completion.Task, Scheduler.Immediate);
      source.Subscribe(_ =>
      {
          observedThreadId = Environment.CurrentManagedThreadId;
          delivered.Set();
      });

      // Complete the task only after subscribing, then wait for the notification.
      completion.SetResult(42);
      delivered.WaitOne();

      Assert.Equal(Environment.CurrentManagedThreadId, observedThreadId);
  }
  /// <summary>
  /// Subscribes to a token-accepting <c>Func&lt;CancellationToken, Task&gt;</c> <c>FromAsync</c>
  /// observable created with <c>Scheduler.Immediate</c>, completes the task, and asserts that
  /// the callback recorded the test thread's managed thread id.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Scheduler2()
  {
      var delivered = new ManualResetEvent(false);
      var observedThreadId = 0;
      var completion = new TaskCompletionSource<int>();

      var source = Observable.FromAsync(_ => (Task)completion.Task, Scheduler.Immediate);
      source.Subscribe(_ =>
      {
          observedThreadId = Environment.CurrentManagedThreadId;
          delivered.Set();
      });

      // Complete the task only after subscribing, then wait for the notification.
      completion.SetResult(42);
      delivered.WaitOne();

      Assert.Equal(Environment.CurrentManagedThreadId, observedThreadId);
  }
  483. #endif
  484. #endregion
  /// <summary>
  /// Runs the generic unsubscribe-then-error scenario for task factories producing
  /// <c>Task&lt;int&gt;</c>.
  /// </summary>
  /// <param name="createObservable">Builds the observable under test from a task factory.</param>
  /// <param name="testResults">Assertions to run against the recorded error observation.</param>
  private void FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
      Func<Func<Task<int>>, IObservable<int>> createObservable,
      Action<TaskErrorObservation> testResults) =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<int>(createObservable, testResults);
  /// <summary>
  /// Runs the generic unsubscribe-then-error scenario for task factories typed as
  /// <c>Func&lt;Task&gt;</c>, whose observable yields <c>Unit</c>.
  /// </summary>
  /// <param name="createObservable">Builds the observable under test from a task factory.</param>
  /// <param name="testResults">Assertions to run against the recorded error observation.</param>
  private void FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
      Func<Func<Task>, IObservable<Unit>> createObservable,
      Action<TaskErrorObservation> testResults) =>
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<Unit>(createObservable, testResults);
  /// <summary>
  /// Shared scenario: start a task through the observable under test, dispose the subscription
  /// while the task is still running, let the task throw, then hand the error observation to
  /// the caller's assertions.
  /// </summary>
  /// <typeparam name="T">Result type of the task and element type of the observable.</typeparam>
  /// <param name="createObservable">Builds the observable under test from a task factory.</param>
  /// <param name="testResults">Assertions to run once the unsubscribe/throw handshake is done.</param>
  private void FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<T>(
      Func<Func<Task<T>>, IObservable<T>> createObservable,
      Action<TaskErrorObservation> testResults)
  {
      // Two participants: the test thread and the task body.
      using var gate = new Barrier(2);
      using var errorObservation = new TaskErrorObservation();

      var subscription = errorObservation.SuscribeWithoutKeepingSourceReachable<T>(
          (setTask, exception) => createObservable(
              () => setTask(Task.Factory.StartNew<T>(
                  () =>
                  {
                      // Phase 1: announce that the task body has started.
                      gate.SignalAndWait();

                      // Phase 2: hold until the test thread's Dispose call has returned.
                      gate.SignalAndWait();

                      throw exception;
                  })))
          .Subscribe());

      // Phase 1: block until the task body has started.
      gate.SignalAndWait();

      subscription.Dispose();

      // Phase 2: release the task body now that Dispose has returned.
      gate.SignalAndWait();

      testResults(errorObservation);
  }
  522. }
  523. }