CreateTest.cs 30 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Disposables;
  10. using System.Reactive.Linq;
  11. using Microsoft.Reactive.Testing;
  12. using ReactiveTests.Dummies;
  13. using Microsoft.VisualStudio.TestTools.UnitTesting;
  14. using Assert = Xunit.Assert;
  15. namespace ReactiveTests.Tests
  16. {
  17. [TestClass]
  18. public class CreateTest : ReactiveTest
  19. {
  20. [TestMethod]
  21. public void Create_ArgumentChecking()
  22. {
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create(default(Func<IObserver<int>, Action>)));
  24. //
  25. // BREAKING CHANGE v2.0 > v1.x - Returning null from Subscribe means "nothing to do upon unsubscription"
  26. // all null-coalesces to Disposable.Empty.
  27. //
  28. //ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => default(Action)).Subscribe(DummyObserver<int>.Instance));
  29. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => () => { }).Subscribe(null));
  30. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o =>
  31. {
  32. o.OnError(null);
  33. return () => { };
  34. }).Subscribe(null));
  35. }
  36. [TestMethod]
  37. public void Create_NullCoalescingAction()
  38. {
  39. var xs = Observable.Create<int>(o =>
  40. {
  41. o.OnNext(42);
  42. return default(Action);
  43. });
  44. var lst = new List<int>();
  45. var d = xs.Subscribe(lst.Add);
  46. d.Dispose();
  47. Assert.True(lst.SequenceEqual(new[] { 42 }));
  48. }
  49. [TestMethod]
  50. public void Create_Next()
  51. {
  52. var scheduler = new TestScheduler();
  53. var res = scheduler.Start(() =>
  54. Observable.Create<int>(o =>
  55. {
  56. o.OnNext(1);
  57. o.OnNext(2);
  58. return () => { };
  59. })
  60. );
  61. res.Messages.AssertEqual(
  62. OnNext(200, 1),
  63. OnNext(200, 2)
  64. );
  65. }
  66. [TestMethod]
  67. public void Create_Completed()
  68. {
  69. var scheduler = new TestScheduler();
  70. var res = scheduler.Start(() =>
  71. Observable.Create<int>(o =>
  72. {
  73. o.OnCompleted();
  74. o.OnNext(100);
  75. o.OnError(new Exception());
  76. o.OnCompleted();
  77. return () => { };
  78. })
  79. );
  80. res.Messages.AssertEqual(
  81. OnCompleted<int>(200)
  82. );
  83. }
  84. [TestMethod]
  85. public void Create_Error()
  86. {
  87. var scheduler = new TestScheduler();
  88. var ex = new Exception();
  89. var res = scheduler.Start(() =>
  90. Observable.Create<int>(o =>
  91. {
  92. o.OnError(ex);
  93. o.OnNext(100);
  94. o.OnError(new Exception());
  95. o.OnCompleted();
  96. return () => { };
  97. })
  98. );
  99. res.Messages.AssertEqual(
  100. OnError<int>(200, ex)
  101. );
  102. }
  103. [TestMethod]
  104. public void Create_Exception()
  105. {
  106. ReactiveAssert.Throws<InvalidOperationException>(() =>
  107. Observable.Create(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Subscribe());
  108. }
  109. [TestMethod]
  110. public void Create_Dispose()
  111. {
  112. var scheduler = new TestScheduler();
  113. var res = scheduler.Start(() =>
  114. Observable.Create<int>(o =>
  115. {
  116. var stopped = false;
  117. o.OnNext(1);
  118. o.OnNext(2);
  119. scheduler.Schedule(TimeSpan.FromTicks(600), () =>
  120. {
  121. if (!stopped)
  122. {
  123. o.OnNext(3);
  124. }
  125. });
  126. scheduler.Schedule(TimeSpan.FromTicks(700), () =>
  127. {
  128. if (!stopped)
  129. {
  130. o.OnNext(4);
  131. }
  132. });
  133. scheduler.Schedule(TimeSpan.FromTicks(900), () =>
  134. {
  135. if (!stopped)
  136. {
  137. o.OnNext(5);
  138. }
  139. });
  140. scheduler.Schedule(TimeSpan.FromTicks(1100), () =>
  141. {
  142. if (!stopped)
  143. {
  144. o.OnNext(6);
  145. }
  146. });
  147. return () => { stopped = true; };
  148. })
  149. );
  150. res.Messages.AssertEqual(
  151. OnNext(200, 1),
  152. OnNext(200, 2),
  153. OnNext(800, 3),
  154. OnNext(900, 4)
  155. );
  156. }
  157. [TestMethod]
  158. public void Create_ObserverThrows()
  159. {
  160. ReactiveAssert.Throws<InvalidOperationException>(() =>
  161. Observable.Create<int>(o =>
  162. {
  163. o.OnNext(1);
  164. return () => { };
  165. }).Subscribe(x => { throw new InvalidOperationException(); }));
  166. ReactiveAssert.Throws<InvalidOperationException>(() =>
  167. Observable.Create<int>(o =>
  168. {
  169. o.OnError(new Exception());
  170. return () => { };
  171. }).Subscribe(x => { }, ex => { throw new InvalidOperationException(); }));
  172. ReactiveAssert.Throws<InvalidOperationException>(() =>
  173. Observable.Create<int>(o =>
  174. {
  175. o.OnCompleted();
  176. return () => { };
  177. }).Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }));
  178. }
  179. [TestMethod]
  180. public void CreateWithDisposable_ArgumentChecking()
  181. {
  182. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create(default(Func<IObserver<int>, IDisposable>)));
  183. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => DummyDisposable.Instance).Subscribe(null));
  184. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o =>
  185. {
  186. o.OnError(null);
  187. return DummyDisposable.Instance;
  188. }).Subscribe(null));
  189. }
  190. [TestMethod]
  191. public void CreateWithDisposable_NullCoalescingAction()
  192. {
  193. var xs = Observable.Create<int>(o =>
  194. {
  195. o.OnNext(42);
  196. return default(IDisposable);
  197. });
  198. var lst = new List<int>();
  199. var d = xs.Subscribe(lst.Add);
  200. d.Dispose();
  201. Assert.True(lst.SequenceEqual(new[] { 42 }));
  202. }
  203. [TestMethod]
  204. public void CreateWithDisposable_Next()
  205. {
  206. var scheduler = new TestScheduler();
  207. var res = scheduler.Start(() =>
  208. Observable.Create<int>(o =>
  209. {
  210. o.OnNext(1);
  211. o.OnNext(2);
  212. return Disposable.Empty;
  213. })
  214. );
  215. res.Messages.AssertEqual(
  216. OnNext(200, 1),
  217. OnNext(200, 2)
  218. );
  219. }
  220. [TestMethod]
  221. public void CreateWithDisposable_Completed()
  222. {
  223. var scheduler = new TestScheduler();
  224. var res = scheduler.Start(() =>
  225. Observable.Create<int>(o =>
  226. {
  227. o.OnCompleted();
  228. o.OnNext(100);
  229. o.OnError(new Exception());
  230. o.OnCompleted();
  231. return Disposable.Empty;
  232. })
  233. );
  234. res.Messages.AssertEqual(
  235. OnCompleted<int>(200)
  236. );
  237. }
  238. [TestMethod]
  239. public void CreateWithDisposable_Error()
  240. {
  241. var scheduler = new TestScheduler();
  242. var ex = new Exception();
  243. var res = scheduler.Start(() =>
  244. Observable.Create<int>(o =>
  245. {
  246. o.OnError(ex);
  247. o.OnNext(100);
  248. o.OnError(new Exception());
  249. o.OnCompleted();
  250. return Disposable.Empty;
  251. })
  252. );
  253. res.Messages.AssertEqual(
  254. OnError<int>(200, ex)
  255. );
  256. }
  257. [TestMethod]
  258. public void CreateWithDisposable_Exception()
  259. {
  260. ReactiveAssert.Throws<InvalidOperationException>(() =>
  261. Observable.Create(new Func<IObserver<int>, IDisposable>(o => { throw new InvalidOperationException(); })).Subscribe());
  262. }
  263. [TestMethod]
  264. public void CreateWithDisposable_Dispose()
  265. {
  266. var scheduler = new TestScheduler();
  267. var res = scheduler.Start(() =>
  268. Observable.Create<int>(o =>
  269. {
  270. var d = new BooleanDisposable();
  271. o.OnNext(1);
  272. o.OnNext(2);
  273. scheduler.Schedule(TimeSpan.FromTicks(600), () =>
  274. {
  275. if (!d.IsDisposed)
  276. {
  277. o.OnNext(3);
  278. }
  279. });
  280. scheduler.Schedule(TimeSpan.FromTicks(700), () =>
  281. {
  282. if (!d.IsDisposed)
  283. {
  284. o.OnNext(4);
  285. }
  286. });
  287. scheduler.Schedule(TimeSpan.FromTicks(900), () =>
  288. {
  289. if (!d.IsDisposed)
  290. {
  291. o.OnNext(5);
  292. }
  293. });
  294. scheduler.Schedule(TimeSpan.FromTicks(1100), () =>
  295. {
  296. if (!d.IsDisposed)
  297. {
  298. o.OnNext(6);
  299. }
  300. });
  301. return d;
  302. })
  303. );
  304. res.Messages.AssertEqual(
  305. OnNext(200, 1),
  306. OnNext(200, 2),
  307. OnNext(800, 3),
  308. OnNext(900, 4)
  309. );
  310. }
  311. [TestMethod]
  312. public void CreateWithDisposable_ObserverThrows()
  313. {
  314. ReactiveAssert.Throws<InvalidOperationException>(() =>
  315. Observable.Create<int>(o =>
  316. {
  317. o.OnNext(1);
  318. return Disposable.Empty;
  319. }).Subscribe(x => { throw new InvalidOperationException(); }));
  320. ReactiveAssert.Throws<InvalidOperationException>(() =>
  321. Observable.Create<int>(o =>
  322. {
  323. o.OnError(new Exception());
  324. return Disposable.Empty;
  325. }).Subscribe(x => { }, ex => { throw new InvalidOperationException(); }));
  326. ReactiveAssert.Throws<InvalidOperationException>(() =>
  327. Observable.Create<int>(o =>
  328. {
  329. o.OnCompleted();
  330. return Disposable.Empty;
  331. }).Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }));
  332. }
  333. [TestMethod]
  334. public void Iterate_ArgumentChecking()
  335. {
  336. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create<int>(default));
  337. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IObserver<int>, IEnumerable<IObservable<object>>>.Instance).Subscribe(null));
  338. }
  339. private IEnumerable<IObservable<object>> ToIterate_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  340. {
  341. observer.OnNext(1);
  342. yield return xs.Select(x => new object());
  343. observer.OnNext(2);
  344. yield return ys.Select(x => new object());
  345. observer.OnNext(3);
  346. observer.OnCompleted();
  347. yield return zs.Select(x => new object());
  348. observer.OnNext(4);
  349. }
  350. [TestMethod]
  351. public void Iterate_Complete()
  352. {
  353. var scheduler = new TestScheduler();
  354. var xs = scheduler.CreateColdObservable(
  355. OnNext(10, 1),
  356. OnNext(20, 2),
  357. OnNext(30, 3),
  358. OnNext(40, 4),
  359. OnCompleted<int>(50)
  360. );
  361. var ys = scheduler.CreateColdObservable(
  362. OnNext(10, 1),
  363. OnNext(20, 2),
  364. OnCompleted<int>(30)
  365. );
  366. var zs = scheduler.CreateColdObservable(
  367. OnNext(10, 1),
  368. OnNext(20, 2),
  369. OnNext(30, 3),
  370. OnNext(40, 4),
  371. OnNext(50, 5),
  372. OnCompleted<int>(60)
  373. );
  374. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete(xs, ys, zs, observer)));
  375. res.Messages.AssertEqual(
  376. OnNext(200, 1),
  377. OnNext(250, 2),
  378. OnNext(280, 3),
  379. OnCompleted<int>(280)
  380. );
  381. xs.Subscriptions.AssertEqual(
  382. Subscribe(200, 250)
  383. );
  384. ys.Subscriptions.AssertEqual(
  385. Subscribe(250, 280)
  386. );
  387. zs.Subscriptions.AssertEqual(
  388. Subscribe(280, 280)
  389. );
  390. }
  391. private IEnumerable<IObservable<object>> ToIterate_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  392. {
  393. observer.OnNext(1);
  394. yield return xs.Select(x => new object());
  395. observer.OnNext(2);
  396. yield return ys.Select(x => new object());
  397. observer.OnNext(3);
  398. yield return zs.Select(x => new object());
  399. observer.OnNext(4);
  400. }
  401. [TestMethod]
  402. public void Iterate_Complete_Implicit()
  403. {
  404. var scheduler = new TestScheduler();
  405. var xs = scheduler.CreateColdObservable(
  406. OnNext(10, 1),
  407. OnNext(20, 2),
  408. OnNext(30, 3),
  409. OnNext(40, 4),
  410. OnCompleted<int>(50)
  411. );
  412. var ys = scheduler.CreateColdObservable(
  413. OnNext(10, 1),
  414. OnNext(20, 2),
  415. OnCompleted<int>(30)
  416. );
  417. var zs = scheduler.CreateColdObservable(
  418. OnNext(10, 1),
  419. OnNext(20, 2),
  420. OnNext(30, 3),
  421. OnNext(40, 4),
  422. OnNext(50, 5),
  423. OnCompleted<int>(60)
  424. );
  425. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Implicit(xs, ys, zs, observer)));
  426. res.Messages.AssertEqual(
  427. OnNext(200, 1),
  428. OnNext(250, 2),
  429. OnNext(280, 3),
  430. OnNext(340, 4),
  431. OnCompleted<int>(340)
  432. );
  433. xs.Subscriptions.AssertEqual(
  434. Subscribe(200, 250)
  435. );
  436. ys.Subscriptions.AssertEqual(
  437. Subscribe(250, 280)
  438. );
  439. zs.Subscriptions.AssertEqual(
  440. Subscribe(280, 340)
  441. );
  442. }
  443. private IEnumerable<IObservable<object>> ToIterate_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
  444. {
  445. observer.OnNext(1);
  446. yield return xs.Select(x => new object());
  447. observer.OnNext(2);
  448. yield return ys.Select(x => new object());
  449. observer.OnNext(3);
  450. if (xs != null)
  451. {
  452. throw ex;
  453. }
  454. yield return zs.Select(x => new object());
  455. observer.OnNext(4);
  456. observer.OnCompleted();
  457. }
  458. [TestMethod]
  459. public void Iterate_Iterator_Throw()
  460. {
  461. var scheduler = new TestScheduler();
  462. var xs = scheduler.CreateColdObservable(
  463. OnNext(10, 1),
  464. OnNext(20, 2),
  465. OnNext(30, 3),
  466. OnNext(40, 4),
  467. OnCompleted<int>(50)
  468. );
  469. var ys = scheduler.CreateColdObservable(
  470. OnNext(10, 1),
  471. OnNext(20, 2),
  472. OnCompleted<int>(30)
  473. );
  474. var zs = scheduler.CreateColdObservable(
  475. OnNext(10, 1),
  476. OnNext(20, 2),
  477. OnNext(30, 3),
  478. OnNext(40, 4),
  479. OnNext(50, 5),
  480. OnCompleted<int>(60)
  481. );
  482. var ex = new Exception();
  483. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Throw(xs, ys, zs, observer, ex)));
  484. res.Messages.AssertEqual(
  485. OnNext(200, 1),
  486. OnNext(250, 2),
  487. OnNext(280, 3),
  488. OnError<int>(280, ex)
  489. );
  490. xs.Subscriptions.AssertEqual(
  491. Subscribe(200, 250)
  492. );
  493. ys.Subscriptions.AssertEqual(
  494. Subscribe(250, 280)
  495. );
  496. zs.Subscriptions.AssertEqual(
  497. );
  498. }
  499. private IEnumerable<IObservable<object>> ToIterate_Error(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
  500. {
  501. observer.OnNext(1);
  502. yield return xs.Select(x => new object());
  503. observer.OnNext(2);
  504. observer.OnError(ex);
  505. yield return ys.Select(x => new object());
  506. observer.OnNext(3);
  507. yield return zs.Select(x => new object());
  508. observer.OnNext(4);
  509. observer.OnCompleted();
  510. }
  511. [TestMethod]
  512. public void Iterate_Iterator_Error()
  513. {
  514. var scheduler = new TestScheduler();
  515. var ex = new Exception();
  516. var xs = scheduler.CreateColdObservable(
  517. OnNext(10, 1),
  518. OnNext(20, 2),
  519. OnNext(30, 3),
  520. OnNext(40, 4),
  521. OnCompleted<int>(50)
  522. );
  523. var ys = scheduler.CreateColdObservable(
  524. OnNext(10, 1),
  525. OnNext(20, 2),
  526. OnCompleted<int>(30)
  527. );
  528. var zs = scheduler.CreateColdObservable(
  529. OnNext(10, 1),
  530. OnNext(20, 2),
  531. OnNext(30, 3),
  532. OnNext(40, 4),
  533. OnNext(50, 5),
  534. OnCompleted<int>(60)
  535. );
  536. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Error(xs, ys, zs, observer, ex)));
  537. res.Messages.AssertEqual(
  538. OnNext(200, 1),
  539. OnNext(250, 2),
  540. OnError<int>(250, ex)
  541. );
  542. xs.Subscriptions.AssertEqual(
  543. Subscribe(200, 250)
  544. );
  545. ys.Subscriptions.AssertEqual(
  546. Subscribe(250, 250)
  547. );
  548. zs.Subscriptions.AssertEqual(
  549. );
  550. }
  551. private IEnumerable<IObservable<object>> ToIterate_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  552. {
  553. observer.OnNext(1);
  554. yield return xs.Select(x => new object());
  555. observer.OnNext(2);
  556. yield return ys.Select(x => new object());
  557. observer.OnNext(3);
  558. yield return zs.Select(x => new object());
  559. observer.OnNext(4);
  560. }
  561. [TestMethod]
  562. public void Iterate_Complete_Dispose()
  563. {
  564. var scheduler = new TestScheduler();
  565. var xs = scheduler.CreateColdObservable(
  566. OnNext(10, 1),
  567. OnNext(20, 2),
  568. OnNext(30, 3),
  569. OnNext(40, 4),
  570. OnCompleted<int>(50)
  571. );
  572. var ys = scheduler.CreateColdObservable(
  573. OnNext(10, 1),
  574. OnNext(20, 2),
  575. OnCompleted<int>(30)
  576. );
  577. var zs = scheduler.CreateColdObservable(
  578. OnNext(100, 1),
  579. OnNext(200, 2),
  580. OnNext(300, 3),
  581. OnNext(400, 4),
  582. OnNext(500, 5),
  583. OnNext(600, 6),
  584. OnNext(700, 7),
  585. OnNext(800, 8),
  586. OnNext(900, 9),
  587. OnNext(1000, 10)
  588. );
  589. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Dispose(xs, ys, zs, observer)));
  590. res.Messages.AssertEqual(
  591. OnNext(200, 1),
  592. OnNext(250, 2),
  593. OnNext(280, 3)
  594. );
  595. xs.Subscriptions.AssertEqual(
  596. Subscribe(200, 250)
  597. );
  598. ys.Subscriptions.AssertEqual(
  599. Subscribe(250, 280)
  600. );
  601. zs.Subscriptions.AssertEqual(
  602. Subscribe(280, 1000)
  603. );
  604. }
  605. [TestMethod]
  606. public void IteratorScenario()
  607. {
  608. var xs = ObservableEx.Create<int>(o => _IteratorScenario(100, 1000, o));
  609. xs.AssertEqual(new[] { 100, 1000 }.ToObservable());
  610. }
  611. private static IEnumerable<IObservable<object>> _IteratorScenario(int x, int y, IObserver<int> results)
  612. {
  613. var xs = Observable.Range(1, x).ToListObservable();
  614. yield return xs;
  615. results.OnNext(xs.Value);
  616. var ys = Observable.Range(1, y).ToListObservable();
  617. yield return ys;
  618. results.OnNext(ys.Value);
  619. }
  620. [TestMethod]
  621. public void Iterate_Void_ArgumentChecking()
  622. {
  623. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(default));
  624. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IEnumerable<IObservable<object>>>.Instance).Subscribe(null));
  625. }
  626. private IEnumerable<IObservable<object>> ToIterate_Void_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  627. {
  628. yield return xs.Select(x => new object());
  629. yield return ys.Select(x => new object());
  630. yield return zs.Select(x => new object());
  631. }
  632. [TestMethod]
  633. public void Iterate_Void_Complete()
  634. {
  635. var scheduler = new TestScheduler();
  636. var xs = scheduler.CreateColdObservable(
  637. OnNext(10, 1),
  638. OnNext(20, 2),
  639. OnNext(30, 3),
  640. OnNext(40, 4),
  641. OnCompleted<int>(50)
  642. );
  643. var ys = scheduler.CreateColdObservable(
  644. OnNext(10, 1),
  645. OnNext(20, 2),
  646. OnCompleted<int>(30)
  647. );
  648. var zs = scheduler.CreateColdObservable(
  649. OnNext(10, 1),
  650. OnNext(20, 2),
  651. OnNext(30, 3),
  652. OnNext(40, 4),
  653. OnNext(50, 5),
  654. OnCompleted<int>(60)
  655. );
  656. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete(xs, ys, zs)));
  657. res.Messages.AssertEqual(
  658. OnCompleted<Unit>(340)
  659. );
  660. xs.Subscriptions.AssertEqual(
  661. Subscribe(200, 250)
  662. );
  663. ys.Subscriptions.AssertEqual(
  664. Subscribe(250, 280)
  665. );
  666. zs.Subscriptions.AssertEqual(
  667. Subscribe(280, 340)
  668. );
  669. }
  670. private IEnumerable<IObservable<object>> ToIterate_Void_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  671. {
  672. yield return xs.Select(x => new object());
  673. yield return ys.Select(x => new object());
  674. yield return zs.Select(x => new object());
  675. }
  676. [TestMethod]
  677. public void Iterate_Void_Complete_Implicit()
  678. {
  679. var scheduler = new TestScheduler();
  680. var xs = scheduler.CreateColdObservable(
  681. OnNext(10, 1),
  682. OnNext(20, 2),
  683. OnNext(30, 3),
  684. OnNext(40, 4),
  685. OnCompleted<int>(50)
  686. );
  687. var ys = scheduler.CreateColdObservable(
  688. OnNext(10, 1),
  689. OnNext(20, 2),
  690. OnCompleted<int>(30)
  691. );
  692. var zs = scheduler.CreateColdObservable(
  693. OnNext(10, 1),
  694. OnNext(20, 2),
  695. OnNext(30, 3),
  696. OnNext(40, 4),
  697. OnNext(50, 5),
  698. OnCompleted<int>(60)
  699. );
  700. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Implicit(xs, ys, zs)));
  701. res.Messages.AssertEqual(
  702. OnCompleted<Unit>(340)
  703. );
  704. xs.Subscriptions.AssertEqual(
  705. Subscribe(200, 250)
  706. );
  707. ys.Subscriptions.AssertEqual(
  708. Subscribe(250, 280)
  709. );
  710. zs.Subscriptions.AssertEqual(
  711. Subscribe(280, 340)
  712. );
  713. }
  714. private IEnumerable<IObservable<object>> ToIterate_Void_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, Exception ex)
  715. {
  716. yield return xs.Select(x => new object());
  717. yield return ys.Select(x => new object());
  718. if (xs != null)
  719. {
  720. throw ex;
  721. }
  722. yield return zs.Select(x => new object());
  723. }
  724. [TestMethod]
  725. public void Iterate_Void_Iterator_Throw()
  726. {
  727. var scheduler = new TestScheduler();
  728. var xs = scheduler.CreateColdObservable(
  729. OnNext(10, 1),
  730. OnNext(20, 2),
  731. OnNext(30, 3),
  732. OnNext(40, 4),
  733. OnCompleted<int>(50)
  734. );
  735. var ys = scheduler.CreateColdObservable(
  736. OnNext(10, 1),
  737. OnNext(20, 2),
  738. OnCompleted<int>(30)
  739. );
  740. var zs = scheduler.CreateColdObservable(
  741. OnNext(10, 1),
  742. OnNext(20, 2),
  743. OnNext(30, 3),
  744. OnNext(40, 4),
  745. OnNext(50, 5),
  746. OnCompleted<int>(60)
  747. );
  748. var ex = new Exception();
  749. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Throw(xs, ys, zs, ex)));
  750. res.Messages.AssertEqual(
  751. OnError<Unit>(280, ex)
  752. );
  753. xs.Subscriptions.AssertEqual(
  754. Subscribe(200, 250)
  755. );
  756. ys.Subscriptions.AssertEqual(
  757. Subscribe(250, 280)
  758. );
  759. zs.Subscriptions.AssertEqual(
  760. );
  761. }
  762. private IEnumerable<IObservable<object>> ToIterate_Void_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  763. {
  764. yield return xs.Select(x => new object());
  765. yield return ys.Select(x => new object());
  766. yield return zs.Select(x => new object());
  767. }
  768. [TestMethod]
  769. public void Iterate_Void_Complete_Dispose()
  770. {
  771. var scheduler = new TestScheduler();
  772. var xs = scheduler.CreateColdObservable(
  773. OnNext(10, 1),
  774. OnNext(20, 2),
  775. OnNext(30, 3),
  776. OnNext(40, 4),
  777. OnCompleted<int>(50)
  778. );
  779. var ys = scheduler.CreateColdObservable(
  780. OnNext(10, 1),
  781. OnNext(20, 2),
  782. OnCompleted<int>(30)
  783. );
  784. var zs = scheduler.CreateColdObservable(
  785. OnNext(100, 1),
  786. OnNext(200, 2),
  787. OnNext(300, 3),
  788. OnNext(400, 4),
  789. OnNext(500, 5),
  790. OnNext(600, 6),
  791. OnNext(700, 7),
  792. OnNext(800, 8),
  793. OnNext(900, 9),
  794. OnNext(1000, 10)
  795. );
  796. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Dispose(xs, ys, zs)));
  797. res.Messages.AssertEqual(
  798. );
  799. xs.Subscriptions.AssertEqual(
  800. Subscribe(200, 250)
  801. );
  802. ys.Subscriptions.AssertEqual(
  803. Subscribe(250, 280)
  804. );
  805. zs.Subscriptions.AssertEqual(
  806. Subscribe(280, 1000)
  807. );
  808. }
  809. [TestMethod]
  810. public void Iterate_Void_Func_Throw()
  811. {
  812. var scheduler = new TestScheduler();
  813. var obs = scheduler.Start(() => ObservableEx.Create(() => { throw new InvalidOperationException(); }));
  814. Assert.Equal(1, obs.Messages.Count);
  815. var notification = obs.Messages[0].Value;
  816. Assert.Equal(NotificationKind.OnError, notification.Kind);
  817. Assert.IsType<InvalidOperationException>(notification.Exception);
  818. }
  819. private static IEnumerable<IObservable<object>> _IteratorScenario_Void(int x, int y)
  820. {
  821. var xs = Observable.Range(1, x).ToListObservable();
  822. yield return xs;
  823. var ys = Observable.Range(1, y).ToListObservable();
  824. yield return ys;
  825. }
  826. [TestMethod]
  827. public void IteratorScenario_Void()
  828. {
  829. var xs = ObservableEx.Create(() => _IteratorScenario_Void(100, 1000));
  830. xs.AssertEqual(new Unit[] { }.ToObservable());
  831. }
  832. }
  833. }