ObservableExTest.cs 48 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using Microsoft.Reactive.Testing;
  9. using Microsoft.VisualStudio.TestTools.UnitTesting;
  10. using ReactiveTests.Dummies;
  11. namespace ReactiveTests.Tests
  12. {
  13. [TestClass]
  14. public class ObservableExTest : ReactiveTest
  15. {
  16. #region Create
  17. [TestMethod]
  18. public void Iterate_ArgumentChecking()
  19. {
  20. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create<int>(default(Func<IObserver<int>, IEnumerable<IObservable<Object>>>)));
  21. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IObserver<int>, IEnumerable<IObservable<Object>>>.Instance).Subscribe(null));
  22. }
  23. IEnumerable<IObservable<Object>> ToIterate_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  24. {
  25. observer.OnNext(1);
  26. yield return xs.Select(x => new Object());
  27. observer.OnNext(2);
  28. yield return ys.Select(x => new Object());
  29. observer.OnNext(3);
  30. observer.OnCompleted();
  31. yield return zs.Select(x => new Object());
  32. observer.OnNext(4);
  33. }
  34. [TestMethod]
  35. public void Iterate_Complete()
  36. {
  37. var scheduler = new TestScheduler();
  38. var xs = scheduler.CreateColdObservable(
  39. OnNext(10, 1),
  40. OnNext(20, 2),
  41. OnNext(30, 3),
  42. OnNext(40, 4),
  43. OnCompleted<int>(50)
  44. );
  45. var ys = scheduler.CreateColdObservable(
  46. OnNext(10, 1),
  47. OnNext(20, 2),
  48. OnCompleted<int>(30)
  49. );
  50. var zs = scheduler.CreateColdObservable(
  51. OnNext(10, 1),
  52. OnNext(20, 2),
  53. OnNext(30, 3),
  54. OnNext(40, 4),
  55. OnNext(50, 5),
  56. OnCompleted<int>(60)
  57. );
  58. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete(xs, ys, zs, observer)));
  59. res.Messages.AssertEqual(
  60. OnNext(200, 1),
  61. OnNext(250, 2),
  62. OnNext(280, 3),
  63. OnCompleted<int>(280)
  64. );
  65. xs.Subscriptions.AssertEqual(
  66. Subscribe(200, 250)
  67. );
  68. ys.Subscriptions.AssertEqual(
  69. Subscribe(250, 280)
  70. );
  71. zs.Subscriptions.AssertEqual(
  72. Subscribe(280, 280)
  73. );
  74. }
  75. IEnumerable<IObservable<Object>> ToIterate_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  76. {
  77. observer.OnNext(1);
  78. yield return xs.Select(x => new Object());
  79. observer.OnNext(2);
  80. yield return ys.Select(x => new Object());
  81. observer.OnNext(3);
  82. yield return zs.Select(x => new Object());
  83. observer.OnNext(4);
  84. }
  85. [TestMethod]
  86. public void Iterate_Complete_Implicit()
  87. {
  88. var scheduler = new TestScheduler();
  89. var xs = scheduler.CreateColdObservable(
  90. OnNext(10, 1),
  91. OnNext(20, 2),
  92. OnNext(30, 3),
  93. OnNext(40, 4),
  94. OnCompleted<int>(50)
  95. );
  96. var ys = scheduler.CreateColdObservable(
  97. OnNext(10, 1),
  98. OnNext(20, 2),
  99. OnCompleted<int>(30)
  100. );
  101. var zs = scheduler.CreateColdObservable(
  102. OnNext(10, 1),
  103. OnNext(20, 2),
  104. OnNext(30, 3),
  105. OnNext(40, 4),
  106. OnNext(50, 5),
  107. OnCompleted<int>(60)
  108. );
  109. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Implicit(xs, ys, zs, observer)));
  110. res.Messages.AssertEqual(
  111. OnNext(200, 1),
  112. OnNext(250, 2),
  113. OnNext(280, 3),
  114. OnNext(340, 4),
  115. OnCompleted<int>(340)
  116. );
  117. xs.Subscriptions.AssertEqual(
  118. Subscribe(200, 250)
  119. );
  120. ys.Subscriptions.AssertEqual(
  121. Subscribe(250, 280)
  122. );
  123. zs.Subscriptions.AssertEqual(
  124. Subscribe(280, 340)
  125. );
  126. }
  127. IEnumerable<IObservable<Object>> ToIterate_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
  128. {
  129. observer.OnNext(1);
  130. yield return xs.Select(x => new Object());
  131. observer.OnNext(2);
  132. yield return ys.Select(x => new Object());
  133. observer.OnNext(3);
  134. if (xs != null)
  135. throw ex;
  136. yield return zs.Select(x => new Object());
  137. observer.OnNext(4);
  138. observer.OnCompleted();
  139. }
  140. [TestMethod]
  141. public void Iterate_Iterator_Throw()
  142. {
  143. var scheduler = new TestScheduler();
  144. var xs = scheduler.CreateColdObservable(
  145. OnNext(10, 1),
  146. OnNext(20, 2),
  147. OnNext(30, 3),
  148. OnNext(40, 4),
  149. OnCompleted<int>(50)
  150. );
  151. var ys = scheduler.CreateColdObservable(
  152. OnNext(10, 1),
  153. OnNext(20, 2),
  154. OnCompleted<int>(30)
  155. );
  156. var zs = scheduler.CreateColdObservable(
  157. OnNext(10, 1),
  158. OnNext(20, 2),
  159. OnNext(30, 3),
  160. OnNext(40, 4),
  161. OnNext(50, 5),
  162. OnCompleted<int>(60)
  163. );
  164. var ex = new Exception();
  165. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Throw(xs, ys, zs, observer, ex)));
  166. res.Messages.AssertEqual(
  167. OnNext(200, 1),
  168. OnNext(250, 2),
  169. OnNext(280, 3),
  170. OnError<int>(280, ex)
  171. );
  172. xs.Subscriptions.AssertEqual(
  173. Subscribe(200, 250)
  174. );
  175. ys.Subscriptions.AssertEqual(
  176. Subscribe(250, 280)
  177. );
  178. zs.Subscriptions.AssertEqual(
  179. );
  180. }
  181. IEnumerable<IObservable<Object>> ToIterate_Error(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
  182. {
  183. observer.OnNext(1);
  184. yield return xs.Select(x => new Object());
  185. observer.OnNext(2);
  186. observer.OnError(ex);
  187. yield return ys.Select(x => new Object());
  188. observer.OnNext(3);
  189. yield return zs.Select(x => new Object());
  190. observer.OnNext(4);
  191. observer.OnCompleted();
  192. }
  193. [TestMethod]
  194. public void Iterate_Iterator_Error()
  195. {
  196. var scheduler = new TestScheduler();
  197. var ex = new Exception();
  198. var xs = scheduler.CreateColdObservable(
  199. OnNext(10, 1),
  200. OnNext(20, 2),
  201. OnNext(30, 3),
  202. OnNext(40, 4),
  203. OnCompleted<int>(50)
  204. );
  205. var ys = scheduler.CreateColdObservable(
  206. OnNext(10, 1),
  207. OnNext(20, 2),
  208. OnCompleted<int>(30)
  209. );
  210. var zs = scheduler.CreateColdObservable(
  211. OnNext(10, 1),
  212. OnNext(20, 2),
  213. OnNext(30, 3),
  214. OnNext(40, 4),
  215. OnNext(50, 5),
  216. OnCompleted<int>(60)
  217. );
  218. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Error(xs, ys, zs, observer, ex)));
  219. res.Messages.AssertEqual(
  220. OnNext(200, 1),
  221. OnNext(250, 2),
  222. OnError<int>(250, ex)
  223. );
  224. xs.Subscriptions.AssertEqual(
  225. Subscribe(200, 250)
  226. );
  227. ys.Subscriptions.AssertEqual(
  228. Subscribe(250, 250)
  229. );
  230. zs.Subscriptions.AssertEqual(
  231. );
  232. }
  233. IEnumerable<IObservable<Object>> ToIterate_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  234. {
  235. observer.OnNext(1);
  236. yield return xs.Select(x => new Object());
  237. observer.OnNext(2);
  238. yield return ys.Select(x => new Object());
  239. observer.OnNext(3);
  240. yield return zs.Select(x => new Object());
  241. observer.OnNext(4);
  242. }
  243. [TestMethod]
  244. public void Iterate_Complete_Dispose()
  245. {
  246. var scheduler = new TestScheduler();
  247. var xs = scheduler.CreateColdObservable(
  248. OnNext(10, 1),
  249. OnNext(20, 2),
  250. OnNext(30, 3),
  251. OnNext(40, 4),
  252. OnCompleted<int>(50)
  253. );
  254. var ys = scheduler.CreateColdObservable(
  255. OnNext(10, 1),
  256. OnNext(20, 2),
  257. OnCompleted<int>(30)
  258. );
  259. var zs = scheduler.CreateColdObservable(
  260. OnNext(100, 1),
  261. OnNext(200, 2),
  262. OnNext(300, 3),
  263. OnNext(400, 4),
  264. OnNext(500, 5),
  265. OnNext(600, 6),
  266. OnNext(700, 7),
  267. OnNext(800, 8),
  268. OnNext(900, 9),
  269. OnNext(1000, 10)
  270. );
  271. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Dispose(xs, ys, zs, observer)));
  272. res.Messages.AssertEqual(
  273. OnNext(200, 1),
  274. OnNext(250, 2),
  275. OnNext(280, 3)
  276. );
  277. xs.Subscriptions.AssertEqual(
  278. Subscribe(200, 250)
  279. );
  280. ys.Subscriptions.AssertEqual(
  281. Subscribe(250, 280)
  282. );
  283. zs.Subscriptions.AssertEqual(
  284. Subscribe(280, 1000)
  285. );
  286. }
  287. [TestMethod]
  288. public void IteratorScenario()
  289. {
  290. var xs = ObservableEx.Create<int>(o => _IteratorScenario(100, 1000, o));
  291. xs.AssertEqual(new[] { 100, 1000 }.ToObservable());
  292. }
  293. static IEnumerable<IObservable<Object>> _IteratorScenario(int x, int y, IObserver<int> results)
  294. {
  295. var xs = Observable.Range(1, x).ToListObservable();
  296. yield return xs;
  297. results.OnNext(xs.Value);
  298. var ys = Observable.Range(1, y).ToListObservable();
  299. yield return ys;
  300. results.OnNext(ys.Value);
  301. }
  302. [TestMethod]
  303. public void Iterate_Void_ArgumentChecking()
  304. {
  305. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(default(Func<IEnumerable<IObservable<object>>>)));
  306. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IEnumerable<IObservable<Object>>>.Instance).Subscribe(null));
  307. }
  308. IEnumerable<IObservable<Object>> ToIterate_Void_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  309. {
  310. yield return xs.Select(x => new Object());
  311. yield return ys.Select(x => new Object());
  312. yield return zs.Select(x => new Object());
  313. }
  314. [TestMethod]
  315. public void Iterate_Void_Complete()
  316. {
  317. var scheduler = new TestScheduler();
  318. var xs = scheduler.CreateColdObservable(
  319. OnNext(10, 1),
  320. OnNext(20, 2),
  321. OnNext(30, 3),
  322. OnNext(40, 4),
  323. OnCompleted<int>(50)
  324. );
  325. var ys = scheduler.CreateColdObservable(
  326. OnNext(10, 1),
  327. OnNext(20, 2),
  328. OnCompleted<int>(30)
  329. );
  330. var zs = scheduler.CreateColdObservable(
  331. OnNext(10, 1),
  332. OnNext(20, 2),
  333. OnNext(30, 3),
  334. OnNext(40, 4),
  335. OnNext(50, 5),
  336. OnCompleted<int>(60)
  337. );
  338. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete(xs, ys, zs)));
  339. res.Messages.AssertEqual(
  340. OnCompleted<Unit>(340)
  341. );
  342. xs.Subscriptions.AssertEqual(
  343. Subscribe(200, 250)
  344. );
  345. ys.Subscriptions.AssertEqual(
  346. Subscribe(250, 280)
  347. );
  348. zs.Subscriptions.AssertEqual(
  349. Subscribe(280, 340)
  350. );
  351. }
  352. IEnumerable<IObservable<Object>> ToIterate_Void_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  353. {
  354. yield return xs.Select(x => new Object());
  355. yield return ys.Select(x => new Object());
  356. yield return zs.Select(x => new Object());
  357. }
  358. [TestMethod]
  359. public void Iterate_Void_Complete_Implicit()
  360. {
  361. var scheduler = new TestScheduler();
  362. var xs = scheduler.CreateColdObservable(
  363. OnNext(10, 1),
  364. OnNext(20, 2),
  365. OnNext(30, 3),
  366. OnNext(40, 4),
  367. OnCompleted<int>(50)
  368. );
  369. var ys = scheduler.CreateColdObservable(
  370. OnNext(10, 1),
  371. OnNext(20, 2),
  372. OnCompleted<int>(30)
  373. );
  374. var zs = scheduler.CreateColdObservable(
  375. OnNext(10, 1),
  376. OnNext(20, 2),
  377. OnNext(30, 3),
  378. OnNext(40, 4),
  379. OnNext(50, 5),
  380. OnCompleted<int>(60)
  381. );
  382. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Implicit(xs, ys, zs)));
  383. res.Messages.AssertEqual(
  384. OnCompleted<Unit>(340)
  385. );
  386. xs.Subscriptions.AssertEqual(
  387. Subscribe(200, 250)
  388. );
  389. ys.Subscriptions.AssertEqual(
  390. Subscribe(250, 280)
  391. );
  392. zs.Subscriptions.AssertEqual(
  393. Subscribe(280, 340)
  394. );
  395. }
  396. IEnumerable<IObservable<Object>> ToIterate_Void_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, Exception ex)
  397. {
  398. yield return xs.Select(x => new Object());
  399. yield return ys.Select(x => new Object());
  400. if (xs != null)
  401. throw ex;
  402. yield return zs.Select(x => new Object());
  403. }
  404. [TestMethod]
  405. public void Iterate_Void_Iterator_Throw()
  406. {
  407. var scheduler = new TestScheduler();
  408. var xs = scheduler.CreateColdObservable(
  409. OnNext(10, 1),
  410. OnNext(20, 2),
  411. OnNext(30, 3),
  412. OnNext(40, 4),
  413. OnCompleted<int>(50)
  414. );
  415. var ys = scheduler.CreateColdObservable(
  416. OnNext(10, 1),
  417. OnNext(20, 2),
  418. OnCompleted<int>(30)
  419. );
  420. var zs = scheduler.CreateColdObservable(
  421. OnNext(10, 1),
  422. OnNext(20, 2),
  423. OnNext(30, 3),
  424. OnNext(40, 4),
  425. OnNext(50, 5),
  426. OnCompleted<int>(60)
  427. );
  428. var ex = new Exception();
  429. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Throw(xs, ys, zs, ex)));
  430. res.Messages.AssertEqual(
  431. OnError<Unit>(280, ex)
  432. );
  433. xs.Subscriptions.AssertEqual(
  434. Subscribe(200, 250)
  435. );
  436. ys.Subscriptions.AssertEqual(
  437. Subscribe(250, 280)
  438. );
  439. zs.Subscriptions.AssertEqual(
  440. );
  441. }
  442. IEnumerable<IObservable<Object>> ToIterate_Void_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  443. {
  444. yield return xs.Select(x => new Object());
  445. yield return ys.Select(x => new Object());
  446. yield return zs.Select(x => new Object());
  447. }
  448. [TestMethod]
  449. public void Iterate_Void_Complete_Dispose()
  450. {
  451. var scheduler = new TestScheduler();
  452. var xs = scheduler.CreateColdObservable(
  453. OnNext(10, 1),
  454. OnNext(20, 2),
  455. OnNext(30, 3),
  456. OnNext(40, 4),
  457. OnCompleted<int>(50)
  458. );
  459. var ys = scheduler.CreateColdObservable(
  460. OnNext(10, 1),
  461. OnNext(20, 2),
  462. OnCompleted<int>(30)
  463. );
  464. var zs = scheduler.CreateColdObservable(
  465. OnNext(100, 1),
  466. OnNext(200, 2),
  467. OnNext(300, 3),
  468. OnNext(400, 4),
  469. OnNext(500, 5),
  470. OnNext(600, 6),
  471. OnNext(700, 7),
  472. OnNext(800, 8),
  473. OnNext(900, 9),
  474. OnNext(1000, 10)
  475. );
  476. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Dispose(xs, ys, zs)));
  477. res.Messages.AssertEqual(
  478. );
  479. xs.Subscriptions.AssertEqual(
  480. Subscribe(200, 250)
  481. );
  482. ys.Subscriptions.AssertEqual(
  483. Subscribe(250, 280)
  484. );
  485. zs.Subscriptions.AssertEqual(
  486. Subscribe(280, 1000)
  487. );
  488. }
  489. [TestMethod]
  490. [Ignore]
  491. public void Iterate_Void_Func_Throw()
  492. {
  493. var scheduler = new TestScheduler();
  494. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler.Start(() => ObservableEx.Create(() => { throw new InvalidOperationException(); })));
  495. }
  496. static IEnumerable<IObservable<Object>> _IteratorScenario_Void(int x, int y)
  497. {
  498. var xs = Observable.Range(1, x).ToListObservable();
  499. yield return xs;
  500. var ys = Observable.Range(1, y).ToListObservable();
  501. yield return ys;
  502. }
  503. [TestMethod]
  504. public void IteratorScenario_Void()
  505. {
  506. var xs = ObservableEx.Create(() => _IteratorScenario_Void(100, 1000));
  507. xs.AssertEqual(new Unit[] { }.ToObservable());
  508. }
  509. #endregion
  510. #region Expand
  511. [TestMethod]
  512. public void Expand_ArgumentChecking()
  513. {
  514. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(null, DummyFunc<int, IObservable<int>>.Instance, DummyScheduler.Instance));
  515. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(DummyObservable<int>.Instance, null, DummyScheduler.Instance));
  516. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, null));
  517. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(null, DummyFunc<int, IObservable<int>>.Instance));
  518. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(DummyObservable<int>.Instance, null));
  519. }
  520. [TestMethod]
  521. public void Expand_Default()
  522. {
  523. var b = Observable.Return(1).Expand(x => x < 10 ? Observable.Return(x + 1) : Observable.Empty<int>())
  524. .SequenceEqual(Observable.Range(1, 10)).First();
  525. Assert.IsTrue(b);
  526. }
  527. [TestMethod]
  528. public void Expand_Empty()
  529. {
  530. var scheduler = new TestScheduler();
  531. var xs = scheduler.CreateHotObservable(
  532. OnCompleted<int>(300)
  533. );
  534. var res = scheduler.Start(() =>
  535. xs.Expand(x => scheduler.CreateColdObservable(
  536. OnNext(100, 1),
  537. OnNext(200, 2),
  538. OnCompleted<int>(300)
  539. ), scheduler)
  540. );
  541. res.Messages.AssertEqual(
  542. OnCompleted<int>(300)
  543. );
  544. xs.Subscriptions.AssertEqual(
  545. Subscribe(201, 300)
  546. );
  547. }
  548. [TestMethod]
  549. public void Expand_Error()
  550. {
  551. var scheduler = new TestScheduler();
  552. var ex = new Exception();
  553. var xs = scheduler.CreateHotObservable(
  554. OnError<int>(300, ex)
  555. );
  556. var res = scheduler.Start(() =>
  557. xs.Expand(x => scheduler.CreateColdObservable<int>(
  558. OnNext(100 + x, 2 * x),
  559. OnNext(200 + x, 3 * x),
  560. OnCompleted<int>(300 + x)
  561. ), scheduler)
  562. );
  563. res.Messages.AssertEqual(
  564. OnError<int>(300, ex)
  565. );
  566. xs.Subscriptions.AssertEqual(
  567. Subscribe(201, 300)
  568. );
  569. }
  570. [TestMethod]
  571. public void Expand_Never()
  572. {
  573. var scheduler = new TestScheduler();
  574. var xs = scheduler.CreateHotObservable<int>(
  575. );
  576. var res = scheduler.Start(() =>
  577. xs.Expand(x => scheduler.CreateColdObservable<int>(
  578. OnNext(100 + x, 2 * x),
  579. OnNext(200 + x, 3 * x),
  580. OnCompleted<int>(300 + x)
  581. ), scheduler)
  582. );
  583. res.Messages.AssertEqual(
  584. );
  585. xs.Subscriptions.AssertEqual(
  586. Subscribe(201, 1000)
  587. );
  588. }
  589. [TestMethod]
  590. public void Expand_Basic()
  591. {
  592. var scheduler = new TestScheduler();
  593. var xs = scheduler.CreateHotObservable(
  594. OnNext(550, 1),
  595. OnNext(850, 2),
  596. OnCompleted<int>(950)
  597. );
  598. var res = scheduler.Start(() =>
  599. xs.Expand(x => scheduler.CreateColdObservable(
  600. OnNext(100, 2 * x),
  601. OnNext(200, 3 * x),
  602. OnCompleted<int>(300)
  603. ), scheduler)
  604. );
  605. res.Messages.AssertEqual(
  606. OnNext(550, 1),
  607. OnNext(651, 2),
  608. OnNext(751, 3),
  609. OnNext(752, 4),
  610. OnNext(850, 2),
  611. OnNext(852, 6),
  612. OnNext(852, 6),
  613. OnNext(853, 8),
  614. OnNext(951, 4),
  615. OnNext(952, 9),
  616. OnNext(952, 12),
  617. OnNext(953, 12),
  618. OnNext(953, 12),
  619. OnNext(954, 16)
  620. );
  621. xs.Subscriptions.AssertEqual(
  622. Subscribe(201, 950)
  623. );
  624. }
  625. [TestMethod]
  626. public void Expand_Throw()
  627. {
  628. var scheduler = new TestScheduler();
  629. var xs = scheduler.CreateHotObservable(
  630. OnNext(550, 1),
  631. OnNext(850, 2),
  632. OnCompleted<int>(950)
  633. );
  634. var ex = new Exception();
  635. var res = scheduler.Start(() =>
  636. xs.Expand(x => { throw ex; }, scheduler)
  637. );
  638. res.Messages.AssertEqual(
  639. OnNext(550, 1),
  640. OnError<int>(550, ex)
  641. );
  642. xs.Subscriptions.AssertEqual(
  643. Subscribe(201, 550)
  644. );
  645. }
  646. #endregion
  647. #region ForkJoin
  648. [TestMethod]
  649. public void ForkJoin_ArgumentChecking()
  650. {
  651. var someObservable = DummyObservable<int>.Instance;
  652. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin(someObservable, someObservable, (Func<int, int, int>)null));
  653. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin(someObservable, (IObservable<int>)null, (_, __) => _ + __));
  654. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin((IObservable<int>)null, someObservable, (_, __) => _ + __));
  655. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin((IObservable<int>[])null));
  656. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin((IEnumerable<IObservable<int>>)null));
  657. }
  658. [TestMethod]
  659. public void ForkJoin_EmptyEmpty()
  660. {
  661. var scheduler = new TestScheduler();
  662. var msgs1 = new[] {
  663. OnNext(150, 1),
  664. OnCompleted<int>(230)
  665. };
  666. var msgs2 = new[] {
  667. OnNext(150, 1),
  668. OnCompleted<int>(250)
  669. };
  670. var o = scheduler.CreateHotObservable(msgs1);
  671. var e = scheduler.CreateHotObservable(msgs2);
  672. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  673. res.Messages.AssertEqual(
  674. OnCompleted<int>(250)
  675. );
  676. }
  677. [TestMethod]
  678. public void ForkJoin_None()
  679. {
  680. var scheduler = new TestScheduler();
  681. var res = scheduler.Start(() => ObservableEx.ForkJoin<int>());
  682. res.Messages.AssertEqual(
  683. OnCompleted<int[]>(200)
  684. );
  685. }
  686. [TestMethod]
  687. public void ForkJoin_EmptyReturn()
  688. {
  689. var scheduler = new TestScheduler();
  690. var msgs1 = new[] {
  691. OnNext(150, 1),
  692. OnCompleted<int>(230)
  693. };
  694. var msgs2 = new[] {
  695. OnNext(150, 1),
  696. OnNext(210, 2),
  697. OnCompleted<int>(250)
  698. };
  699. var o = scheduler.CreateHotObservable(msgs1);
  700. var e = scheduler.CreateHotObservable(msgs2);
  701. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  702. res.Messages.AssertEqual(
  703. OnCompleted<int>(250)
  704. );
  705. }
  706. [TestMethod]
  707. public void ForkJoin_ReturnEmpty()
  708. {
  709. var scheduler = new TestScheduler();
  710. var msgs1 = new[] {
  711. OnNext(150, 1),
  712. OnNext(210, 2),
  713. OnCompleted<int>(230)
  714. };
  715. var msgs2 = new[] {
  716. OnNext(150, 1),
  717. OnCompleted<int>(250)
  718. };
  719. var o = scheduler.CreateHotObservable(msgs1);
  720. var e = scheduler.CreateHotObservable(msgs2);
  721. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  722. res.Messages.AssertEqual(
  723. OnCompleted<int>(250)
  724. );
  725. }
  726. [TestMethod]
  727. public void ForkJoin_ReturnReturn()
  728. {
  729. var scheduler = new TestScheduler();
  730. var msgs1 = new[] {
  731. OnNext(150, 1),
  732. OnNext(210, 2),
  733. OnCompleted<int>(230)
  734. };
  735. var msgs2 = new[] {
  736. OnNext(150, 1),
  737. OnNext(220, 3),
  738. OnCompleted<int>(250)
  739. };
  740. var o = scheduler.CreateHotObservable(msgs1);
  741. var e = scheduler.CreateHotObservable(msgs2);
  742. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  743. res.Messages.AssertEqual(
  744. OnNext(250, 2 + 3),
  745. OnCompleted<int>(250)
  746. );
  747. }
  748. [TestMethod]
  749. public void ForkJoin_EmptyThrow()
  750. {
  751. var ex = new Exception();
  752. var scheduler = new TestScheduler();
  753. var msgs1 = new[] {
  754. OnNext(150, 1),
  755. OnCompleted<int>(230)
  756. };
  757. var msgs2 = new[] {
  758. OnNext(150, 1),
  759. OnError<int>(210, ex),
  760. OnCompleted<int>(250)
  761. };
  762. var o = scheduler.CreateHotObservable(msgs1);
  763. var e = scheduler.CreateHotObservable(msgs2);
  764. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  765. res.Messages.AssertEqual(
  766. OnError<int>(210, ex)
  767. );
  768. }
  769. [TestMethod]
  770. public void ForkJoin_ThrowEmpty()
  771. {
  772. var ex = new Exception();
  773. var scheduler = new TestScheduler();
  774. var msgs1 = new[] {
  775. OnNext(150, 1),
  776. OnError<int>(210, ex),
  777. OnCompleted<int>(230)
  778. };
  779. var msgs2 = new[] {
  780. OnNext(150, 1),
  781. OnCompleted<int>(250)
  782. };
  783. var o = scheduler.CreateHotObservable(msgs1);
  784. var e = scheduler.CreateHotObservable(msgs2);
  785. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  786. res.Messages.AssertEqual(
  787. OnError<int>(210, ex)
  788. );
  789. }
  790. [TestMethod]
  791. public void ForkJoin_ReturnThrow()
  792. {
  793. var ex = new Exception();
  794. var scheduler = new TestScheduler();
  795. var msgs1 = new[] {
  796. OnNext(150, 1),
  797. OnNext(210, 2),
  798. OnCompleted<int>(230)
  799. };
  800. var msgs2 = new[] {
  801. OnNext(150, 1),
  802. OnError<int>(220, ex),
  803. OnCompleted<int>(250)
  804. };
  805. var o = scheduler.CreateHotObservable(msgs1);
  806. var e = scheduler.CreateHotObservable(msgs2);
  807. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  808. res.Messages.AssertEqual(
  809. OnError<int>(220, ex)
  810. );
  811. }
  812. [TestMethod]
  813. public void ForkJoin_ThrowReturn()
  814. {
  815. var ex = new Exception();
  816. var scheduler = new TestScheduler();
  817. var msgs1 = new[] {
  818. OnNext(150, 1),
  819. OnError<int>(220, ex),
  820. OnCompleted<int>(230)
  821. };
  822. var msgs2 = new[] {
  823. OnNext(150, 1),
  824. OnNext(210, 2),
  825. OnCompleted<int>(250)
  826. };
  827. var o = scheduler.CreateHotObservable(msgs1);
  828. var e = scheduler.CreateHotObservable(msgs2);
  829. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  830. res.Messages.AssertEqual(
  831. OnError<int>(220, ex)
  832. );
  833. }
  834. [TestMethod]
  835. public void ForkJoin_Binary()
  836. {
  837. var scheduler = new TestScheduler();
  838. var msgs1 = new[] {
  839. OnNext(150, 1),
  840. OnNext(215, 2),
  841. OnNext(225, 4),
  842. OnCompleted<int>(230)
  843. };
  844. var msgs2 = new[] {
  845. OnNext(150, 1),
  846. OnNext(235, 6),
  847. OnNext(240, 7),
  848. OnCompleted<int>(250)
  849. };
  850. var o = scheduler.CreateHotObservable(msgs1);
  851. var e = scheduler.CreateHotObservable(msgs2);
  852. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  853. res.Messages.AssertEqual(
  854. OnNext(250, 4 + 7), // TODO: fix ForkJoin behavior
  855. OnCompleted<int>(250)
  856. );
  857. }
  858. [TestMethod]
  859. public void ForkJoin_NaryParams()
  860. {
  861. var scheduler = new TestScheduler();
  862. var msgs1 = new[] {
  863. OnNext(150, 1),
  864. OnNext(215, 2),
  865. OnNext(225, 4),
  866. OnCompleted<int>(230)
  867. };
  868. var msgs2 = new[] {
  869. OnNext(150, 1),
  870. OnNext(235, 6),
  871. OnNext(240, 7),
  872. OnCompleted<int>(250)
  873. };
  874. var msgs3 = new[] {
  875. OnNext(150, 1),
  876. OnNext(230, 3),
  877. OnNext(245, 5),
  878. OnCompleted<int>(270)
  879. };
  880. var o1 = scheduler.CreateHotObservable(msgs1);
  881. var o2 = scheduler.CreateHotObservable(msgs2);
  882. var o3 = scheduler.CreateHotObservable(msgs3);
  883. var res = scheduler.Start(() => ObservableEx.ForkJoin(o1, o2, o3));
  884. res.Messages.AssertEqual(
  885. OnNext<int[]>(270, l => l.SequenceEqual(new[] { 4, 7, 5 })), // TODO: fix ForkJoin behavior
  886. OnCompleted<int[]>(270)
  887. );
  888. }
  889. [TestMethod]
  890. public void ForkJoin_NaryParamsEmpty()
  891. {
  892. var scheduler = new TestScheduler();
  893. var msgs1 = new[] {
  894. OnNext(150, 1),
  895. OnNext(215, 2),
  896. OnNext(225, 4),
  897. OnCompleted<int>(230)
  898. };
  899. var msgs2 = new[] {
  900. OnNext(150, 1),
  901. OnNext(235, 6),
  902. OnNext(240, 7),
  903. OnCompleted<int>(250)
  904. };
  905. var msgs3 = new[] {
  906. OnCompleted<int>(270)
  907. };
  908. var o1 = scheduler.CreateHotObservable(msgs1);
  909. var o2 = scheduler.CreateHotObservable(msgs2);
  910. var o3 = scheduler.CreateHotObservable(msgs3);
  911. var res = scheduler.Start(() => ObservableEx.ForkJoin(o1, o2, o3));
  912. res.Messages.AssertEqual(
  913. OnCompleted<int[]>(270)
  914. );
  915. }
  916. [TestMethod]
  917. public void ForkJoin_NaryParamsEmptyBeforeEnd()
  918. {
  919. var scheduler = new TestScheduler();
  920. var msgs1 = new[] {
  921. OnNext(150, 1),
  922. OnNext(215, 2),
  923. OnNext(225, 4),
  924. OnCompleted<int>(230)
  925. };
  926. var msgs2 = new[] {
  927. OnCompleted<int>(235)
  928. };
  929. var msgs3 = new[] {
  930. OnNext(150, 1),
  931. OnNext(230, 3),
  932. OnNext(245, 5),
  933. OnCompleted<int>(270)
  934. };
  935. var o1 = scheduler.CreateHotObservable(msgs1);
  936. var o2 = scheduler.CreateHotObservable(msgs2);
  937. var o3 = scheduler.CreateHotObservable(msgs3);
  938. var res = scheduler.Start(() => ObservableEx.ForkJoin(o1, o2, o3));
  939. res.Messages.AssertEqual(
  940. OnCompleted<int[]>(235)
  941. );
  942. }
  943. [TestMethod]
  944. public void ForkJoin_Nary_Immediate()
  945. {
  946. ObservableEx.ForkJoin(Observable.Return(1), Observable.Return(2)).First().SequenceEqual(new[] { 1, 2 });
  947. }
  948. [TestMethod]
  949. public void ForkJoin_Nary_Virtual_And_Immediate()
  950. {
  951. var scheduler = new TestScheduler();
  952. var msgs1 = new[] {
  953. OnNext(150, 1),
  954. OnNext(215, 2),
  955. OnNext(225, 4),
  956. OnCompleted<int>(230)
  957. };
  958. var msgs2 = new[] {
  959. OnNext(150, 1),
  960. OnNext(235, 6),
  961. OnNext(240, 7),
  962. OnCompleted<int>(250)
  963. };
  964. var msgs3 = new[] {
  965. OnNext(150, 1),
  966. OnNext(230, 3),
  967. OnNext(245, 5),
  968. OnCompleted<int>(270)
  969. };
  970. var o1 = scheduler.CreateHotObservable(msgs1);
  971. var o2 = scheduler.CreateHotObservable(msgs2);
  972. var o3 = scheduler.CreateHotObservable(msgs3);
  973. var res = scheduler.Start(() => ObservableEx.ForkJoin(new List<IObservable<int>> { o1, o2, o3, Observable.Return(20) }));
  974. res.Messages.AssertEqual(
  975. OnNext<int[]>(270, l => l.SequenceEqual(new[] { 4, 7, 5, 20 })),
  976. OnCompleted<int[]>(270)
  977. );
  978. }
  979. [TestMethod]
  980. public void ForkJoin_Nary_Immediate_And_Virtual()
  981. {
  982. var scheduler = new TestScheduler();
  983. var msgs1 = new[] {
  984. OnNext(150, 1),
  985. OnNext(215, 2),
  986. OnNext(225, 4),
  987. OnCompleted<int>(230)
  988. };
  989. var msgs2 = new[] {
  990. OnNext(150, 1),
  991. OnNext(235, 6),
  992. OnNext(240, 7),
  993. OnCompleted<int>(250)
  994. };
  995. var msgs3 = new[] {
  996. OnNext(150, 1),
  997. OnNext(230, 3),
  998. OnNext(245, 5),
  999. OnCompleted<int>(270)
  1000. };
  1001. var o1 = scheduler.CreateHotObservable(msgs1);
  1002. var o2 = scheduler.CreateHotObservable(msgs2);
  1003. var o3 = scheduler.CreateHotObservable(msgs3);
  1004. var res = scheduler.Start(() => ObservableEx.ForkJoin(new List<IObservable<int>> { Observable.Return(20), o1, o2, o3 }));
  1005. res.Messages.AssertEqual(
  1006. OnNext<int[]>(270, l => l.SequenceEqual(new[] { 20, 4, 7, 5 })),
  1007. OnCompleted<int[]>(270)
  1008. );
  1009. }
  1010. [TestMethod]
  1011. public void ForkJoin_Nary()
  1012. {
  1013. var scheduler = new TestScheduler();
  1014. var msgs1 = new[] {
  1015. OnNext(150, 1),
  1016. OnNext(215, 2),
  1017. OnNext(225, 4),
  1018. OnCompleted<int>(230)
  1019. };
  1020. var msgs2 = new[] {
  1021. OnNext(150, 1),
  1022. OnNext(235, 6),
  1023. OnNext(240, 7),
  1024. OnCompleted<int>(250)
  1025. };
  1026. var msgs3 = new[] {
  1027. OnNext(150, 1),
  1028. OnNext(230, 3),
  1029. OnNext(245, 5),
  1030. OnCompleted<int>(270)
  1031. };
  1032. var o1 = scheduler.CreateHotObservable(msgs1);
  1033. var o2 = scheduler.CreateHotObservable(msgs2);
  1034. var o3 = scheduler.CreateHotObservable(msgs3);
  1035. var res = scheduler.Start(() => ObservableEx.ForkJoin(new List<IObservable<int>> { o1, o2, o3 }));
  1036. res.Messages.AssertEqual(
  1037. OnNext<int[]>(270, l => l.SequenceEqual(new[] { 4, 7, 5 })), // TODO: fix ForkJoin behavior
  1038. OnCompleted<int[]>(270)
  1039. );
  1040. }
  1041. [TestMethod]
  1042. public void Bug_1302_SelectorThrows_LeftLast()
  1043. {
  1044. var scheduler = new TestScheduler();
  1045. var xs = scheduler.CreateHotObservable(
  1046. OnNext(210, 1),
  1047. OnCompleted<int>(220)
  1048. );
  1049. var ys = scheduler.CreateHotObservable(
  1050. OnNext(215, 2),
  1051. OnCompleted<int>(217)
  1052. );
  1053. var ex = new Exception();
  1054. var results = scheduler.Start(() => xs.ForkJoin<int, int, int>(ys, (x, y) => { throw ex; }));
  1055. results.Messages.AssertEqual(
  1056. OnError<int>(220, ex)
  1057. );
  1058. xs.Subscriptions.AssertEqual(
  1059. Subscribe(200, 220)
  1060. );
  1061. ys.Subscriptions.AssertEqual(
  1062. Subscribe(200, 217)
  1063. );
  1064. }
  1065. [TestMethod]
  1066. public void Bug_1302_SelectorThrows_RightLast()
  1067. {
  1068. var scheduler = new TestScheduler();
  1069. var xs = scheduler.CreateHotObservable(
  1070. OnNext(210, 1),
  1071. OnCompleted<int>(217)
  1072. );
  1073. var ys = scheduler.CreateHotObservable(
  1074. OnNext(215, 2),
  1075. OnCompleted<int>(220)
  1076. );
  1077. var ex = new Exception();
  1078. var results = scheduler.Start(() => xs.ForkJoin<int, int, int>(ys, (x, y) => { throw ex; }));
  1079. results.Messages.AssertEqual(
  1080. OnError<int>(220, ex)
  1081. );
  1082. xs.Subscriptions.AssertEqual(
  1083. Subscribe(200, 217)
  1084. );
  1085. ys.Subscriptions.AssertEqual(
  1086. Subscribe(200, 220)
  1087. );
  1088. }
  1089. [TestMethod]
  1090. public void Bug_1302_RightLast_NoLeft()
  1091. {
  1092. var scheduler = new TestScheduler();
  1093. var xs = scheduler.CreateHotObservable(
  1094. OnCompleted<int>(217)
  1095. );
  1096. var ys = scheduler.CreateHotObservable(
  1097. OnNext(215, 2),
  1098. OnCompleted<int>(220)
  1099. );
  1100. var results = scheduler.Start(() => xs.ForkJoin<int, int, int>(ys, (x, y) => x + y));
  1101. results.Messages.AssertEqual(
  1102. OnCompleted<int>(220)
  1103. );
  1104. xs.Subscriptions.AssertEqual(
  1105. Subscribe(200, 217)
  1106. );
  1107. ys.Subscriptions.AssertEqual(
  1108. Subscribe(200, 220)
  1109. );
  1110. }
  1111. [TestMethod]
  1112. public void Bug_1302_RightLast_NoRight()
  1113. {
  1114. var scheduler = new TestScheduler();
  1115. var xs = scheduler.CreateHotObservable(
  1116. OnNext(215, 2),
  1117. OnCompleted<int>(217)
  1118. );
  1119. var ys = scheduler.CreateHotObservable(
  1120. OnCompleted<int>(220)
  1121. );
  1122. var results = scheduler.Start(() => xs.ForkJoin<int, int, int>(ys, (x, y) => x + y));
  1123. results.Messages.AssertEqual(
  1124. OnCompleted<int>(220)
  1125. );
  1126. xs.Subscriptions.AssertEqual(
  1127. Subscribe(200, 217)
  1128. );
  1129. ys.Subscriptions.AssertEqual(
  1130. Subscribe(200, 220)
  1131. );
  1132. }
  1133. #endregion
  1134. #region Let
  1135. [TestMethod]
  1136. public void Let_ArgumentChecking()
  1137. {
  1138. var someObservable = Observable.Empty<int>();
  1139. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Let(default(IObservable<int>), x => x));
  1140. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Let<int, int>(someObservable, null));
  1141. }
  1142. [TestMethod]
  1143. public void Let_CallsFunctionImmediately()
  1144. {
  1145. bool called = false;
  1146. Observable.Empty<int>().Let(x => { called = true; return x; });
  1147. Assert.IsTrue(called);
  1148. }
  1149. #endregion
  1150. #region ManySelect
  1151. [TestMethod]
  1152. public void ManySelect_ArgumentChecking()
  1153. {
  1154. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(null, DummyFunc<IObservable<int>, int>.Instance, DummyScheduler.Instance));
  1155. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(DummyObservable<int>.Instance, null, DummyScheduler.Instance));
  1156. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(DummyObservable<int>.Instance, DummyFunc<IObservable<int>, int>.Instance, null));
  1157. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(null, DummyFunc<IObservable<int>, int>.Instance));
  1158. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(DummyObservable<int>.Instance, null));
  1159. }
  1160. [TestMethod]
  1161. public void ManySelect_Law_1()
  1162. {
  1163. var xs = Observable.Range(1, 0);
  1164. var left = xs.ManySelect(Observable.First);
  1165. var right = xs;
  1166. Assert.IsTrue(left.SequenceEqual(right).First());
  1167. }
  1168. [TestMethod]
  1169. public void ManySelect_Law_2()
  1170. {
  1171. var xs = Observable.Range(1, 10);
  1172. Func<IObservable<int>, int> f = ys => ys.Count().First();
  1173. var left = xs.ManySelect(f).First();
  1174. var right = f(xs);
  1175. Assert.AreEqual(left, right);
  1176. }
  1177. [TestMethod]
  1178. public void ManySelect_Law_3()
  1179. {
  1180. var xs = Observable.Range(1, 10);
  1181. Func<IObservable<int>, int> f = ys => ys.Count().First();
  1182. Func<IObservable<int>, int> g = ys => ys.Last();
  1183. var left = xs.ManySelect(f).ManySelect(g);
  1184. var right = xs.ManySelect(ys => g(ys.ManySelect(f)));
  1185. Assert.IsTrue(left.SequenceEqual(right).First());
  1186. }
  1187. [TestMethod]
  1188. public void ManySelect_Basic()
  1189. {
  1190. var scheduler = new TestScheduler();
  1191. var xs = scheduler.CreateHotObservable(
  1192. OnNext(100, 1),
  1193. OnNext(220, 2),
  1194. OnNext(270, 3),
  1195. OnNext(410, 4),
  1196. OnCompleted<int>(500)
  1197. );
  1198. var res = scheduler.Start(() =>
  1199. xs.ManySelect(ys => ys.First(), scheduler)
  1200. );
  1201. res.Messages.AssertEqual(
  1202. OnNext(221, 2),
  1203. OnNext(271, 3),
  1204. OnNext(411, 4),
  1205. OnCompleted<int>(501)
  1206. );
  1207. xs.Subscriptions.AssertEqual(
  1208. Subscribe(200, 500)
  1209. );
  1210. }
  1211. [TestMethod]
  1212. public void ManySelect_Error()
  1213. {
  1214. var scheduler = new TestScheduler();
  1215. var ex = new Exception();
  1216. var xs = scheduler.CreateHotObservable(
  1217. OnNext(100, 1),
  1218. OnNext(220, 2),
  1219. OnNext(270, 3),
  1220. OnNext(410, 4),
  1221. OnError<int>(500, ex)
  1222. );
  1223. var res = scheduler.Start(() =>
  1224. xs.ManySelect(ys => ys.First(), scheduler)
  1225. );
  1226. res.Messages.AssertEqual(
  1227. OnNext(221, 2),
  1228. OnNext(271, 3),
  1229. OnNext(411, 4),
  1230. OnError<int>(501, ex)
  1231. );
  1232. xs.Subscriptions.AssertEqual(
  1233. Subscribe(200, 500)
  1234. );
  1235. }
  1236. #endregion
  1237. #region ToListObservable
  1238. [TestMethod]
  1239. public void ToListObservable_ArgumentChecking()
  1240. {
  1241. ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).ToListObservable());
  1242. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Never<int>().ToListObservable().Subscribe(null));
  1243. }
  1244. [TestMethod]
  1245. public void ToListObservable_OnNext()
  1246. {
  1247. var scheduler = new TestScheduler();
  1248. var xs = scheduler.CreateHotObservable(
  1249. OnNext(300, 1),
  1250. OnNext(400, 2),
  1251. OnNext(500, 3),
  1252. OnNext(600, 4)
  1253. );
  1254. var res = scheduler.Start(() =>
  1255. xs.ToListObservable()
  1256. );
  1257. res.Messages.AssertEqual(
  1258. );
  1259. }
  1260. [TestMethod]
  1261. public void ToListObservable_OnError()
  1262. {
  1263. var scheduler = new TestScheduler();
  1264. var ex = new InvalidOperationException();
  1265. var xs = scheduler.CreateHotObservable(
  1266. OnNext(300, 1),
  1267. OnNext(400, 2),
  1268. OnNext(500, 3),
  1269. OnError<int>(600, ex)
  1270. );
  1271. var s = default(ListObservable<int>);
  1272. var subscription = default(IDisposable);
  1273. var res = scheduler.CreateObserver<object>();
  1274. scheduler.ScheduleAbsolute(Created, () => s = xs.ToListObservable());
  1275. scheduler.ScheduleAbsolute(Subscribed, () => subscription = s.Subscribe(res));
  1276. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1277. scheduler.Start();
  1278. ReactiveAssert.Throws<InvalidOperationException>(() => { var t = s.Value; });
  1279. res.Messages.AssertEqual(
  1280. OnError<Object>(600, ex)
  1281. );
  1282. }
  1283. [TestMethod]
  1284. public void ToListObservable_OnCompleted()
  1285. {
  1286. var scheduler = new TestScheduler();
  1287. var xs = scheduler.CreateHotObservable(
  1288. OnNext(300, 1),
  1289. OnNext(400, 2),
  1290. OnNext(500, 3),
  1291. OnCompleted<int>(600)
  1292. );
  1293. var s = default(ListObservable<int>);
  1294. var res = scheduler.Start(() =>
  1295. s = xs.ToListObservable()
  1296. );
  1297. s.AssertEqual(1, 2, 3);
  1298. res.Messages.AssertEqual(
  1299. OnCompleted<Object>(600)
  1300. );
  1301. Assert.AreEqual(3, s.Value);
  1302. }
  1303. [TestMethod]
  1304. public void ToListObservable_Disposed()
  1305. {
  1306. var scheduler = new TestScheduler();
  1307. var xs = scheduler.CreateHotObservable(
  1308. OnNext(300, 1),
  1309. OnNext(400, 2),
  1310. OnNext(500, 3),
  1311. OnNext(1050, 4),
  1312. OnCompleted<int>(1100)
  1313. );
  1314. var s = default(ListObservable<int>);
  1315. var res = scheduler.Start(() =>
  1316. s = xs.ToListObservable()
  1317. );
  1318. res.Messages.AssertEqual(
  1319. );
  1320. }
  1321. [TestMethod]
  1322. public void ToListObservable_Never()
  1323. {
  1324. var scheduler = new TestScheduler();
  1325. var xs = scheduler.CreateHotObservable<int>(
  1326. );
  1327. var res = scheduler.Start(() =>
  1328. xs.ToListObservable()
  1329. );
  1330. res.Messages.AssertEqual(
  1331. );
  1332. }
  1333. #endregion
  1334. }
  1335. }