ObservableExTest.cs 48 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Linq;
  10. using Microsoft.Reactive.Testing;
  11. using Xunit;
  12. using ReactiveTests.Dummies;
  13. namespace ReactiveTests.Tests
  14. {
  15. public class ObservableExTest : ReactiveTest
  16. {
  17. #region Create
  18. [Fact]
  19. public void Iterate_ArgumentChecking()
  20. {
  21. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create<int>(default(Func<IObserver<int>, IEnumerable<IObservable<Object>>>)));
  22. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IObserver<int>, IEnumerable<IObservable<Object>>>.Instance).Subscribe(null));
  23. }
  24. IEnumerable<IObservable<Object>> ToIterate_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  25. {
  26. observer.OnNext(1);
  27. yield return xs.Select(x => new Object());
  28. observer.OnNext(2);
  29. yield return ys.Select(x => new Object());
  30. observer.OnNext(3);
  31. observer.OnCompleted();
  32. yield return zs.Select(x => new Object());
  33. observer.OnNext(4);
  34. }
  35. [Fact]
  36. public void Iterate_Complete()
  37. {
  38. var scheduler = new TestScheduler();
  39. var xs = scheduler.CreateColdObservable(
  40. OnNext(10, 1),
  41. OnNext(20, 2),
  42. OnNext(30, 3),
  43. OnNext(40, 4),
  44. OnCompleted<int>(50)
  45. );
  46. var ys = scheduler.CreateColdObservable(
  47. OnNext(10, 1),
  48. OnNext(20, 2),
  49. OnCompleted<int>(30)
  50. );
  51. var zs = scheduler.CreateColdObservable(
  52. OnNext(10, 1),
  53. OnNext(20, 2),
  54. OnNext(30, 3),
  55. OnNext(40, 4),
  56. OnNext(50, 5),
  57. OnCompleted<int>(60)
  58. );
  59. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete(xs, ys, zs, observer)));
  60. res.Messages.AssertEqual(
  61. OnNext(200, 1),
  62. OnNext(250, 2),
  63. OnNext(280, 3),
  64. OnCompleted<int>(280)
  65. );
  66. xs.Subscriptions.AssertEqual(
  67. Subscribe(200, 250)
  68. );
  69. ys.Subscriptions.AssertEqual(
  70. Subscribe(250, 280)
  71. );
  72. zs.Subscriptions.AssertEqual(
  73. Subscribe(280, 280)
  74. );
  75. }
  76. IEnumerable<IObservable<Object>> ToIterate_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  77. {
  78. observer.OnNext(1);
  79. yield return xs.Select(x => new Object());
  80. observer.OnNext(2);
  81. yield return ys.Select(x => new Object());
  82. observer.OnNext(3);
  83. yield return zs.Select(x => new Object());
  84. observer.OnNext(4);
  85. }
  86. [Fact]
  87. public void Iterate_Complete_Implicit()
  88. {
  89. var scheduler = new TestScheduler();
  90. var xs = scheduler.CreateColdObservable(
  91. OnNext(10, 1),
  92. OnNext(20, 2),
  93. OnNext(30, 3),
  94. OnNext(40, 4),
  95. OnCompleted<int>(50)
  96. );
  97. var ys = scheduler.CreateColdObservable(
  98. OnNext(10, 1),
  99. OnNext(20, 2),
  100. OnCompleted<int>(30)
  101. );
  102. var zs = scheduler.CreateColdObservable(
  103. OnNext(10, 1),
  104. OnNext(20, 2),
  105. OnNext(30, 3),
  106. OnNext(40, 4),
  107. OnNext(50, 5),
  108. OnCompleted<int>(60)
  109. );
  110. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Implicit(xs, ys, zs, observer)));
  111. res.Messages.AssertEqual(
  112. OnNext(200, 1),
  113. OnNext(250, 2),
  114. OnNext(280, 3),
  115. OnNext(340, 4),
  116. OnCompleted<int>(340)
  117. );
  118. xs.Subscriptions.AssertEqual(
  119. Subscribe(200, 250)
  120. );
  121. ys.Subscriptions.AssertEqual(
  122. Subscribe(250, 280)
  123. );
  124. zs.Subscriptions.AssertEqual(
  125. Subscribe(280, 340)
  126. );
  127. }
  128. IEnumerable<IObservable<Object>> ToIterate_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
  129. {
  130. observer.OnNext(1);
  131. yield return xs.Select(x => new Object());
  132. observer.OnNext(2);
  133. yield return ys.Select(x => new Object());
  134. observer.OnNext(3);
  135. if (xs != null)
  136. throw ex;
  137. yield return zs.Select(x => new Object());
  138. observer.OnNext(4);
  139. observer.OnCompleted();
  140. }
  141. [Fact]
  142. public void Iterate_Iterator_Throw()
  143. {
  144. var scheduler = new TestScheduler();
  145. var xs = scheduler.CreateColdObservable(
  146. OnNext(10, 1),
  147. OnNext(20, 2),
  148. OnNext(30, 3),
  149. OnNext(40, 4),
  150. OnCompleted<int>(50)
  151. );
  152. var ys = scheduler.CreateColdObservable(
  153. OnNext(10, 1),
  154. OnNext(20, 2),
  155. OnCompleted<int>(30)
  156. );
  157. var zs = scheduler.CreateColdObservable(
  158. OnNext(10, 1),
  159. OnNext(20, 2),
  160. OnNext(30, 3),
  161. OnNext(40, 4),
  162. OnNext(50, 5),
  163. OnCompleted<int>(60)
  164. );
  165. var ex = new Exception();
  166. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Throw(xs, ys, zs, observer, ex)));
  167. res.Messages.AssertEqual(
  168. OnNext(200, 1),
  169. OnNext(250, 2),
  170. OnNext(280, 3),
  171. OnError<int>(280, ex)
  172. );
  173. xs.Subscriptions.AssertEqual(
  174. Subscribe(200, 250)
  175. );
  176. ys.Subscriptions.AssertEqual(
  177. Subscribe(250, 280)
  178. );
  179. zs.Subscriptions.AssertEqual(
  180. );
  181. }
  182. IEnumerable<IObservable<Object>> ToIterate_Error(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
  183. {
  184. observer.OnNext(1);
  185. yield return xs.Select(x => new Object());
  186. observer.OnNext(2);
  187. observer.OnError(ex);
  188. yield return ys.Select(x => new Object());
  189. observer.OnNext(3);
  190. yield return zs.Select(x => new Object());
  191. observer.OnNext(4);
  192. observer.OnCompleted();
  193. }
  194. [Fact]
  195. public void Iterate_Iterator_Error()
  196. {
  197. var scheduler = new TestScheduler();
  198. var ex = new Exception();
  199. var xs = scheduler.CreateColdObservable(
  200. OnNext(10, 1),
  201. OnNext(20, 2),
  202. OnNext(30, 3),
  203. OnNext(40, 4),
  204. OnCompleted<int>(50)
  205. );
  206. var ys = scheduler.CreateColdObservable(
  207. OnNext(10, 1),
  208. OnNext(20, 2),
  209. OnCompleted<int>(30)
  210. );
  211. var zs = scheduler.CreateColdObservable(
  212. OnNext(10, 1),
  213. OnNext(20, 2),
  214. OnNext(30, 3),
  215. OnNext(40, 4),
  216. OnNext(50, 5),
  217. OnCompleted<int>(60)
  218. );
  219. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Error(xs, ys, zs, observer, ex)));
  220. res.Messages.AssertEqual(
  221. OnNext(200, 1),
  222. OnNext(250, 2),
  223. OnError<int>(250, ex)
  224. );
  225. xs.Subscriptions.AssertEqual(
  226. Subscribe(200, 250)
  227. );
  228. ys.Subscriptions.AssertEqual(
  229. Subscribe(250, 250)
  230. );
  231. zs.Subscriptions.AssertEqual(
  232. );
  233. }
  234. IEnumerable<IObservable<Object>> ToIterate_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  235. {
  236. observer.OnNext(1);
  237. yield return xs.Select(x => new Object());
  238. observer.OnNext(2);
  239. yield return ys.Select(x => new Object());
  240. observer.OnNext(3);
  241. yield return zs.Select(x => new Object());
  242. observer.OnNext(4);
  243. }
  244. [Fact]
  245. public void Iterate_Complete_Dispose()
  246. {
  247. var scheduler = new TestScheduler();
  248. var xs = scheduler.CreateColdObservable(
  249. OnNext(10, 1),
  250. OnNext(20, 2),
  251. OnNext(30, 3),
  252. OnNext(40, 4),
  253. OnCompleted<int>(50)
  254. );
  255. var ys = scheduler.CreateColdObservable(
  256. OnNext(10, 1),
  257. OnNext(20, 2),
  258. OnCompleted<int>(30)
  259. );
  260. var zs = scheduler.CreateColdObservable(
  261. OnNext(100, 1),
  262. OnNext(200, 2),
  263. OnNext(300, 3),
  264. OnNext(400, 4),
  265. OnNext(500, 5),
  266. OnNext(600, 6),
  267. OnNext(700, 7),
  268. OnNext(800, 8),
  269. OnNext(900, 9),
  270. OnNext(1000, 10)
  271. );
  272. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Dispose(xs, ys, zs, observer)));
  273. res.Messages.AssertEqual(
  274. OnNext(200, 1),
  275. OnNext(250, 2),
  276. OnNext(280, 3)
  277. );
  278. xs.Subscriptions.AssertEqual(
  279. Subscribe(200, 250)
  280. );
  281. ys.Subscriptions.AssertEqual(
  282. Subscribe(250, 280)
  283. );
  284. zs.Subscriptions.AssertEqual(
  285. Subscribe(280, 1000)
  286. );
  287. }
  288. [Fact]
  289. public void IteratorScenario()
  290. {
  291. var xs = ObservableEx.Create<int>(o => _IteratorScenario(100, 1000, o));
  292. xs.AssertEqual(new[] { 100, 1000 }.ToObservable());
  293. }
  294. static IEnumerable<IObservable<Object>> _IteratorScenario(int x, int y, IObserver<int> results)
  295. {
  296. var xs = Observable.Range(1, x).ToListObservable();
  297. yield return xs;
  298. results.OnNext(xs.Value);
  299. var ys = Observable.Range(1, y).ToListObservable();
  300. yield return ys;
  301. results.OnNext(ys.Value);
  302. }
  303. [Fact]
  304. public void Iterate_Void_ArgumentChecking()
  305. {
  306. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(default(Func<IEnumerable<IObservable<object>>>)));
  307. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IEnumerable<IObservable<Object>>>.Instance).Subscribe(null));
  308. }
  309. IEnumerable<IObservable<Object>> ToIterate_Void_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  310. {
  311. yield return xs.Select(x => new Object());
  312. yield return ys.Select(x => new Object());
  313. yield return zs.Select(x => new Object());
  314. }
  315. [Fact]
  316. public void Iterate_Void_Complete()
  317. {
  318. var scheduler = new TestScheduler();
  319. var xs = scheduler.CreateColdObservable(
  320. OnNext(10, 1),
  321. OnNext(20, 2),
  322. OnNext(30, 3),
  323. OnNext(40, 4),
  324. OnCompleted<int>(50)
  325. );
  326. var ys = scheduler.CreateColdObservable(
  327. OnNext(10, 1),
  328. OnNext(20, 2),
  329. OnCompleted<int>(30)
  330. );
  331. var zs = scheduler.CreateColdObservable(
  332. OnNext(10, 1),
  333. OnNext(20, 2),
  334. OnNext(30, 3),
  335. OnNext(40, 4),
  336. OnNext(50, 5),
  337. OnCompleted<int>(60)
  338. );
  339. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete(xs, ys, zs)));
  340. res.Messages.AssertEqual(
  341. OnCompleted<Unit>(340)
  342. );
  343. xs.Subscriptions.AssertEqual(
  344. Subscribe(200, 250)
  345. );
  346. ys.Subscriptions.AssertEqual(
  347. Subscribe(250, 280)
  348. );
  349. zs.Subscriptions.AssertEqual(
  350. Subscribe(280, 340)
  351. );
  352. }
  353. IEnumerable<IObservable<Object>> ToIterate_Void_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  354. {
  355. yield return xs.Select(x => new Object());
  356. yield return ys.Select(x => new Object());
  357. yield return zs.Select(x => new Object());
  358. }
  359. [Fact]
  360. public void Iterate_Void_Complete_Implicit()
  361. {
  362. var scheduler = new TestScheduler();
  363. var xs = scheduler.CreateColdObservable(
  364. OnNext(10, 1),
  365. OnNext(20, 2),
  366. OnNext(30, 3),
  367. OnNext(40, 4),
  368. OnCompleted<int>(50)
  369. );
  370. var ys = scheduler.CreateColdObservable(
  371. OnNext(10, 1),
  372. OnNext(20, 2),
  373. OnCompleted<int>(30)
  374. );
  375. var zs = scheduler.CreateColdObservable(
  376. OnNext(10, 1),
  377. OnNext(20, 2),
  378. OnNext(30, 3),
  379. OnNext(40, 4),
  380. OnNext(50, 5),
  381. OnCompleted<int>(60)
  382. );
  383. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Implicit(xs, ys, zs)));
  384. res.Messages.AssertEqual(
  385. OnCompleted<Unit>(340)
  386. );
  387. xs.Subscriptions.AssertEqual(
  388. Subscribe(200, 250)
  389. );
  390. ys.Subscriptions.AssertEqual(
  391. Subscribe(250, 280)
  392. );
  393. zs.Subscriptions.AssertEqual(
  394. Subscribe(280, 340)
  395. );
  396. }
  397. IEnumerable<IObservable<Object>> ToIterate_Void_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, Exception ex)
  398. {
  399. yield return xs.Select(x => new Object());
  400. yield return ys.Select(x => new Object());
  401. if (xs != null)
  402. throw ex;
  403. yield return zs.Select(x => new Object());
  404. }
  405. [Fact]
  406. public void Iterate_Void_Iterator_Throw()
  407. {
  408. var scheduler = new TestScheduler();
  409. var xs = scheduler.CreateColdObservable(
  410. OnNext(10, 1),
  411. OnNext(20, 2),
  412. OnNext(30, 3),
  413. OnNext(40, 4),
  414. OnCompleted<int>(50)
  415. );
  416. var ys = scheduler.CreateColdObservable(
  417. OnNext(10, 1),
  418. OnNext(20, 2),
  419. OnCompleted<int>(30)
  420. );
  421. var zs = scheduler.CreateColdObservable(
  422. OnNext(10, 1),
  423. OnNext(20, 2),
  424. OnNext(30, 3),
  425. OnNext(40, 4),
  426. OnNext(50, 5),
  427. OnCompleted<int>(60)
  428. );
  429. var ex = new Exception();
  430. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Throw(xs, ys, zs, ex)));
  431. res.Messages.AssertEqual(
  432. OnError<Unit>(280, ex)
  433. );
  434. xs.Subscriptions.AssertEqual(
  435. Subscribe(200, 250)
  436. );
  437. ys.Subscriptions.AssertEqual(
  438. Subscribe(250, 280)
  439. );
  440. zs.Subscriptions.AssertEqual(
  441. );
  442. }
  443. IEnumerable<IObservable<Object>> ToIterate_Void_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  444. {
  445. yield return xs.Select(x => new Object());
  446. yield return ys.Select(x => new Object());
  447. yield return zs.Select(x => new Object());
  448. }
  449. [Fact]
  450. public void Iterate_Void_Complete_Dispose()
  451. {
  452. var scheduler = new TestScheduler();
  453. var xs = scheduler.CreateColdObservable(
  454. OnNext(10, 1),
  455. OnNext(20, 2),
  456. OnNext(30, 3),
  457. OnNext(40, 4),
  458. OnCompleted<int>(50)
  459. );
  460. var ys = scheduler.CreateColdObservable(
  461. OnNext(10, 1),
  462. OnNext(20, 2),
  463. OnCompleted<int>(30)
  464. );
  465. var zs = scheduler.CreateColdObservable(
  466. OnNext(100, 1),
  467. OnNext(200, 2),
  468. OnNext(300, 3),
  469. OnNext(400, 4),
  470. OnNext(500, 5),
  471. OnNext(600, 6),
  472. OnNext(700, 7),
  473. OnNext(800, 8),
  474. OnNext(900, 9),
  475. OnNext(1000, 10)
  476. );
  477. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Dispose(xs, ys, zs)));
  478. res.Messages.AssertEqual(
  479. );
  480. xs.Subscriptions.AssertEqual(
  481. Subscribe(200, 250)
  482. );
  483. ys.Subscriptions.AssertEqual(
  484. Subscribe(250, 280)
  485. );
  486. zs.Subscriptions.AssertEqual(
  487. Subscribe(280, 1000)
  488. );
  489. }
  490. [Fact]
  491. public void Iterate_Void_Func_Throw()
  492. {
  493. var scheduler = new TestScheduler();
  494. var obs = scheduler.Start(() => ObservableEx.Create(() => { throw new InvalidOperationException(); }));
  495. Assert.Equal(1, obs.Messages.Count);
  496. var notification = obs.Messages[0].Value;
  497. Assert.Equal(NotificationKind.OnError, notification.Kind);
  498. Assert.IsType<InvalidOperationException>(notification.Exception);
  499. }
  500. static IEnumerable<IObservable<Object>> _IteratorScenario_Void(int x, int y)
  501. {
  502. var xs = Observable.Range(1, x).ToListObservable();
  503. yield return xs;
  504. var ys = Observable.Range(1, y).ToListObservable();
  505. yield return ys;
  506. }
  507. [Fact]
  508. public void IteratorScenario_Void()
  509. {
  510. var xs = ObservableEx.Create(() => _IteratorScenario_Void(100, 1000));
  511. xs.AssertEqual(new Unit[] { }.ToObservable());
  512. }
  513. #endregion
  514. #region Expand
  515. [Fact]
  516. public void Expand_ArgumentChecking()
  517. {
  518. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(null, DummyFunc<int, IObservable<int>>.Instance, DummyScheduler.Instance));
  519. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(DummyObservable<int>.Instance, null, DummyScheduler.Instance));
  520. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, null));
  521. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(null, DummyFunc<int, IObservable<int>>.Instance));
  522. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Expand(DummyObservable<int>.Instance, null));
  523. }
  524. [Fact]
  525. public void Expand_Default()
  526. {
  527. var b = Observable.Return(1).Expand(x => x < 10 ? Observable.Return(x + 1) : Observable.Empty<int>())
  528. .SequenceEqual(Observable.Range(1, 10)).First();
  529. Assert.True(b);
  530. }
  531. [Fact]
  532. public void Expand_Empty()
  533. {
  534. var scheduler = new TestScheduler();
  535. var xs = scheduler.CreateHotObservable(
  536. OnCompleted<int>(300)
  537. );
  538. var res = scheduler.Start(() =>
  539. xs.Expand(x => scheduler.CreateColdObservable(
  540. OnNext(100, 1),
  541. OnNext(200, 2),
  542. OnCompleted<int>(300)
  543. ), scheduler)
  544. );
  545. res.Messages.AssertEqual(
  546. OnCompleted<int>(300)
  547. );
  548. xs.Subscriptions.AssertEqual(
  549. Subscribe(201, 300)
  550. );
  551. }
  552. [Fact]
  553. public void Expand_Error()
  554. {
  555. var scheduler = new TestScheduler();
  556. var ex = new Exception();
  557. var xs = scheduler.CreateHotObservable(
  558. OnError<int>(300, ex)
  559. );
  560. var res = scheduler.Start(() =>
  561. xs.Expand(x => scheduler.CreateColdObservable<int>(
  562. OnNext(100 + x, 2 * x),
  563. OnNext(200 + x, 3 * x),
  564. OnCompleted<int>(300 + x)
  565. ), scheduler)
  566. );
  567. res.Messages.AssertEqual(
  568. OnError<int>(300, ex)
  569. );
  570. xs.Subscriptions.AssertEqual(
  571. Subscribe(201, 300)
  572. );
  573. }
  574. [Fact]
  575. public void Expand_Never()
  576. {
  577. var scheduler = new TestScheduler();
  578. var xs = scheduler.CreateHotObservable<int>(
  579. );
  580. var res = scheduler.Start(() =>
  581. xs.Expand(x => scheduler.CreateColdObservable<int>(
  582. OnNext(100 + x, 2 * x),
  583. OnNext(200 + x, 3 * x),
  584. OnCompleted<int>(300 + x)
  585. ), scheduler)
  586. );
  587. res.Messages.AssertEqual(
  588. );
  589. xs.Subscriptions.AssertEqual(
  590. Subscribe(201, 1000)
  591. );
  592. }
  593. [Fact]
  594. public void Expand_Basic()
  595. {
  596. var scheduler = new TestScheduler();
  597. var xs = scheduler.CreateHotObservable(
  598. OnNext(550, 1),
  599. OnNext(850, 2),
  600. OnCompleted<int>(950)
  601. );
  602. var res = scheduler.Start(() =>
  603. xs.Expand(x => scheduler.CreateColdObservable(
  604. OnNext(100, 2 * x),
  605. OnNext(200, 3 * x),
  606. OnCompleted<int>(300)
  607. ), scheduler)
  608. );
  609. res.Messages.AssertEqual(
  610. OnNext(550, 1),
  611. OnNext(651, 2),
  612. OnNext(751, 3),
  613. OnNext(752, 4),
  614. OnNext(850, 2),
  615. OnNext(852, 6),
  616. OnNext(852, 6),
  617. OnNext(853, 8),
  618. OnNext(951, 4),
  619. OnNext(952, 9),
  620. OnNext(952, 12),
  621. OnNext(953, 12),
  622. OnNext(953, 12),
  623. OnNext(954, 16)
  624. );
  625. xs.Subscriptions.AssertEqual(
  626. Subscribe(201, 950)
  627. );
  628. }
  629. [Fact]
  630. public void Expand_Throw()
  631. {
  632. var scheduler = new TestScheduler();
  633. var xs = scheduler.CreateHotObservable(
  634. OnNext(550, 1),
  635. OnNext(850, 2),
  636. OnCompleted<int>(950)
  637. );
  638. var ex = new Exception();
  639. var res = scheduler.Start(() =>
  640. xs.Expand(x => { throw ex; }, scheduler)
  641. );
  642. res.Messages.AssertEqual(
  643. OnNext(550, 1),
  644. OnError<int>(550, ex)
  645. );
  646. xs.Subscriptions.AssertEqual(
  647. Subscribe(201, 550)
  648. );
  649. }
  650. #endregion
  651. #region ForkJoin
  652. [Fact]
  653. public void ForkJoin_ArgumentChecking()
  654. {
  655. var someObservable = DummyObservable<int>.Instance;
  656. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin(someObservable, someObservable, (Func<int, int, int>)null));
  657. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin(someObservable, (IObservable<int>)null, (_, __) => _ + __));
  658. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin((IObservable<int>)null, someObservable, (_, __) => _ + __));
  659. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin((IObservable<int>[])null));
  660. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin((IEnumerable<IObservable<int>>)null));
  661. }
  662. [Fact]
  663. public void ForkJoin_EmptyEmpty()
  664. {
  665. var scheduler = new TestScheduler();
  666. var msgs1 = new[] {
  667. OnNext(150, 1),
  668. OnCompleted<int>(230)
  669. };
  670. var msgs2 = new[] {
  671. OnNext(150, 1),
  672. OnCompleted<int>(250)
  673. };
  674. var o = scheduler.CreateHotObservable(msgs1);
  675. var e = scheduler.CreateHotObservable(msgs2);
  676. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  677. res.Messages.AssertEqual(
  678. OnCompleted<int>(250)
  679. );
  680. }
  681. [Fact]
  682. public void ForkJoin_None()
  683. {
  684. var scheduler = new TestScheduler();
  685. var res = scheduler.Start(() => ObservableEx.ForkJoin<int>());
  686. res.Messages.AssertEqual(
  687. OnCompleted<int[]>(200)
  688. );
  689. }
  690. [Fact]
  691. public void ForkJoin_EmptyReturn()
  692. {
  693. var scheduler = new TestScheduler();
  694. var msgs1 = new[] {
  695. OnNext(150, 1),
  696. OnCompleted<int>(230)
  697. };
  698. var msgs2 = new[] {
  699. OnNext(150, 1),
  700. OnNext(210, 2),
  701. OnCompleted<int>(250)
  702. };
  703. var o = scheduler.CreateHotObservable(msgs1);
  704. var e = scheduler.CreateHotObservable(msgs2);
  705. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  706. res.Messages.AssertEqual(
  707. OnCompleted<int>(250)
  708. );
  709. }
  710. [Fact]
  711. public void ForkJoin_ReturnEmpty()
  712. {
  713. var scheduler = new TestScheduler();
  714. var msgs1 = new[] {
  715. OnNext(150, 1),
  716. OnNext(210, 2),
  717. OnCompleted<int>(230)
  718. };
  719. var msgs2 = new[] {
  720. OnNext(150, 1),
  721. OnCompleted<int>(250)
  722. };
  723. var o = scheduler.CreateHotObservable(msgs1);
  724. var e = scheduler.CreateHotObservable(msgs2);
  725. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  726. res.Messages.AssertEqual(
  727. OnCompleted<int>(250)
  728. );
  729. }
  730. [Fact]
  731. public void ForkJoin_ReturnReturn()
  732. {
  733. var scheduler = new TestScheduler();
  734. var msgs1 = new[] {
  735. OnNext(150, 1),
  736. OnNext(210, 2),
  737. OnCompleted<int>(230)
  738. };
  739. var msgs2 = new[] {
  740. OnNext(150, 1),
  741. OnNext(220, 3),
  742. OnCompleted<int>(250)
  743. };
  744. var o = scheduler.CreateHotObservable(msgs1);
  745. var e = scheduler.CreateHotObservable(msgs2);
  746. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  747. res.Messages.AssertEqual(
  748. OnNext(250, 2 + 3),
  749. OnCompleted<int>(250)
  750. );
  751. }
  752. [Fact]
  753. public void ForkJoin_EmptyThrow()
  754. {
  755. var ex = new Exception();
  756. var scheduler = new TestScheduler();
  757. var msgs1 = new[] {
  758. OnNext(150, 1),
  759. OnCompleted<int>(230)
  760. };
  761. var msgs2 = new[] {
  762. OnNext(150, 1),
  763. OnError<int>(210, ex),
  764. OnCompleted<int>(250)
  765. };
  766. var o = scheduler.CreateHotObservable(msgs1);
  767. var e = scheduler.CreateHotObservable(msgs2);
  768. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  769. res.Messages.AssertEqual(
  770. OnError<int>(210, ex)
  771. );
  772. }
  773. [Fact]
  774. public void ForkJoin_ThrowEmpty()
  775. {
  776. var ex = new Exception();
  777. var scheduler = new TestScheduler();
  778. var msgs1 = new[] {
  779. OnNext(150, 1),
  780. OnError<int>(210, ex),
  781. OnCompleted<int>(230)
  782. };
  783. var msgs2 = new[] {
  784. OnNext(150, 1),
  785. OnCompleted<int>(250)
  786. };
  787. var o = scheduler.CreateHotObservable(msgs1);
  788. var e = scheduler.CreateHotObservable(msgs2);
  789. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  790. res.Messages.AssertEqual(
  791. OnError<int>(210, ex)
  792. );
  793. }
  794. [Fact]
  795. public void ForkJoin_ReturnThrow()
  796. {
  797. var ex = new Exception();
  798. var scheduler = new TestScheduler();
  799. var msgs1 = new[] {
  800. OnNext(150, 1),
  801. OnNext(210, 2),
  802. OnCompleted<int>(230)
  803. };
  804. var msgs2 = new[] {
  805. OnNext(150, 1),
  806. OnError<int>(220, ex),
  807. OnCompleted<int>(250)
  808. };
  809. var o = scheduler.CreateHotObservable(msgs1);
  810. var e = scheduler.CreateHotObservable(msgs2);
  811. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  812. res.Messages.AssertEqual(
  813. OnError<int>(220, ex)
  814. );
  815. }
  816. [Fact]
  817. public void ForkJoin_ThrowReturn()
  818. {
  819. var ex = new Exception();
  820. var scheduler = new TestScheduler();
  821. var msgs1 = new[] {
  822. OnNext(150, 1),
  823. OnError<int>(220, ex),
  824. OnCompleted<int>(230)
  825. };
  826. var msgs2 = new[] {
  827. OnNext(150, 1),
  828. OnNext(210, 2),
  829. OnCompleted<int>(250)
  830. };
  831. var o = scheduler.CreateHotObservable(msgs1);
  832. var e = scheduler.CreateHotObservable(msgs2);
  833. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  834. res.Messages.AssertEqual(
  835. OnError<int>(220, ex)
  836. );
  837. }
  838. [Fact]
  839. public void ForkJoin_Binary()
  840. {
  841. var scheduler = new TestScheduler();
  842. var msgs1 = new[] {
  843. OnNext(150, 1),
  844. OnNext(215, 2),
  845. OnNext(225, 4),
  846. OnCompleted<int>(230)
  847. };
  848. var msgs2 = new[] {
  849. OnNext(150, 1),
  850. OnNext(235, 6),
  851. OnNext(240, 7),
  852. OnCompleted<int>(250)
  853. };
  854. var o = scheduler.CreateHotObservable(msgs1);
  855. var e = scheduler.CreateHotObservable(msgs2);
  856. var res = scheduler.Start(() => e.ForkJoin(o, (x, y) => x + y));
  857. res.Messages.AssertEqual(
  858. OnNext(250, 4 + 7), // TODO: fix ForkJoin behavior
  859. OnCompleted<int>(250)
  860. );
  861. }
  862. [Fact]
  863. public void ForkJoin_NaryParams()
  864. {
  865. var scheduler = new TestScheduler();
  866. var msgs1 = new[] {
  867. OnNext(150, 1),
  868. OnNext(215, 2),
  869. OnNext(225, 4),
  870. OnCompleted<int>(230)
  871. };
  872. var msgs2 = new[] {
  873. OnNext(150, 1),
  874. OnNext(235, 6),
  875. OnNext(240, 7),
  876. OnCompleted<int>(250)
  877. };
  878. var msgs3 = new[] {
  879. OnNext(150, 1),
  880. OnNext(230, 3),
  881. OnNext(245, 5),
  882. OnCompleted<int>(270)
  883. };
  884. var o1 = scheduler.CreateHotObservable(msgs1);
  885. var o2 = scheduler.CreateHotObservable(msgs2);
  886. var o3 = scheduler.CreateHotObservable(msgs3);
  887. var res = scheduler.Start(() => ObservableEx.ForkJoin(o1, o2, o3));
  888. res.Messages.AssertEqual(
  889. OnNext<int[]>(270, l => l.SequenceEqual(new[] { 4, 7, 5 })), // TODO: fix ForkJoin behavior
  890. OnCompleted<int[]>(270)
  891. );
  892. }
  893. [Fact]
  894. public void ForkJoin_NaryParamsEmpty()
  895. {
  896. var scheduler = new TestScheduler();
  897. var msgs1 = new[] {
  898. OnNext(150, 1),
  899. OnNext(215, 2),
  900. OnNext(225, 4),
  901. OnCompleted<int>(230)
  902. };
  903. var msgs2 = new[] {
  904. OnNext(150, 1),
  905. OnNext(235, 6),
  906. OnNext(240, 7),
  907. OnCompleted<int>(250)
  908. };
  909. var msgs3 = new[] {
  910. OnCompleted<int>(270)
  911. };
  912. var o1 = scheduler.CreateHotObservable(msgs1);
  913. var o2 = scheduler.CreateHotObservable(msgs2);
  914. var o3 = scheduler.CreateHotObservable(msgs3);
  915. var res = scheduler.Start(() => ObservableEx.ForkJoin(o1, o2, o3));
  916. res.Messages.AssertEqual(
  917. OnCompleted<int[]>(270)
  918. );
  919. }
  920. [Fact]
  921. public void ForkJoin_NaryParamsEmptyBeforeEnd()
  922. {
  923. var scheduler = new TestScheduler();
  924. var msgs1 = new[] {
  925. OnNext(150, 1),
  926. OnNext(215, 2),
  927. OnNext(225, 4),
  928. OnCompleted<int>(230)
  929. };
  930. var msgs2 = new[] {
  931. OnCompleted<int>(235)
  932. };
  933. var msgs3 = new[] {
  934. OnNext(150, 1),
  935. OnNext(230, 3),
  936. OnNext(245, 5),
  937. OnCompleted<int>(270)
  938. };
  939. var o1 = scheduler.CreateHotObservable(msgs1);
  940. var o2 = scheduler.CreateHotObservable(msgs2);
  941. var o3 = scheduler.CreateHotObservable(msgs3);
  942. var res = scheduler.Start(() => ObservableEx.ForkJoin(o1, o2, o3));
  943. res.Messages.AssertEqual(
  944. OnCompleted<int[]>(235)
  945. );
  946. }
  947. [Fact]
  948. public void ForkJoin_Nary_Immediate()
  949. {
  950. ObservableEx.ForkJoin(Observable.Return(1), Observable.Return(2)).First().SequenceEqual(new[] { 1, 2 });
  951. }
  952. [Fact]
  953. public void ForkJoin_Nary_Virtual_And_Immediate()
  954. {
  955. var scheduler = new TestScheduler();
  956. var msgs1 = new[] {
  957. OnNext(150, 1),
  958. OnNext(215, 2),
  959. OnNext(225, 4),
  960. OnCompleted<int>(230)
  961. };
  962. var msgs2 = new[] {
  963. OnNext(150, 1),
  964. OnNext(235, 6),
  965. OnNext(240, 7),
  966. OnCompleted<int>(250)
  967. };
  968. var msgs3 = new[] {
  969. OnNext(150, 1),
  970. OnNext(230, 3),
  971. OnNext(245, 5),
  972. OnCompleted<int>(270)
  973. };
  974. var o1 = scheduler.CreateHotObservable(msgs1);
  975. var o2 = scheduler.CreateHotObservable(msgs2);
  976. var o3 = scheduler.CreateHotObservable(msgs3);
  977. var res = scheduler.Start(() => ObservableEx.ForkJoin(new List<IObservable<int>> { o1, o2, o3, Observable.Return(20) }));
  978. res.Messages.AssertEqual(
  979. OnNext<int[]>(270, l => l.SequenceEqual(new[] { 4, 7, 5, 20 })),
  980. OnCompleted<int[]>(270)
  981. );
  982. }
  983. [Fact]
  984. public void ForkJoin_Nary_Immediate_And_Virtual()
  985. {
  986. var scheduler = new TestScheduler();
  987. var msgs1 = new[] {
  988. OnNext(150, 1),
  989. OnNext(215, 2),
  990. OnNext(225, 4),
  991. OnCompleted<int>(230)
  992. };
  993. var msgs2 = new[] {
  994. OnNext(150, 1),
  995. OnNext(235, 6),
  996. OnNext(240, 7),
  997. OnCompleted<int>(250)
  998. };
  999. var msgs3 = new[] {
  1000. OnNext(150, 1),
  1001. OnNext(230, 3),
  1002. OnNext(245, 5),
  1003. OnCompleted<int>(270)
  1004. };
  1005. var o1 = scheduler.CreateHotObservable(msgs1);
  1006. var o2 = scheduler.CreateHotObservable(msgs2);
  1007. var o3 = scheduler.CreateHotObservable(msgs3);
  1008. var res = scheduler.Start(() => ObservableEx.ForkJoin(new List<IObservable<int>> { Observable.Return(20), o1, o2, o3 }));
  1009. res.Messages.AssertEqual(
  1010. OnNext<int[]>(270, l => l.SequenceEqual(new[] { 20, 4, 7, 5 })),
  1011. OnCompleted<int[]>(270)
  1012. );
  1013. }
  1014. [Fact]
  1015. public void ForkJoin_Nary()
  1016. {
  1017. var scheduler = new TestScheduler();
  1018. var msgs1 = new[] {
  1019. OnNext(150, 1),
  1020. OnNext(215, 2),
  1021. OnNext(225, 4),
  1022. OnCompleted<int>(230)
  1023. };
  1024. var msgs2 = new[] {
  1025. OnNext(150, 1),
  1026. OnNext(235, 6),
  1027. OnNext(240, 7),
  1028. OnCompleted<int>(250)
  1029. };
  1030. var msgs3 = new[] {
  1031. OnNext(150, 1),
  1032. OnNext(230, 3),
  1033. OnNext(245, 5),
  1034. OnCompleted<int>(270)
  1035. };
  1036. var o1 = scheduler.CreateHotObservable(msgs1);
  1037. var o2 = scheduler.CreateHotObservable(msgs2);
  1038. var o3 = scheduler.CreateHotObservable(msgs3);
  1039. var res = scheduler.Start(() => ObservableEx.ForkJoin(new List<IObservable<int>> { o1, o2, o3 }));
  1040. res.Messages.AssertEqual(
  1041. OnNext<int[]>(270, l => l.SequenceEqual(new[] { 4, 7, 5 })), // TODO: fix ForkJoin behavior
  1042. OnCompleted<int[]>(270)
  1043. );
  1044. }
  1045. [Fact]
  1046. public void Bug_1302_SelectorThrows_LeftLast()
  1047. {
  1048. var scheduler = new TestScheduler();
  1049. var xs = scheduler.CreateHotObservable(
  1050. OnNext(210, 1),
  1051. OnCompleted<int>(220)
  1052. );
  1053. var ys = scheduler.CreateHotObservable(
  1054. OnNext(215, 2),
  1055. OnCompleted<int>(217)
  1056. );
  1057. var ex = new Exception();
  1058. var results = scheduler.Start(() => xs.ForkJoin<int, int, int>(ys, (x, y) => { throw ex; }));
  1059. results.Messages.AssertEqual(
  1060. OnError<int>(220, ex)
  1061. );
  1062. xs.Subscriptions.AssertEqual(
  1063. Subscribe(200, 220)
  1064. );
  1065. ys.Subscriptions.AssertEqual(
  1066. Subscribe(200, 217)
  1067. );
  1068. }
  1069. [Fact]
  1070. public void Bug_1302_SelectorThrows_RightLast()
  1071. {
  1072. var scheduler = new TestScheduler();
  1073. var xs = scheduler.CreateHotObservable(
  1074. OnNext(210, 1),
  1075. OnCompleted<int>(217)
  1076. );
  1077. var ys = scheduler.CreateHotObservable(
  1078. OnNext(215, 2),
  1079. OnCompleted<int>(220)
  1080. );
  1081. var ex = new Exception();
  1082. var results = scheduler.Start(() => xs.ForkJoin<int, int, int>(ys, (x, y) => { throw ex; }));
  1083. results.Messages.AssertEqual(
  1084. OnError<int>(220, ex)
  1085. );
  1086. xs.Subscriptions.AssertEqual(
  1087. Subscribe(200, 217)
  1088. );
  1089. ys.Subscriptions.AssertEqual(
  1090. Subscribe(200, 220)
  1091. );
  1092. }
  1093. [Fact]
  1094. public void Bug_1302_RightLast_NoLeft()
  1095. {
  1096. var scheduler = new TestScheduler();
  1097. var xs = scheduler.CreateHotObservable(
  1098. OnCompleted<int>(217)
  1099. );
  1100. var ys = scheduler.CreateHotObservable(
  1101. OnNext(215, 2),
  1102. OnCompleted<int>(220)
  1103. );
  1104. var results = scheduler.Start(() => xs.ForkJoin<int, int, int>(ys, (x, y) => x + y));
  1105. results.Messages.AssertEqual(
  1106. OnCompleted<int>(220)
  1107. );
  1108. xs.Subscriptions.AssertEqual(
  1109. Subscribe(200, 217)
  1110. );
  1111. ys.Subscriptions.AssertEqual(
  1112. Subscribe(200, 220)
  1113. );
  1114. }
  1115. [Fact]
  1116. public void Bug_1302_RightLast_NoRight()
  1117. {
  1118. var scheduler = new TestScheduler();
  1119. var xs = scheduler.CreateHotObservable(
  1120. OnNext(215, 2),
  1121. OnCompleted<int>(217)
  1122. );
  1123. var ys = scheduler.CreateHotObservable(
  1124. OnCompleted<int>(220)
  1125. );
  1126. var results = scheduler.Start(() => xs.ForkJoin<int, int, int>(ys, (x, y) => x + y));
  1127. results.Messages.AssertEqual(
  1128. OnCompleted<int>(220)
  1129. );
  1130. xs.Subscriptions.AssertEqual(
  1131. Subscribe(200, 217)
  1132. );
  1133. ys.Subscriptions.AssertEqual(
  1134. Subscribe(200, 220)
  1135. );
  1136. }
  1137. #endregion
  1138. #region Let
  1139. [Fact]
  1140. public void Let_ArgumentChecking()
  1141. {
  1142. var someObservable = Observable.Empty<int>();
  1143. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Let(default(IObservable<int>), x => x));
  1144. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Let<int, int>(someObservable, null));
  1145. }
  1146. [Fact]
  1147. public void Let_CallsFunctionImmediately()
  1148. {
  1149. bool called = false;
  1150. Observable.Empty<int>().Let(x => { called = true; return x; });
  1151. Assert.True(called);
  1152. }
  1153. #endregion
  1154. #region ManySelect
  1155. [Fact]
  1156. public void ManySelect_ArgumentChecking()
  1157. {
  1158. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(null, DummyFunc<IObservable<int>, int>.Instance, DummyScheduler.Instance));
  1159. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(DummyObservable<int>.Instance, null, DummyScheduler.Instance));
  1160. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(DummyObservable<int>.Instance, DummyFunc<IObservable<int>, int>.Instance, null));
  1161. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(null, DummyFunc<IObservable<int>, int>.Instance));
  1162. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ManySelect<int, int>(DummyObservable<int>.Instance, null));
  1163. }
  1164. [Fact]
  1165. public void ManySelect_Law_1()
  1166. {
  1167. var xs = Observable.Range(1, 0);
  1168. var left = xs.ManySelect(Observable.First);
  1169. var right = xs;
  1170. Assert.True(left.SequenceEqual(right).First());
  1171. }
  1172. [Fact]
  1173. public void ManySelect_Law_2()
  1174. {
  1175. var xs = Observable.Range(1, 10);
  1176. Func<IObservable<int>, int> f = ys => ys.Count().First();
  1177. var left = xs.ManySelect(f).First();
  1178. var right = f(xs);
  1179. Assert.Equal(left, right);
  1180. }
  1181. [Fact]
  1182. public void ManySelect_Law_3()
  1183. {
  1184. var xs = Observable.Range(1, 10);
  1185. Func<IObservable<int>, int> f = ys => ys.Count().First();
  1186. Func<IObservable<int>, int> g = ys => ys.Last();
  1187. var left = xs.ManySelect(f).ManySelect(g);
  1188. var right = xs.ManySelect(ys => g(ys.ManySelect(f)));
  1189. Assert.True(left.SequenceEqual(right).First());
  1190. }
  1191. [Fact]
  1192. public void ManySelect_Basic()
  1193. {
  1194. var scheduler = new TestScheduler();
  1195. var xs = scheduler.CreateHotObservable(
  1196. OnNext(100, 1),
  1197. OnNext(220, 2),
  1198. OnNext(270, 3),
  1199. OnNext(410, 4),
  1200. OnCompleted<int>(500)
  1201. );
  1202. var res = scheduler.Start(() =>
  1203. xs.ManySelect(ys => ys.First(), scheduler)
  1204. );
  1205. res.Messages.AssertEqual(
  1206. OnNext(221, 2),
  1207. OnNext(271, 3),
  1208. OnNext(411, 4),
  1209. OnCompleted<int>(501)
  1210. );
  1211. xs.Subscriptions.AssertEqual(
  1212. Subscribe(200, 500)
  1213. );
  1214. }
  1215. [Fact]
  1216. public void ManySelect_Error()
  1217. {
  1218. var scheduler = new TestScheduler();
  1219. var ex = new Exception();
  1220. var xs = scheduler.CreateHotObservable(
  1221. OnNext(100, 1),
  1222. OnNext(220, 2),
  1223. OnNext(270, 3),
  1224. OnNext(410, 4),
  1225. OnError<int>(500, ex)
  1226. );
  1227. var res = scheduler.Start(() =>
  1228. xs.ManySelect(ys => ys.First(), scheduler)
  1229. );
  1230. res.Messages.AssertEqual(
  1231. OnNext(221, 2),
  1232. OnNext(271, 3),
  1233. OnNext(411, 4),
  1234. OnError<int>(501, ex)
  1235. );
  1236. xs.Subscriptions.AssertEqual(
  1237. Subscribe(200, 500)
  1238. );
  1239. }
  1240. #endregion
  1241. #region ToListObservable
  1242. [Fact]
  1243. public void ToListObservable_ArgumentChecking()
  1244. {
  1245. ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).ToListObservable());
  1246. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Never<int>().ToListObservable().Subscribe(null));
  1247. }
  1248. [Fact]
  1249. public void ToListObservable_OnNext()
  1250. {
  1251. var scheduler = new TestScheduler();
  1252. var xs = scheduler.CreateHotObservable(
  1253. OnNext(300, 1),
  1254. OnNext(400, 2),
  1255. OnNext(500, 3),
  1256. OnNext(600, 4)
  1257. );
  1258. var res = scheduler.Start(() =>
  1259. xs.ToListObservable()
  1260. );
  1261. res.Messages.AssertEqual(
  1262. );
  1263. }
  1264. [Fact]
  1265. public void ToListObservable_OnError()
  1266. {
  1267. var scheduler = new TestScheduler();
  1268. var ex = new InvalidOperationException();
  1269. var xs = scheduler.CreateHotObservable(
  1270. OnNext(300, 1),
  1271. OnNext(400, 2),
  1272. OnNext(500, 3),
  1273. OnError<int>(600, ex)
  1274. );
  1275. var s = default(ListObservable<int>);
  1276. var subscription = default(IDisposable);
  1277. var res = scheduler.CreateObserver<object>();
  1278. scheduler.ScheduleAbsolute(Created, () => s = xs.ToListObservable());
  1279. scheduler.ScheduleAbsolute(Subscribed, () => subscription = s.Subscribe(res));
  1280. scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
  1281. scheduler.Start();
  1282. ReactiveAssert.Throws<InvalidOperationException>(() => { var t = s.Value; });
  1283. res.Messages.AssertEqual(
  1284. OnError<Object>(600, ex)
  1285. );
  1286. }
  1287. [Fact]
  1288. public void ToListObservable_OnCompleted()
  1289. {
  1290. var scheduler = new TestScheduler();
  1291. var xs = scheduler.CreateHotObservable(
  1292. OnNext(300, 1),
  1293. OnNext(400, 2),
  1294. OnNext(500, 3),
  1295. OnCompleted<int>(600)
  1296. );
  1297. var s = default(ListObservable<int>);
  1298. var res = scheduler.Start(() =>
  1299. s = xs.ToListObservable()
  1300. );
  1301. s.AssertEqual(1, 2, 3);
  1302. res.Messages.AssertEqual(
  1303. OnCompleted<Object>(600)
  1304. );
  1305. Assert.Equal(3, s.Value);
  1306. }
  1307. [Fact]
  1308. public void ToListObservable_Disposed()
  1309. {
  1310. var scheduler = new TestScheduler();
  1311. var xs = scheduler.CreateHotObservable(
  1312. OnNext(300, 1),
  1313. OnNext(400, 2),
  1314. OnNext(500, 3),
  1315. OnNext(1050, 4),
  1316. OnCompleted<int>(1100)
  1317. );
  1318. var s = default(ListObservable<int>);
  1319. var res = scheduler.Start(() =>
  1320. s = xs.ToListObservable()
  1321. );
  1322. res.Messages.AssertEqual(
  1323. );
  1324. }
  1325. [Fact]
  1326. public void ToListObservable_Never()
  1327. {
  1328. var scheduler = new TestScheduler();
  1329. var xs = scheduler.CreateHotObservable<int>(
  1330. );
  1331. var res = scheduler.Start(() =>
  1332. xs.ToListObservable()
  1333. );
  1334. res.Messages.AssertEqual(
  1335. );
  1336. }
  1337. #endregion
  1338. }
  1339. }