1
0

FromAsyncTest.cs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using Microsoft.Reactive.Testing;
  12. using Microsoft.VisualStudio.TestTools.UnitTesting;
  13. using Tests.System.Reactive;
  14. using Assert = Xunit.Assert;
  15. namespace ReactiveTests.Tests
  16. {
  17. [TestClass]
  18. public class FromAsyncTest : ReactiveTest
  19. {
  // Task that has already completed with the value 42; used where a test needs a
  // valid, non-null task to return from a factory delegate.
  private readonly Task<int> _doneTask;

  /// <summary>
  /// Initializes the fixture with a pre-completed task whose result is 42.
  /// </summary>
  public FromAsyncTest()
  {
      _doneTask = Task.FromResult(42);
  }
  27. #region Func
  /// <summary>
  /// Checks that the value-returning <c>FromAsync</c> overloads throw
  /// <see cref="ArgumentNullException"/> when given a null factory or a null scheduler.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_ArgumentChecking()
  {
      var scheduler = Scheduler.Immediate;
      var noFactory = default(Func<Task<int>>);
      var noCancellableFactory = default(Func<CancellationToken, Task<int>>);
      var noScheduler = default(IScheduler);

      // Null factory, no scheduler argument.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(noFactory));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(noCancellableFactory));

      // Parameterless factory combined with a scheduler: each argument null in turn.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(noFactory, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(() => _doneTask, noScheduler));

      // Cancellable factory combined with a scheduler: each argument null in turn.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(noCancellableFactory, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => _doneTask, noScheduler));
  }
  /// <summary>
  /// Checks that each subscription invokes the factory once and yields the task's result.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Success()
  {
      const int expected = 42;
      var invocations = 0;

      Func<Task<int>> factory = () =>
      {
          invocations++;
          return Task.Factory.StartNew(() => expected);
      };
      var xs = Observable.FromAsync(factory);

      // First subscription: one factory call.
      Assert.Equal(expected, xs.Single());
      Assert.Equal(1, invocations);

      // Second subscription: the factory is called again.
      Assert.Equal(expected, xs.Single());
      Assert.Equal(2, invocations);
  }
  /// <summary>
  /// Checks that an exception thrown by the factory itself (before any task exists)
  /// surfaces from <c>Single()</c> as that same exception.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Throw_Synchronous()
  {
      var error = new Exception();

      Func<Task<int>> throwingFactory = () =>
      {
          throw error;
      };
      var xs = Observable.FromAsync(throwingFactory);

      ReactiveAssert.Throws(error, () => xs.Single());
  }
  /// <summary>
  /// Checks that an exception thrown inside the started task surfaces from
  /// <c>Single()</c> as that same exception.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Throw_Asynchronous()
  {
      var error = new Exception();

      Func<Task<int>> faultingFactory = () => Task.Factory.StartNew<int>(() => { throw error; });
      var xs = Observable.FromAsync(faultingFactory);

      ReactiveAssert.Throws(error, () => xs.Single());
  }
  /// <summary>
  /// Same as the parameterless success test, but using the factory overload that
  /// receives a <see cref="CancellationToken"/>.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_Success()
  {
      const int expected = 42;
      var invocations = 0;

      Func<CancellationToken, Task<int>> factory = _ =>
      {
          invocations++;
          return Task.Factory.StartNew(() => expected);
      };
      var xs = Observable.FromAsync(factory);

      // First subscription: one factory call.
      Assert.Equal(expected, xs.Single());
      Assert.Equal(1, invocations);

      // Second subscription: the factory is called again.
      Assert.Equal(expected, xs.Single());
      Assert.Equal(2, invocations);
  }
  /// <summary>
  /// Checks that an exception thrown by the cancellable factory itself surfaces
  /// from <c>Single()</c> as that same exception.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_Throw_Synchronous()
  {
      var error = new Exception();

      Func<CancellationToken, Task<int>> throwingFactory = _ =>
      {
          throw error;
      };
      var xs = Observable.FromAsync(throwingFactory);

      ReactiveAssert.Throws(error, () => xs.Single());
  }
  /// <summary>
  /// Checks that an exception thrown inside the task started by the cancellable
  /// factory surfaces from <c>Single()</c> as that same exception.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_Throw_Asynchronous()
  {
      var error = new Exception();

      Func<CancellationToken, Task<int>> faultingFactory =
          _ => Task.Factory.StartNew<int>(() => { throw error; });
      var xs = Observable.FromAsync(faultingFactory);

      ReactiveAssert.Throws(error, () => xs.Single());
  }
  /// <summary>
  /// Checks that disposing the subscription cancels the token handed to the factory.
  /// The task body loops until <c>ct.ThrowIfCancellationRequested()</c> throws, so the
  /// test can only finish if that token is cancelled after <c>Dispose</c>.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_Cancel()
  {
      // Both handles are only used by the task body, which has finished by the time
      // this method returns, so they can safely be released on exit.
      using var started = new ManualResetEvent(false);
      using var finished = new ManualResetEvent(false);
      var t = default(Task<int>);

      var xs = Observable.FromAsync(ct =>
          t = Task.Factory.StartNew<int>(() =>
          {
              try
              {
                  started.Set();

                  // Spin until the subscription's disposal cancels the token.
                  while (true)
                  {
                      ct.ThrowIfCancellationRequested();
                  }
              }
              finally
              {
                  finished.Set();
              }
          })
      );

      var d = xs.Subscribe(_ => { });

      started.WaitOne();
      d.Dispose();
      finished.WaitOne();

      // The task still has to unwind after its finally block; wait for it without
      // a hot busy loop. (Task.Wait is not used because it would rethrow the task's exception.)
      SpinWait.SpinUntil(() => t.IsCompleted);
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against <c>FromAsync(factory)</c> and
  /// asserts that the task's exception is reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_UnsubscribeThenError_ErrorReportedAsUnobserved()
  {
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(taskFactory),
          testResults: observation => observation.AssertExceptionReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against the cancellable-factory overload
  /// of <c>FromAsync</c> and asserts that the task's exception is reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_UnsubscribeThenError_ErrorReportedAsUnobserved()
  {
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(_ => taskFactory()),
          testResults: observation => observation.AssertExceptionReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against <c>FromAsync(factory, scheduler)</c>
  /// with <c>TaskPoolScheduler.Default</c> and asserts that the task's exception is
  /// reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
  {
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(taskFactory, TaskPoolScheduler.Default),
          testResults: observation => observation.AssertExceptionReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against the cancellable-factory overload
  /// of <c>FromAsync</c> with <c>TaskPoolScheduler.Default</c> and asserts that the
  /// task's exception is reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
  {
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(_ => taskFactory(), TaskPoolScheduler.Default),
          testResults: observation => observation.AssertExceptionReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario with <c>ignoreExceptionsAfterUnsubscribe</c>
  /// enabled (no scheduler) and asserts that the task's exception is not reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
  {
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(
              taskFactory,
              new TaskObservationOptions(scheduler: null, ignoreExceptionsAfterUnsubscribe: true)),
          testResults: observation => observation.AssertExceptionNotReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against the cancellable-factory overload
  /// with <c>ignoreExceptionsAfterUnsubscribe</c> enabled (no scheduler) and asserts that
  /// the task's exception is not reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
  {
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(
              _ => taskFactory(),
              new TaskObservationOptions(scheduler: null, ignoreExceptionsAfterUnsubscribe: true)),
          testResults: observation => observation.AssertExceptionNotReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario with <c>TaskPoolScheduler.Default</c> and
  /// <c>ignoreExceptionsAfterUnsubscribe</c> enabled, and asserts that the task's exception
  /// is not reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
  {
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(
              taskFactory,
              new TaskObservationOptions(scheduler: TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
          testResults: observation => observation.AssertExceptionNotReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against the cancellable-factory overload
  /// with <c>TaskPoolScheduler.Default</c> and <c>ignoreExceptionsAfterUnsubscribe</c>
  /// enabled, and asserts that the task's exception is not reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_FuncWithCancel_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
  {
      FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(
              _ => taskFactory(),
              new TaskObservationOptions(scheduler: TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
          testResults: observation => observation.AssertExceptionNotReportedAsUnobserved());
  }
  219. #if DESKTOPCLR
  /// <summary>
  /// Checks that, with <c>Scheduler.Immediate</c>, the result of a task completed via
  /// <see cref="TaskCompletionSource{TResult}"/> is delivered with the expected value
  /// and on the thread running this test.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Scheduler1()
  {
      // Released on exit; the callback that sets it has run by the time WaitOne returns.
      using var e = new ManualResetEvent(false);
      var x = default(int);
      var t = default(int);
      var tcs = new TaskCompletionSource<int>();

      var xs = Observable.FromAsync(() => tcs.Task, Scheduler.Immediate);

      // Keep the subscription so it is disposed when the test finishes.
      using var subscription = xs.Subscribe(res =>
      {
          x = res;
          t = Thread.CurrentThread.ManagedThreadId;
          e.Set();
      });

      tcs.SetResult(42);
      e.WaitOne();

      Assert.Equal(42, x);
      Assert.Equal(Thread.CurrentThread.ManagedThreadId, t);
  }
  /// <summary>
  /// Same check as <c>FromAsync_Func_Scheduler1</c>, but using the factory overload that
  /// receives a <see cref="CancellationToken"/>: the value and the delivering thread
  /// are asserted.
  /// </summary>
  [TestMethod]
  public void FromAsync_Func_Scheduler2()
  {
      // Released on exit; the callback that sets it has run by the time WaitOne returns.
      using var e = new ManualResetEvent(false);
      var x = default(int);
      var t = default(int);
      var tcs = new TaskCompletionSource<int>();

      var xs = Observable.FromAsync(ct => tcs.Task, Scheduler.Immediate);

      // Keep the subscription so it is disposed when the test finishes.
      using var subscription = xs.Subscribe(res =>
      {
          x = res;
          t = Thread.CurrentThread.ManagedThreadId;
          e.Set();
      });

      tcs.SetResult(42);
      e.WaitOne();

      Assert.Equal(42, x);
      Assert.Equal(Thread.CurrentThread.ManagedThreadId, t);
  }
  258. #endif
  259. #endregion
  260. #region Action
  /// <summary>
  /// Checks that the <c>FromAsync</c> overloads for non-generic <see cref="Task"/>
  /// factories throw <see cref="ArgumentNullException"/> when given a null factory or
  /// a null scheduler. Mirrors the cases covered by <c>FromAsync_Func_ArgumentChecking</c>.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_ArgumentChecking()
  {
      var s = Scheduler.Immediate;

      // Null factory, no scheduler argument.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task>)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>)));

      // Parameterless factory combined with a scheduler: each argument null in turn.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task>), s));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(() => (Task)_doneTask, default(IScheduler)));

      // Cancellable factory combined with a scheduler: each argument null in turn.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>), s));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => (Task)_doneTask, default(IScheduler)));
  }
  /// <summary>
  /// Checks that each subscription invokes the factory once and yields
  /// <c>Unit.Default</c> when the task completes.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Success()
  {
      var invocations = 0;

      Func<Task> factory = () =>
      {
          invocations++;
          return Task.Factory.StartNew(() => { });
      };
      var xs = Observable.FromAsync(factory);

      // First subscription: one factory call.
      Assert.Equal(Unit.Default, xs.Single());
      Assert.Equal(1, invocations);

      // Second subscription: the factory is called again.
      Assert.Equal(Unit.Default, xs.Single());
      Assert.Equal(2, invocations);
  }
  /// <summary>
  /// Checks that an exception thrown by the factory itself (before any task exists)
  /// surfaces from <c>Single()</c> as that same exception.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Throw_Synchronous()
  {
      var error = new Exception();

      Func<Task> throwingFactory = () =>
      {
          throw error;
      };
      var xs = Observable.FromAsync(throwingFactory);

      ReactiveAssert.Throws(error, () => xs.Single());
  }
  /// <summary>
  /// Checks that an exception thrown inside the started task surfaces from
  /// <c>Single()</c> as that same exception.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Throw_Asynchronous()
  {
      var error = new Exception();

      Func<Task> faultingFactory = () => Task.Factory.StartNew(() => { throw error; });
      var xs = Observable.FromAsync(faultingFactory);

      ReactiveAssert.Throws(error, () => xs.Single());
  }
  /// <summary>
  /// Same as the parameterless action success test, but using the factory overload
  /// that receives a <see cref="CancellationToken"/>.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_Success()
  {
      var invocations = 0;

      Func<CancellationToken, Task> factory = _ =>
      {
          invocations++;
          return Task.Factory.StartNew(() => { });
      };
      var xs = Observable.FromAsync(factory);

      // First subscription: one factory call.
      Assert.Equal(Unit.Default, xs.Single());
      Assert.Equal(1, invocations);

      // Second subscription: the factory is called again.
      Assert.Equal(Unit.Default, xs.Single());
      Assert.Equal(2, invocations);
  }
  /// <summary>
  /// Checks that an exception thrown by the cancellable factory itself surfaces
  /// from <c>Single()</c> as that same exception.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_Throw_Synchronous()
  {
      var error = new Exception();

      Func<CancellationToken, Task> throwingFactory = _ =>
      {
          throw error;
      };
      var xs = Observable.FromAsync(throwingFactory);

      ReactiveAssert.Throws(error, () => xs.Single());
  }
  /// <summary>
  /// Checks that an exception thrown inside the task started by the cancellable
  /// factory surfaces from <c>Single()</c> as that same exception.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_Throw_Asynchronous()
  {
      var error = new Exception();

      Func<CancellationToken, Task> faultingFactory =
          _ => Task.Factory.StartNew(() => { throw error; });
      var xs = Observable.FromAsync(faultingFactory);

      ReactiveAssert.Throws(error, () => xs.Single());
  }
  /// <summary>
  /// Checks that disposing the subscription cancels the token handed to the factory.
  /// The task body loops until <c>ct.ThrowIfCancellationRequested()</c> throws, so the
  /// test can only finish if that token is cancelled after <c>Dispose</c>.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_Cancel()
  {
      // Both handles are only used by the task body, which has finished by the time
      // this method returns, so they can safely be released on exit.
      using var started = new ManualResetEvent(false);
      using var finished = new ManualResetEvent(false);
      var t = default(Task);

      var xs = Observable.FromAsync(ct =>
          t = Task.Factory.StartNew(() =>
          {
              try
              {
                  started.Set();

                  // Spin until the subscription's disposal cancels the token.
                  while (true)
                  {
                      ct.ThrowIfCancellationRequested();
                  }
              }
              finally
              {
                  finished.Set();
              }
          })
      );

      var d = xs.Subscribe(_ => { });

      started.WaitOne();
      d.Dispose();
      finished.WaitOne();

      // The task still has to unwind after its finally block; wait for it without
      // a hot busy loop. (Task.Wait is not used because it would rethrow the task's exception.)
      SpinWait.SpinUntil(() => t.IsCompleted);
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against <c>FromAsync(factory)</c> for a
  /// non-generic task factory and asserts that the task's exception is reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_UnsubscribeThenError_ErrorReportedAsUnobserved()
  {
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(taskFactory),
          testResults: observation => observation.AssertExceptionReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against the cancellable, non-generic task
  /// factory overload of <c>FromAsync</c> and asserts that the task's exception is
  /// reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_UnsubscribeThenError_ErrorReportedAsUnobserved()
  {
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(_ => taskFactory()),
          testResults: observation => observation.AssertExceptionReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against <c>FromAsync(factory, scheduler)</c>
  /// for a non-generic task factory with <c>TaskPoolScheduler.Default</c> and asserts
  /// that the task's exception is reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
  {
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(taskFactory, TaskPoolScheduler.Default),
          testResults: observation => observation.AssertExceptionReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against the cancellable, non-generic task
  /// factory overload of <c>FromAsync</c> with <c>TaskPoolScheduler.Default</c> and
  /// asserts that the task's exception is reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
  {
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(_ => taskFactory(), TaskPoolScheduler.Default),
          testResults: observation => observation.AssertExceptionReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario for a non-generic task factory with
  /// <c>ignoreExceptionsAfterUnsubscribe</c> enabled (no scheduler) and asserts that the
  /// task's exception is not reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
  {
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(
              taskFactory,
              new TaskObservationOptions(scheduler: null, ignoreExceptionsAfterUnsubscribe: true)),
          testResults: observation => observation.AssertExceptionNotReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against the cancellable, non-generic task
  /// factory overload with <c>ignoreExceptionsAfterUnsubscribe</c> enabled (no scheduler)
  /// and asserts that the task's exception is not reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
  {
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(
              _ => taskFactory(),
              new TaskObservationOptions(scheduler: null, ignoreExceptionsAfterUnsubscribe: true)),
          testResults: observation => observation.AssertExceptionNotReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario for a non-generic task factory with
  /// <c>TaskPoolScheduler.Default</c> and <c>ignoreExceptionsAfterUnsubscribe</c> enabled,
  /// and asserts that the task's exception is not reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
  {
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(
              taskFactory,
              new TaskObservationOptions(scheduler: TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
          testResults: observation => observation.AssertExceptionNotReportedAsUnobserved());
  }
  /// <summary>
  /// Runs the unsubscribe-then-fault scenario against the cancellable, non-generic task
  /// factory overload with <c>TaskPoolScheduler.Default</c> and
  /// <c>ignoreExceptionsAfterUnsubscribe</c> enabled, and asserts that the task's
  /// exception is not reported as unobserved.
  /// </summary>
  [TestMethod]
  public void FromAsync_ActionWithCancel_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
  {
      FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
          createObservable: taskFactory => Observable.FromAsync(
              _ => taskFactory(),
              new TaskObservationOptions(scheduler: TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
          testResults: observation => observation.AssertExceptionNotReportedAsUnobserved());
  }
  449. #if DESKTOPCLR
  /// <summary>
  /// Checks that, with <c>Scheduler.Immediate</c>, the completion of a task exposed as a
  /// non-generic <see cref="Task"/> is delivered on the thread running this test.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Scheduler1()
  {
      // Released on exit; the callback that sets it has run by the time WaitOne returns.
      using var e = new ManualResetEvent(false);
      var t = default(int);
      var tcs = new TaskCompletionSource<int>();

      var xs = Observable.FromAsync(() => (Task)tcs.Task, Scheduler.Immediate);

      // Keep the subscription so it is disposed when the test finishes.
      using var subscription = xs.Subscribe(res =>
      {
          t = Thread.CurrentThread.ManagedThreadId;
          e.Set();
      });

      tcs.SetResult(42);
      e.WaitOne();

      Assert.Equal(Thread.CurrentThread.ManagedThreadId, t);
  }
  /// <summary>
  /// Same check as <c>FromAsync_Action_Scheduler1</c>, but using the factory overload
  /// that receives a <see cref="CancellationToken"/>: the delivering thread is asserted.
  /// </summary>
  [TestMethod]
  public void FromAsync_Action_Scheduler2()
  {
      // Released on exit; the callback that sets it has run by the time WaitOne returns.
      using var e = new ManualResetEvent(false);
      var t = default(int);
      var tcs = new TaskCompletionSource<int>();

      var xs = Observable.FromAsync(ct => (Task)tcs.Task, Scheduler.Immediate);

      // Keep the subscription so it is disposed when the test finishes.
      using var subscription = xs.Subscribe(res =>
      {
          t = Thread.CurrentThread.ManagedThreadId;
          e.Set();
      });

      tcs.SetResult(42);
      e.WaitOne();

      Assert.Equal(Thread.CurrentThread.ManagedThreadId, t);
  }
  482. #endif
  483. #endregion
  /// <summary>
  /// Convenience overload for <c>int</c>-valued tasks; forwards to the generic core
  /// with <c>T</c> fixed to <see cref="int"/>.
  /// </summary>
  /// <param name="createObservable">Builds the observable under test from a task factory.</param>
  /// <param name="testResults">Assertions to run against the recorded error observation.</param>
  private void FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
      Func<Func<Task<int>>, IObservable<int>> createObservable,
      Action<TaskErrorObservation> testResults)
      => FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<int>(createObservable, testResults);
  /// <summary>
  /// Convenience overload for non-generic task factories; forwards to the generic core
  /// with <c>T</c> fixed to <see cref="Unit"/>.
  /// </summary>
  /// <param name="createObservable">Builds the observable under test from a task factory.</param>
  /// <param name="testResults">Assertions to run against the recorded error observation.</param>
  private void FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
      Func<Func<Task>, IObservable<Unit>> createObservable,
      Action<TaskErrorObservation> testResults)
      => FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<Unit>(createObservable, testResults);
  /// <summary>
  /// Shared scenario: start a task through the observable under test, unsubscribe while
  /// the task is still running, then let the task throw, and finally hand the recorded
  /// error observation to <paramref name="testResults"/>.
  /// </summary>
  /// <typeparam name="T">Result type of the task and element type of the observable.</typeparam>
  /// <param name="createObservable">Builds the observable under test from a task factory.</param>
  /// <param name="testResults">Assertions to run against the recorded error observation.</param>
  private void FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<T>(
      Func<Func<Task<T>>, IObservable<T>> createObservable,
      Action<TaskErrorObservation> testResults)
  {
      // Two participants: this test thread and the task body.
      using var rendezvous = new Barrier(2);
      using var errorObservation = new TaskErrorObservation();

      var subscription = errorObservation.SuscribeWithoutKeepingSourceReachable<T>(
          (registerTask, error) => createObservable(
              () => registerTask(Task.Factory.StartNew<T>(
                  () =>
                  {
                      // Phase 1: tell the test thread the task is running.
                      rendezvous.SignalAndWait();

                      // Phase 2: hold until the test thread has finished unsubscribing.
                      rendezvous.SignalAndWait();

                      throw error;
                  })))
              .Subscribe());

      // Phase 1: block until the task body has started.
      rendezvous.SignalAndWait();

      subscription.Dispose();

      // Phase 2: release the task now that Dispose has returned, so it can throw.
      rendezvous.SignalAndWait();

      testResults(errorObservation);
  }
  521. }
  522. }