TaskObservableExtensionsTest.cs 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_TPL
  3. using System;
  4. using System.Linq;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Threading.Tasks;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using Microsoft.Reactive.Testing;
  12. using Microsoft.VisualStudio.TestTools.UnitTesting;
  13. namespace ReactiveTests.Tests
  14. {
  15. [TestClass]
  16. public class TaskObservableExtensionsTest : ReactiveTest
  17. {
  18. #region ToObservable
  19. [TestMethod]
  20. public void TaskToObservable_NonVoid_ArgumentChecking()
  21. {
  22. var s = Scheduler.Immediate;
  23. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task<int>)null));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task<int>)null, s));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable(Task.FromResult(42), default(IScheduler)));
  26. var tcs = new System.Threading.Tasks.TaskCompletionSource<int>();
  27. var task = tcs.Task;
  28. ReactiveAssert.Throws<ArgumentNullException>(() => task.ToObservable().Subscribe(null));
  29. }
  30. [TestMethod]
  31. public void TaskToObservable_NonVoid_Complete_BeforeCreate()
  32. {
  33. var taskScheduler = new TestTaskScheduler();
  34. var taskFactory = new TaskFactory(taskScheduler);
  35. var res = default(ITestableObserver<int>);
  36. taskFactory.StartNew(() =>
  37. {
  38. var scheduler = new TestScheduler();
  39. var taskSource = new TaskCompletionSource<int>();
  40. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  41. scheduler.ScheduleAbsolute(10, () => taskSource.SetResult(42));
  42. res = scheduler.Start(() =>
  43. taskSource.Task.ToObservable()
  44. );
  45. });
  46. res.Messages.AssertEqual(
  47. OnNext(200, 42),
  48. OnCompleted<int>(200)
  49. );
  50. }
  51. [TestMethod]
  52. public void TaskToObservable_NonVoid_Complete_BeforeSubscribe()
  53. {
  54. var taskScheduler = new TestTaskScheduler();
  55. var taskFactory = new TaskFactory(taskScheduler);
  56. var res = default(ITestableObserver<int>);
  57. taskFactory.StartNew(() =>
  58. {
  59. var scheduler = new TestScheduler();
  60. var taskSource = new TaskCompletionSource<int>();
  61. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  62. scheduler.ScheduleAbsolute(110, () => taskSource.SetResult(42));
  63. res = scheduler.Start(() =>
  64. taskSource.Task.ToObservable()
  65. );
  66. });
  67. res.Messages.AssertEqual(
  68. OnNext(200, 42),
  69. OnCompleted<int>(200)
  70. );
  71. }
  72. [TestMethod]
  73. public void TaskToObservable_NonVoid_Complete_BeforeDispose()
  74. {
  75. var taskScheduler = new TestTaskScheduler();
  76. var taskFactory = new TaskFactory(taskScheduler);
  77. var res = default(ITestableObserver<int>);
  78. taskFactory.StartNew(() =>
  79. {
  80. var scheduler = new TestScheduler();
  81. var taskSource = new TaskCompletionSource<int>();
  82. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  83. scheduler.ScheduleAbsolute(300, () => taskSource.SetResult(42));
  84. res = scheduler.Start(() =>
  85. taskSource.Task.ToObservable()
  86. );
  87. });
  88. res.Messages.AssertEqual(
  89. OnNext(300, 42),
  90. OnCompleted<int>(300)
  91. );
  92. }
  93. [TestMethod]
  94. public void TaskToObservable_NonVoid_Complete_AfterDispose()
  95. {
  96. var taskScheduler = new TestTaskScheduler();
  97. var taskFactory = new TaskFactory(taskScheduler);
  98. var res = default(ITestableObserver<int>);
  99. taskFactory.StartNew(() =>
  100. {
  101. var scheduler = new TestScheduler();
  102. var taskSource = new TaskCompletionSource<int>();
  103. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  104. scheduler.ScheduleAbsolute(1100, () => taskSource.SetResult(42));
  105. res = scheduler.Start(() =>
  106. taskSource.Task.ToObservable()
  107. );
  108. });
  109. res.Messages.AssertEqual(
  110. );
  111. }
  112. [TestMethod]
  113. public void TaskToObservable_NonVoid_Exception_BeforeCreate()
  114. {
  115. var taskScheduler = new TestTaskScheduler();
  116. var taskFactory = new TaskFactory(taskScheduler);
  117. var res = default(ITestableObserver<int>);
  118. var ex = new Exception();
  119. taskFactory.StartNew(() =>
  120. {
  121. var scheduler = new TestScheduler();
  122. var taskSource = new TaskCompletionSource<int>();
  123. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  124. scheduler.ScheduleAbsolute(10, () => taskSource.SetException(ex));
  125. res = scheduler.Start(() =>
  126. taskSource.Task.ToObservable()
  127. );
  128. });
  129. res.Messages.AssertEqual(
  130. OnError<int>(200, ex)
  131. );
  132. }
  133. [TestMethod]
  134. public void TaskToObservable_NonVoid_Exception_BeforeSubscribe()
  135. {
  136. var taskScheduler = new TestTaskScheduler();
  137. var taskFactory = new TaskFactory(taskScheduler);
  138. var res = default(ITestableObserver<int>);
  139. var ex = new Exception();
  140. taskFactory.StartNew(() =>
  141. {
  142. var scheduler = new TestScheduler();
  143. var taskSource = new TaskCompletionSource<int>();
  144. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  145. scheduler.ScheduleAbsolute(110, () => taskSource.SetException(ex));
  146. res = scheduler.Start(() =>
  147. taskSource.Task.ToObservable()
  148. );
  149. });
  150. res.Messages.AssertEqual(
  151. OnError<int>(200, ex)
  152. );
  153. }
  154. [TestMethod]
  155. public void TaskToObservable_NonVoid_Exception_BeforeDispose()
  156. {
  157. var taskScheduler = new TestTaskScheduler();
  158. var taskFactory = new TaskFactory(taskScheduler);
  159. var res = default(ITestableObserver<int>);
  160. var ex = new Exception();
  161. taskFactory.StartNew(() =>
  162. {
  163. var scheduler = new TestScheduler();
  164. var taskSource = new TaskCompletionSource<int>();
  165. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  166. scheduler.ScheduleAbsolute(300, () => taskSource.SetException(ex));
  167. res = scheduler.Start(() =>
  168. taskSource.Task.ToObservable()
  169. );
  170. });
  171. res.Messages.AssertEqual(
  172. OnError<int>(300, ex)
  173. );
  174. }
  175. [TestMethod]
  176. public void TaskToObservable_NonVoid_Exception_AfterDispose()
  177. {
  178. var taskScheduler = new TestTaskScheduler();
  179. var taskFactory = new TaskFactory(taskScheduler);
  180. var res = default(ITestableObserver<int>);
  181. taskFactory.StartNew(() =>
  182. {
  183. var scheduler = new TestScheduler();
  184. var taskSource = new TaskCompletionSource<int>();
  185. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  186. scheduler.ScheduleAbsolute(1100, () => taskSource.SetException(new Exception()));
  187. res = scheduler.Start(() =>
  188. taskSource.Task.ToObservable()
  189. );
  190. });
  191. res.Messages.AssertEqual(
  192. );
  193. }
  194. [TestMethod]
  195. public void TaskToObservable_NonVoid_Canceled_BeforeCreate()
  196. {
  197. var taskScheduler = new TestTaskScheduler();
  198. var taskFactory = new TaskFactory(taskScheduler);
  199. var res = default(ITestableObserver<int>);
  200. taskFactory.StartNew(() =>
  201. {
  202. var scheduler = new TestScheduler();
  203. var taskSource = new TaskCompletionSource<int>();
  204. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  205. scheduler.ScheduleAbsolute(10, () => taskSource.SetCanceled());
  206. res = scheduler.Start(() =>
  207. taskSource.Task.ToObservable()
  208. );
  209. });
  210. res.Messages.AssertEqual(
  211. OnError<int>(200, ex => ex is TaskCanceledException)
  212. );
  213. }
  214. [TestMethod]
  215. public void TaskToObservable_NonVoid_Canceled_BeforeSubscribe()
  216. {
  217. var taskScheduler = new TestTaskScheduler();
  218. var taskFactory = new TaskFactory(taskScheduler);
  219. var res = default(ITestableObserver<int>);
  220. taskFactory.StartNew(() =>
  221. {
  222. var scheduler = new TestScheduler();
  223. var taskSource = new TaskCompletionSource<int>();
  224. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  225. scheduler.ScheduleAbsolute(110, () => taskSource.SetCanceled());
  226. res = scheduler.Start(() =>
  227. taskSource.Task.ToObservable()
  228. );
  229. });
  230. res.Messages.AssertEqual(
  231. OnError<int>(200, ex => ex is TaskCanceledException)
  232. );
  233. }
  234. [TestMethod]
  235. public void TaskToObservable_NonVoid_Canceled_BeforeDispose()
  236. {
  237. var taskScheduler = new TestTaskScheduler();
  238. var taskFactory = new TaskFactory(taskScheduler);
  239. var res = default(ITestableObserver<int>);
  240. taskFactory.StartNew(() =>
  241. {
  242. var scheduler = new TestScheduler();
  243. var taskSource = new TaskCompletionSource<int>();
  244. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  245. scheduler.ScheduleAbsolute(300, () => taskSource.SetCanceled());
  246. res = scheduler.Start(() =>
  247. taskSource.Task.ToObservable()
  248. );
  249. });
  250. res.Messages.AssertEqual(
  251. OnError<int>(300, ex => ex is TaskCanceledException)
  252. );
  253. }
  254. [TestMethod]
  255. public void TaskToObservable_NonVoid_Canceled_AfterDispose()
  256. {
  257. var taskScheduler = new TestTaskScheduler();
  258. var taskFactory = new TaskFactory(taskScheduler);
  259. var res = default(ITestableObserver<int>);
  260. taskFactory.StartNew(() =>
  261. {
  262. var scheduler = new TestScheduler();
  263. var taskSource = new TaskCompletionSource<int>();
  264. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  265. scheduler.ScheduleAbsolute(1100, () => taskSource.SetCanceled());
  266. res = scheduler.Start(() =>
  267. taskSource.Task.ToObservable()
  268. );
  269. });
  270. res.Messages.AssertEqual(
  271. );
  272. }
  273. #if DESKTOPCLR
  274. [TestMethod]
  275. public void TaskToObservable_NonVoid_Scheduler()
  276. {
  277. var e = new ManualResetEvent(false);
  278. var x = default(int);
  279. var t = default(int);
  280. var cts = new TaskCompletionSource<int>();
  281. var xs = cts.Task.ToObservable(Scheduler.Immediate);
  282. xs.Subscribe(res =>
  283. {
  284. x = res;
  285. t = Thread.CurrentThread.ManagedThreadId;
  286. e.Set();
  287. });
  288. cts.SetResult(42);
  289. e.WaitOne();
  290. Assert.AreEqual(42, x);
  291. Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
  292. }
  293. #endif
  294. [TestMethod]
  295. public void TaskToObservable_Void_ArgumentChecking()
  296. {
  297. var s = Scheduler.Immediate;
  298. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task)null));
  299. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task)null, s));
  300. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task)Task.FromResult(42), default(IScheduler)));
  301. var tcs = new System.Threading.Tasks.TaskCompletionSource<int>();
  302. System.Threading.Tasks.Task task = tcs.Task;
  303. ReactiveAssert.Throws<ArgumentNullException>(() => task.ToObservable().Subscribe(null));
  304. }
  305. [TestMethod]
  306. public void TaskToObservable_Void_Complete_BeforeCreate()
  307. {
  308. var taskScheduler = new TestTaskScheduler();
  309. var taskFactory = new TaskFactory(taskScheduler);
  310. var res = default(ITestableObserver<Unit>);
  311. taskFactory.StartNew(() =>
  312. {
  313. var scheduler = new TestScheduler();
  314. var taskSource = new TaskCompletionSource<int>();
  315. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  316. scheduler.ScheduleAbsolute(10, () => taskSource.SetResult(42));
  317. res = scheduler.Start(() =>
  318. ((Task)taskSource.Task).ToObservable()
  319. );
  320. });
  321. res.Messages.AssertEqual(
  322. OnNext(200, new Unit()),
  323. OnCompleted<Unit>(200)
  324. );
  325. }
  326. [TestMethod]
  327. public void TaskToObservable_Void_Complete_BeforeSubscribe()
  328. {
  329. var taskScheduler = new TestTaskScheduler();
  330. var taskFactory = new TaskFactory(taskScheduler);
  331. var res = default(ITestableObserver<Unit>);
  332. taskFactory.StartNew(() =>
  333. {
  334. var scheduler = new TestScheduler();
  335. var taskSource = new TaskCompletionSource<int>();
  336. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  337. scheduler.ScheduleAbsolute(110, () => taskSource.SetResult(42));
  338. res = scheduler.Start(() =>
  339. ((Task)taskSource.Task).ToObservable()
  340. );
  341. });
  342. res.Messages.AssertEqual(
  343. OnNext(200, new Unit()),
  344. OnCompleted<Unit>(200)
  345. );
  346. }
  347. [TestMethod]
  348. public void TaskToObservable_Void_Complete_BeforeDispose()
  349. {
  350. var taskScheduler = new TestTaskScheduler();
  351. var taskFactory = new TaskFactory(taskScheduler);
  352. var res = default(ITestableObserver<Unit>);
  353. taskFactory.StartNew(() =>
  354. {
  355. var scheduler = new TestScheduler();
  356. var taskSource = new TaskCompletionSource<int>();
  357. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  358. scheduler.ScheduleAbsolute(300, () => taskSource.SetResult(42));
  359. res = scheduler.Start(() =>
  360. ((Task)taskSource.Task).ToObservable()
  361. );
  362. });
  363. res.Messages.AssertEqual(
  364. OnNext(300, new Unit()),
  365. OnCompleted<Unit>(300)
  366. );
  367. }
  368. [TestMethod]
  369. public void TaskToObservable_Void_Complete_AfterDispose()
  370. {
  371. var taskScheduler = new TestTaskScheduler();
  372. var taskFactory = new TaskFactory(taskScheduler);
  373. var res = default(ITestableObserver<Unit>);
  374. taskFactory.StartNew(() =>
  375. {
  376. var scheduler = new TestScheduler();
  377. var taskSource = new TaskCompletionSource<int>();
  378. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  379. scheduler.ScheduleAbsolute(1100, () => taskSource.SetResult(42));
  380. res = scheduler.Start(() =>
  381. ((Task)taskSource.Task).ToObservable()
  382. );
  383. });
  384. res.Messages.AssertEqual(
  385. );
  386. }
  387. [TestMethod]
  388. public void TaskToObservable_Void_Exception_BeforeCreate()
  389. {
  390. var taskScheduler = new TestTaskScheduler();
  391. var taskFactory = new TaskFactory(taskScheduler);
  392. var res = default(ITestableObserver<Unit>);
  393. var ex = new Exception();
  394. taskFactory.StartNew(() =>
  395. {
  396. var scheduler = new TestScheduler();
  397. var taskSource = new TaskCompletionSource<int>();
  398. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  399. scheduler.ScheduleAbsolute(10, () => taskSource.SetException(ex));
  400. res = scheduler.Start(() =>
  401. ((Task)taskSource.Task).ToObservable()
  402. );
  403. });
  404. res.Messages.AssertEqual(
  405. OnError<Unit>(200, ex)
  406. );
  407. }
  408. [TestMethod]
  409. public void TaskToObservable_Void_Exception_BeforeSubscribe()
  410. {
  411. var taskScheduler = new TestTaskScheduler();
  412. var taskFactory = new TaskFactory(taskScheduler);
  413. var res = default(ITestableObserver<Unit>);
  414. var ex = new Exception();
  415. taskFactory.StartNew(() =>
  416. {
  417. var scheduler = new TestScheduler();
  418. var taskSource = new TaskCompletionSource<int>();
  419. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  420. scheduler.ScheduleAbsolute(110, () => taskSource.SetException(ex));
  421. res = scheduler.Start(() =>
  422. ((Task)taskSource.Task).ToObservable()
  423. );
  424. });
  425. res.Messages.AssertEqual(
  426. OnError<Unit>(200, ex)
  427. );
  428. }
  429. [TestMethod]
  430. public void TaskToObservable_Void_Exception_BeforeDispose()
  431. {
  432. var taskScheduler = new TestTaskScheduler();
  433. var taskFactory = new TaskFactory(taskScheduler);
  434. var res = default(ITestableObserver<Unit>);
  435. var ex = new Exception();
  436. taskFactory.StartNew(() =>
  437. {
  438. var scheduler = new TestScheduler();
  439. var taskSource = new TaskCompletionSource<int>();
  440. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  441. scheduler.ScheduleAbsolute(300, () => taskSource.SetException(ex));
  442. res = scheduler.Start(() =>
  443. ((Task)taskSource.Task).ToObservable()
  444. );
  445. });
  446. res.Messages.AssertEqual(
  447. OnError<Unit>(300, ex)
  448. );
  449. }
  450. [TestMethod]
  451. public void TaskToObservable_Void_Exception_AfterDispose()
  452. {
  453. var taskScheduler = new TestTaskScheduler();
  454. var taskFactory = new TaskFactory(taskScheduler);
  455. var res = default(ITestableObserver<Unit>);
  456. taskFactory.StartNew(() =>
  457. {
  458. var scheduler = new TestScheduler();
  459. var taskSource = new TaskCompletionSource<int>();
  460. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  461. scheduler.ScheduleAbsolute(1100, () => taskSource.SetException(new Exception()));
  462. res = scheduler.Start(() =>
  463. ((Task)taskSource.Task).ToObservable()
  464. );
  465. });
  466. res.Messages.AssertEqual(
  467. );
  468. }
  469. [TestMethod]
  470. public void TaskToObservable_Void_Canceled_BeforeCreate()
  471. {
  472. var taskScheduler = new TestTaskScheduler();
  473. var taskFactory = new TaskFactory(taskScheduler);
  474. var res = default(ITestableObserver<Unit>);
  475. taskFactory.StartNew(() =>
  476. {
  477. var scheduler = new TestScheduler();
  478. var taskSource = new TaskCompletionSource<int>();
  479. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  480. scheduler.ScheduleAbsolute(10, () => taskSource.SetCanceled());
  481. res = scheduler.Start(() =>
  482. ((Task)taskSource.Task).ToObservable()
  483. );
  484. });
  485. res.Messages.AssertEqual(
  486. OnError<Unit>(200, ex => ex is TaskCanceledException)
  487. );
  488. }
  489. [TestMethod]
  490. public void TaskToObservable_Void_Canceled_BeforeSubscribe()
  491. {
  492. var taskScheduler = new TestTaskScheduler();
  493. var taskFactory = new TaskFactory(taskScheduler);
  494. var res = default(ITestableObserver<Unit>);
  495. taskFactory.StartNew(() =>
  496. {
  497. var scheduler = new TestScheduler();
  498. var taskSource = new TaskCompletionSource<int>();
  499. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  500. scheduler.ScheduleAbsolute(110, () => taskSource.SetCanceled());
  501. res = scheduler.Start(() =>
  502. ((Task)taskSource.Task).ToObservable()
  503. );
  504. });
  505. res.Messages.AssertEqual(
  506. OnError<Unit>(200, ex => ex is TaskCanceledException)
  507. );
  508. }
  509. [TestMethod]
  510. public void TaskToObservable_Void_Canceled_BeforeDispose()
  511. {
  512. var taskScheduler = new TestTaskScheduler();
  513. var taskFactory = new TaskFactory(taskScheduler);
  514. var res = default(ITestableObserver<Unit>);
  515. taskFactory.StartNew(() =>
  516. {
  517. var scheduler = new TestScheduler();
  518. var taskSource = new TaskCompletionSource<int>();
  519. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  520. scheduler.ScheduleAbsolute(300, () => taskSource.SetCanceled());
  521. res = scheduler.Start(() =>
  522. ((Task)taskSource.Task).ToObservable()
  523. );
  524. });
  525. res.Messages.AssertEqual(
  526. OnError<Unit>(300, ex => ex is TaskCanceledException)
  527. );
  528. }
  529. [TestMethod]
  530. public void TaskToObservable_Void_Canceled_AfterDispose()
  531. {
  532. var taskScheduler = new TestTaskScheduler();
  533. var taskFactory = new TaskFactory(taskScheduler);
  534. var res = default(ITestableObserver<Unit>);
  535. taskFactory.StartNew(() =>
  536. {
  537. var scheduler = new TestScheduler();
  538. var taskSource = new TaskCompletionSource<int>();
  539. taskSource.Task.ContinueWith(t => { var e = t.Exception; });
  540. scheduler.ScheduleAbsolute(1100, () => taskSource.SetCanceled());
  541. res = scheduler.Start(() =>
  542. ((Task)taskSource.Task).ToObservable()
  543. );
  544. });
  545. res.Messages.AssertEqual(
  546. );
  547. }
  548. #if DESKTOPCLR
  549. [TestMethod]
  550. public void TaskToObservable_Void_Scheduler()
  551. {
  552. var e = new ManualResetEvent(false);
  553. var t = default(int);
  554. var tcs = new TaskCompletionSource<int>();
  555. var xs = ((Task)tcs.Task).ToObservable(Scheduler.Immediate);
  556. xs.Subscribe(res =>
  557. {
  558. t = Thread.CurrentThread.ManagedThreadId;
  559. e.Set();
  560. });
  561. tcs.SetResult(42);
  562. e.WaitOne();
  563. Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
  564. }
  565. #endif
  566. #endregion
  567. #region ToTask
  568. [TestMethod]
  569. public void ObservableToTask_ArgumentChecking()
  570. {
  571. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null));
  572. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null, new CancellationToken()));
  573. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null, new object()));
  574. ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null, new CancellationToken(), new object()));
  575. }
  576. [TestMethod]
  577. public void ObservableToTaskNoValue()
  578. {
  579. var scheduler = new TestScheduler();
  580. var xs = Observable.Empty<int>(scheduler);
  581. var continuation = xs.ToTask();
  582. scheduler.Start();
  583. Assert.IsTrue(continuation.IsFaulted);
  584. Assert.IsTrue(continuation.Exception.InnerExceptions.Count == 1 && continuation.Exception.InnerExceptions[0] is InvalidOperationException);
  585. Assert.AreEqual(1, scheduler.Clock);
  586. }
  587. [TestMethod]
  588. public void ObservableToTaskSingleValue()
  589. {
  590. var scheduler = new TestScheduler();
  591. var xs = Observable.Return(5, scheduler);
  592. var continuation = xs.ToTask();
  593. scheduler.Start();
  594. Assert.IsTrue(continuation.IsCompleted);
  595. Assert.AreEqual(5, continuation.Result);
  596. Assert.AreEqual(1, scheduler.Clock);
  597. }
  598. [TestMethod]
  599. public void ObservableToTaskMultipleValues()
  600. {
  601. var scheduler = new TestScheduler();
  602. var xs = scheduler.CreateColdObservable(
  603. OnNext(100, 1),
  604. OnNext(150, 2),
  605. OnNext(200, 3),
  606. OnCompleted<int>(200)
  607. );
  608. var continuation = xs.ToTask();
  609. scheduler.Start();
  610. Assert.IsTrue(continuation.IsCompleted);
  611. Assert.AreEqual(3, continuation.Result);
  612. xs.Subscriptions.AssertEqual(
  613. Subscribe(0, 200)
  614. );
  615. }
  616. [TestMethod]
  617. public void ObservableToTaskException()
  618. {
  619. var scheduler = new TestScheduler();
  620. var ex = new InvalidOperationException();
  621. var xs = scheduler.CreateColdObservable(
  622. OnNext(100, 1),
  623. OnNext(150, 2),
  624. OnError<int>(200, ex)
  625. );
  626. var continuation = xs.ToTask();
  627. scheduler.Start();
  628. Assert.IsTrue(continuation.IsFaulted);
  629. var ag = continuation.Exception;
  630. Assert.IsNotNull(ag);
  631. Assert.AreEqual(1, ag.InnerExceptions.Count);
  632. Assert.AreEqual(ex, ag.InnerExceptions[0]);
  633. xs.Subscriptions.AssertEqual(
  634. Subscribe(0, 200)
  635. );
  636. }
  637. [TestMethod]
  638. public void ObservableToTaskCancelled()
  639. {
  640. var scheduler = new TestScheduler();
  641. var xs = scheduler.CreateColdObservable(
  642. OnNext(100, 1),
  643. OnNext(150, 2),
  644. OnCompleted<int>(200)
  645. );
  646. var cts = new CancellationTokenSource();
  647. var continuation = xs.ToTask(cts.Token);
  648. scheduler.ScheduleAbsolute(125, cts.Cancel);
  649. scheduler.Start();
  650. Assert.IsTrue(continuation.IsCanceled);
  651. xs.Subscriptions.AssertEqual(
  652. Subscribe(0, 125)
  653. );
  654. }
  655. [TestMethod]
  656. public void ObservableToTaskWithStateSingleValue()
  657. {
  658. var state = "bar";
  659. var scheduler = new TestScheduler();
  660. var xs = Observable.Return(5, scheduler);
  661. var continuation = xs.ToTask(state);
  662. Assert.AreSame(continuation.AsyncState, state);
  663. scheduler.Start();
  664. Assert.IsTrue(continuation.IsCompleted);
  665. Assert.AreEqual(5, continuation.Result);
  666. Assert.AreEqual(1, scheduler.Clock);
  667. }
  668. #endregion
  669. }
  670. }
  671. #endif