CreateTest.cs 30 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Disposables;
  10. using System.Reactive.Linq;
  11. using Microsoft.Reactive.Testing;
  12. using ReactiveTests.Dummies;
  13. using Xunit;
  14. namespace ReactiveTests.Tests
  15. {
  16. public class CreateTest : ReactiveTest
  17. {
  18. [Fact]
  19. public void Create_ArgumentChecking()
  20. {
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, Action>)));
  22. //
  23. // BREAKING CHANGE v2.0 > v1.x - Returning null from Subscribe means "nothing to do upon unsubscription"
  24. // all null-coalesces to Disposable.Empty.
  25. //
  26. //ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => default(Action)).Subscribe(DummyObserver<int>.Instance));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => () => { }).Subscribe(null));
  28. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o =>
  29. {
  30. o.OnError(null);
  31. return () => { };
  32. }).Subscribe(null));
  33. }
  34. [Fact]
  35. public void Create_NullCoalescingAction()
  36. {
  37. var xs = Observable.Create<int>(o =>
  38. {
  39. o.OnNext(42);
  40. return default(Action);
  41. });
  42. var lst = new List<int>();
  43. var d = xs.Subscribe(lst.Add);
  44. d.Dispose();
  45. Assert.True(lst.SequenceEqual(new[] { 42 }));
  46. }
  47. [Fact]
  48. public void Create_Next()
  49. {
  50. var scheduler = new TestScheduler();
  51. var res = scheduler.Start(() =>
  52. Observable.Create<int>(o =>
  53. {
  54. o.OnNext(1);
  55. o.OnNext(2);
  56. return () => { };
  57. })
  58. );
  59. res.Messages.AssertEqual(
  60. OnNext(200, 1),
  61. OnNext(200, 2)
  62. );
  63. }
  64. [Fact]
  65. public void Create_Completed()
  66. {
  67. var scheduler = new TestScheduler();
  68. var res = scheduler.Start(() =>
  69. Observable.Create<int>(o =>
  70. {
  71. o.OnCompleted();
  72. o.OnNext(100);
  73. o.OnError(new Exception());
  74. o.OnCompleted();
  75. return () => { };
  76. })
  77. );
  78. res.Messages.AssertEqual(
  79. OnCompleted<int>(200)
  80. );
  81. }
  82. [Fact]
  83. public void Create_Error()
  84. {
  85. var scheduler = new TestScheduler();
  86. var ex = new Exception();
  87. var res = scheduler.Start(() =>
  88. Observable.Create<int>(o =>
  89. {
  90. o.OnError(ex);
  91. o.OnNext(100);
  92. o.OnError(new Exception());
  93. o.OnCompleted();
  94. return () => { };
  95. })
  96. );
  97. res.Messages.AssertEqual(
  98. OnError<int>(200, ex)
  99. );
  100. }
  101. [Fact]
  102. public void Create_Exception()
  103. {
  104. ReactiveAssert.Throws<InvalidOperationException>(() =>
  105. Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Subscribe());
  106. }
  107. [Fact]
  108. public void Create_Dispose()
  109. {
  110. var scheduler = new TestScheduler();
  111. var res = scheduler.Start(() =>
  112. Observable.Create<int>(o =>
  113. {
  114. var stopped = false;
  115. o.OnNext(1);
  116. o.OnNext(2);
  117. scheduler.Schedule(TimeSpan.FromTicks(600), () =>
  118. {
  119. if (!stopped)
  120. {
  121. o.OnNext(3);
  122. }
  123. });
  124. scheduler.Schedule(TimeSpan.FromTicks(700), () =>
  125. {
  126. if (!stopped)
  127. {
  128. o.OnNext(4);
  129. }
  130. });
  131. scheduler.Schedule(TimeSpan.FromTicks(900), () =>
  132. {
  133. if (!stopped)
  134. {
  135. o.OnNext(5);
  136. }
  137. });
  138. scheduler.Schedule(TimeSpan.FromTicks(1100), () =>
  139. {
  140. if (!stopped)
  141. {
  142. o.OnNext(6);
  143. }
  144. });
  145. return () => { stopped = true; };
  146. })
  147. );
  148. res.Messages.AssertEqual(
  149. OnNext(200, 1),
  150. OnNext(200, 2),
  151. OnNext(800, 3),
  152. OnNext(900, 4)
  153. );
  154. }
  155. [Fact]
  156. public void Create_ObserverThrows()
  157. {
  158. ReactiveAssert.Throws<InvalidOperationException>(() =>
  159. Observable.Create<int>(o =>
  160. {
  161. o.OnNext(1);
  162. return () => { };
  163. }).Subscribe(x => { throw new InvalidOperationException(); }));
  164. ReactiveAssert.Throws<InvalidOperationException>(() =>
  165. Observable.Create<int>(o =>
  166. {
  167. o.OnError(new Exception());
  168. return () => { };
  169. }).Subscribe(x => { }, ex => { throw new InvalidOperationException(); }));
  170. ReactiveAssert.Throws<InvalidOperationException>(() =>
  171. Observable.Create<int>(o =>
  172. {
  173. o.OnCompleted();
  174. return () => { };
  175. }).Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }));
  176. }
  177. [Fact]
  178. public void CreateWithDisposable_ArgumentChecking()
  179. {
  180. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, IDisposable>)));
  181. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => DummyDisposable.Instance).Subscribe(null));
  182. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o =>
  183. {
  184. o.OnError(null);
  185. return DummyDisposable.Instance;
  186. }).Subscribe(null));
  187. }
  188. [Fact]
  189. public void CreateWithDisposable_NullCoalescingAction()
  190. {
  191. var xs = Observable.Create<int>(o =>
  192. {
  193. o.OnNext(42);
  194. return default(IDisposable);
  195. });
  196. var lst = new List<int>();
  197. var d = xs.Subscribe(lst.Add);
  198. d.Dispose();
  199. Assert.True(lst.SequenceEqual(new[] { 42 }));
  200. }
  201. [Fact]
  202. public void CreateWithDisposable_Next()
  203. {
  204. var scheduler = new TestScheduler();
  205. var res = scheduler.Start(() =>
  206. Observable.Create<int>(o =>
  207. {
  208. o.OnNext(1);
  209. o.OnNext(2);
  210. return Disposable.Empty;
  211. })
  212. );
  213. res.Messages.AssertEqual(
  214. OnNext(200, 1),
  215. OnNext(200, 2)
  216. );
  217. }
  218. [Fact]
  219. public void CreateWithDisposable_Completed()
  220. {
  221. var scheduler = new TestScheduler();
  222. var res = scheduler.Start(() =>
  223. Observable.Create<int>(o =>
  224. {
  225. o.OnCompleted();
  226. o.OnNext(100);
  227. o.OnError(new Exception());
  228. o.OnCompleted();
  229. return Disposable.Empty;
  230. })
  231. );
  232. res.Messages.AssertEqual(
  233. OnCompleted<int>(200)
  234. );
  235. }
  236. [Fact]
  237. public void CreateWithDisposable_Error()
  238. {
  239. var scheduler = new TestScheduler();
  240. var ex = new Exception();
  241. var res = scheduler.Start(() =>
  242. Observable.Create<int>(o =>
  243. {
  244. o.OnError(ex);
  245. o.OnNext(100);
  246. o.OnError(new Exception());
  247. o.OnCompleted();
  248. return Disposable.Empty;
  249. })
  250. );
  251. res.Messages.AssertEqual(
  252. OnError<int>(200, ex)
  253. );
  254. }
  255. [Fact]
  256. public void CreateWithDisposable_Exception()
  257. {
  258. ReactiveAssert.Throws<InvalidOperationException>(() =>
  259. Observable.Create<int>(new Func<IObserver<int>, IDisposable>(o => { throw new InvalidOperationException(); })).Subscribe());
  260. }
  261. [Fact]
  262. public void CreateWithDisposable_Dispose()
  263. {
  264. var scheduler = new TestScheduler();
  265. var res = scheduler.Start(() =>
  266. Observable.Create<int>(o =>
  267. {
  268. var d = new BooleanDisposable();
  269. o.OnNext(1);
  270. o.OnNext(2);
  271. scheduler.Schedule(TimeSpan.FromTicks(600), () =>
  272. {
  273. if (!d.IsDisposed)
  274. {
  275. o.OnNext(3);
  276. }
  277. });
  278. scheduler.Schedule(TimeSpan.FromTicks(700), () =>
  279. {
  280. if (!d.IsDisposed)
  281. {
  282. o.OnNext(4);
  283. }
  284. });
  285. scheduler.Schedule(TimeSpan.FromTicks(900), () =>
  286. {
  287. if (!d.IsDisposed)
  288. {
  289. o.OnNext(5);
  290. }
  291. });
  292. scheduler.Schedule(TimeSpan.FromTicks(1100), () =>
  293. {
  294. if (!d.IsDisposed)
  295. {
  296. o.OnNext(6);
  297. }
  298. });
  299. return d;
  300. })
  301. );
  302. res.Messages.AssertEqual(
  303. OnNext(200, 1),
  304. OnNext(200, 2),
  305. OnNext(800, 3),
  306. OnNext(900, 4)
  307. );
  308. }
  309. [Fact]
  310. public void CreateWithDisposable_ObserverThrows()
  311. {
  312. ReactiveAssert.Throws<InvalidOperationException>(() =>
  313. Observable.Create<int>(o =>
  314. {
  315. o.OnNext(1);
  316. return Disposable.Empty;
  317. }).Subscribe(x => { throw new InvalidOperationException(); }));
  318. ReactiveAssert.Throws<InvalidOperationException>(() =>
  319. Observable.Create<int>(o =>
  320. {
  321. o.OnError(new Exception());
  322. return Disposable.Empty;
  323. }).Subscribe(x => { }, ex => { throw new InvalidOperationException(); }));
  324. ReactiveAssert.Throws<InvalidOperationException>(() =>
  325. Observable.Create<int>(o =>
  326. {
  327. o.OnCompleted();
  328. return Disposable.Empty;
  329. }).Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }));
  330. }
  331. [Fact]
  332. public void Iterate_ArgumentChecking()
  333. {
  334. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create<int>(default));
  335. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IObserver<int>, IEnumerable<IObservable<Object>>>.Instance).Subscribe(null));
  336. }
  337. private IEnumerable<IObservable<Object>> ToIterate_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  338. {
  339. observer.OnNext(1);
  340. yield return xs.Select(x => new Object());
  341. observer.OnNext(2);
  342. yield return ys.Select(x => new Object());
  343. observer.OnNext(3);
  344. observer.OnCompleted();
  345. yield return zs.Select(x => new Object());
  346. observer.OnNext(4);
  347. }
  348. [Fact]
  349. public void Iterate_Complete()
  350. {
  351. var scheduler = new TestScheduler();
  352. var xs = scheduler.CreateColdObservable(
  353. OnNext(10, 1),
  354. OnNext(20, 2),
  355. OnNext(30, 3),
  356. OnNext(40, 4),
  357. OnCompleted<int>(50)
  358. );
  359. var ys = scheduler.CreateColdObservable(
  360. OnNext(10, 1),
  361. OnNext(20, 2),
  362. OnCompleted<int>(30)
  363. );
  364. var zs = scheduler.CreateColdObservable(
  365. OnNext(10, 1),
  366. OnNext(20, 2),
  367. OnNext(30, 3),
  368. OnNext(40, 4),
  369. OnNext(50, 5),
  370. OnCompleted<int>(60)
  371. );
  372. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete(xs, ys, zs, observer)));
  373. res.Messages.AssertEqual(
  374. OnNext(200, 1),
  375. OnNext(250, 2),
  376. OnNext(280, 3),
  377. OnCompleted<int>(280)
  378. );
  379. xs.Subscriptions.AssertEqual(
  380. Subscribe(200, 250)
  381. );
  382. ys.Subscriptions.AssertEqual(
  383. Subscribe(250, 280)
  384. );
  385. zs.Subscriptions.AssertEqual(
  386. Subscribe(280, 280)
  387. );
  388. }
  389. private IEnumerable<IObservable<Object>> ToIterate_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  390. {
  391. observer.OnNext(1);
  392. yield return xs.Select(x => new Object());
  393. observer.OnNext(2);
  394. yield return ys.Select(x => new Object());
  395. observer.OnNext(3);
  396. yield return zs.Select(x => new Object());
  397. observer.OnNext(4);
  398. }
  399. [Fact]
  400. public void Iterate_Complete_Implicit()
  401. {
  402. var scheduler = new TestScheduler();
  403. var xs = scheduler.CreateColdObservable(
  404. OnNext(10, 1),
  405. OnNext(20, 2),
  406. OnNext(30, 3),
  407. OnNext(40, 4),
  408. OnCompleted<int>(50)
  409. );
  410. var ys = scheduler.CreateColdObservable(
  411. OnNext(10, 1),
  412. OnNext(20, 2),
  413. OnCompleted<int>(30)
  414. );
  415. var zs = scheduler.CreateColdObservable(
  416. OnNext(10, 1),
  417. OnNext(20, 2),
  418. OnNext(30, 3),
  419. OnNext(40, 4),
  420. OnNext(50, 5),
  421. OnCompleted<int>(60)
  422. );
  423. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Implicit(xs, ys, zs, observer)));
  424. res.Messages.AssertEqual(
  425. OnNext(200, 1),
  426. OnNext(250, 2),
  427. OnNext(280, 3),
  428. OnNext(340, 4),
  429. OnCompleted<int>(340)
  430. );
  431. xs.Subscriptions.AssertEqual(
  432. Subscribe(200, 250)
  433. );
  434. ys.Subscriptions.AssertEqual(
  435. Subscribe(250, 280)
  436. );
  437. zs.Subscriptions.AssertEqual(
  438. Subscribe(280, 340)
  439. );
  440. }
  441. private IEnumerable<IObservable<Object>> ToIterate_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
  442. {
  443. observer.OnNext(1);
  444. yield return xs.Select(x => new Object());
  445. observer.OnNext(2);
  446. yield return ys.Select(x => new Object());
  447. observer.OnNext(3);
  448. if (xs != null)
  449. {
  450. throw ex;
  451. }
  452. yield return zs.Select(x => new Object());
  453. observer.OnNext(4);
  454. observer.OnCompleted();
  455. }
  456. [Fact]
  457. public void Iterate_Iterator_Throw()
  458. {
  459. var scheduler = new TestScheduler();
  460. var xs = scheduler.CreateColdObservable(
  461. OnNext(10, 1),
  462. OnNext(20, 2),
  463. OnNext(30, 3),
  464. OnNext(40, 4),
  465. OnCompleted<int>(50)
  466. );
  467. var ys = scheduler.CreateColdObservable(
  468. OnNext(10, 1),
  469. OnNext(20, 2),
  470. OnCompleted<int>(30)
  471. );
  472. var zs = scheduler.CreateColdObservable(
  473. OnNext(10, 1),
  474. OnNext(20, 2),
  475. OnNext(30, 3),
  476. OnNext(40, 4),
  477. OnNext(50, 5),
  478. OnCompleted<int>(60)
  479. );
  480. var ex = new Exception();
  481. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Throw(xs, ys, zs, observer, ex)));
  482. res.Messages.AssertEqual(
  483. OnNext(200, 1),
  484. OnNext(250, 2),
  485. OnNext(280, 3),
  486. OnError<int>(280, ex)
  487. );
  488. xs.Subscriptions.AssertEqual(
  489. Subscribe(200, 250)
  490. );
  491. ys.Subscriptions.AssertEqual(
  492. Subscribe(250, 280)
  493. );
  494. zs.Subscriptions.AssertEqual(
  495. );
  496. }
  497. private IEnumerable<IObservable<Object>> ToIterate_Error(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer, Exception ex)
  498. {
  499. observer.OnNext(1);
  500. yield return xs.Select(x => new Object());
  501. observer.OnNext(2);
  502. observer.OnError(ex);
  503. yield return ys.Select(x => new Object());
  504. observer.OnNext(3);
  505. yield return zs.Select(x => new Object());
  506. observer.OnNext(4);
  507. observer.OnCompleted();
  508. }
  509. [Fact]
  510. public void Iterate_Iterator_Error()
  511. {
  512. var scheduler = new TestScheduler();
  513. var ex = new Exception();
  514. var xs = scheduler.CreateColdObservable(
  515. OnNext(10, 1),
  516. OnNext(20, 2),
  517. OnNext(30, 3),
  518. OnNext(40, 4),
  519. OnCompleted<int>(50)
  520. );
  521. var ys = scheduler.CreateColdObservable(
  522. OnNext(10, 1),
  523. OnNext(20, 2),
  524. OnCompleted<int>(30)
  525. );
  526. var zs = scheduler.CreateColdObservable(
  527. OnNext(10, 1),
  528. OnNext(20, 2),
  529. OnNext(30, 3),
  530. OnNext(40, 4),
  531. OnNext(50, 5),
  532. OnCompleted<int>(60)
  533. );
  534. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Error(xs, ys, zs, observer, ex)));
  535. res.Messages.AssertEqual(
  536. OnNext(200, 1),
  537. OnNext(250, 2),
  538. OnError<int>(250, ex)
  539. );
  540. xs.Subscriptions.AssertEqual(
  541. Subscribe(200, 250)
  542. );
  543. ys.Subscriptions.AssertEqual(
  544. Subscribe(250, 250)
  545. );
  546. zs.Subscriptions.AssertEqual(
  547. );
  548. }
  549. private IEnumerable<IObservable<Object>> ToIterate_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, IObserver<int> observer)
  550. {
  551. observer.OnNext(1);
  552. yield return xs.Select(x => new Object());
  553. observer.OnNext(2);
  554. yield return ys.Select(x => new Object());
  555. observer.OnNext(3);
  556. yield return zs.Select(x => new Object());
  557. observer.OnNext(4);
  558. }
  559. [Fact]
  560. public void Iterate_Complete_Dispose()
  561. {
  562. var scheduler = new TestScheduler();
  563. var xs = scheduler.CreateColdObservable(
  564. OnNext(10, 1),
  565. OnNext(20, 2),
  566. OnNext(30, 3),
  567. OnNext(40, 4),
  568. OnCompleted<int>(50)
  569. );
  570. var ys = scheduler.CreateColdObservable(
  571. OnNext(10, 1),
  572. OnNext(20, 2),
  573. OnCompleted<int>(30)
  574. );
  575. var zs = scheduler.CreateColdObservable(
  576. OnNext(100, 1),
  577. OnNext(200, 2),
  578. OnNext(300, 3),
  579. OnNext(400, 4),
  580. OnNext(500, 5),
  581. OnNext(600, 6),
  582. OnNext(700, 7),
  583. OnNext(800, 8),
  584. OnNext(900, 9),
  585. OnNext(1000, 10)
  586. );
  587. var res = scheduler.Start(() => ObservableEx.Create<int>(observer => ToIterate_Complete_Dispose(xs, ys, zs, observer)));
  588. res.Messages.AssertEqual(
  589. OnNext(200, 1),
  590. OnNext(250, 2),
  591. OnNext(280, 3)
  592. );
  593. xs.Subscriptions.AssertEqual(
  594. Subscribe(200, 250)
  595. );
  596. ys.Subscriptions.AssertEqual(
  597. Subscribe(250, 280)
  598. );
  599. zs.Subscriptions.AssertEqual(
  600. Subscribe(280, 1000)
  601. );
  602. }
  603. [Fact]
  604. public void IteratorScenario()
  605. {
  606. var xs = ObservableEx.Create<int>(o => _IteratorScenario(100, 1000, o));
  607. xs.AssertEqual(new[] { 100, 1000 }.ToObservable());
  608. }
  609. private static IEnumerable<IObservable<Object>> _IteratorScenario(int x, int y, IObserver<int> results)
  610. {
  611. var xs = Observable.Range(1, x).ToListObservable();
  612. yield return xs;
  613. results.OnNext(xs.Value);
  614. var ys = Observable.Range(1, y).ToListObservable();
  615. yield return ys;
  616. results.OnNext(ys.Value);
  617. }
  618. [Fact]
  619. public void Iterate_Void_ArgumentChecking()
  620. {
  621. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(default));
  622. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.Create(DummyFunc<IEnumerable<IObservable<Object>>>.Instance).Subscribe(null));
  623. }
  624. private IEnumerable<IObservable<Object>> ToIterate_Void_Complete(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  625. {
  626. yield return xs.Select(x => new Object());
  627. yield return ys.Select(x => new Object());
  628. yield return zs.Select(x => new Object());
  629. }
  630. [Fact]
  631. public void Iterate_Void_Complete()
  632. {
  633. var scheduler = new TestScheduler();
  634. var xs = scheduler.CreateColdObservable(
  635. OnNext(10, 1),
  636. OnNext(20, 2),
  637. OnNext(30, 3),
  638. OnNext(40, 4),
  639. OnCompleted<int>(50)
  640. );
  641. var ys = scheduler.CreateColdObservable(
  642. OnNext(10, 1),
  643. OnNext(20, 2),
  644. OnCompleted<int>(30)
  645. );
  646. var zs = scheduler.CreateColdObservable(
  647. OnNext(10, 1),
  648. OnNext(20, 2),
  649. OnNext(30, 3),
  650. OnNext(40, 4),
  651. OnNext(50, 5),
  652. OnCompleted<int>(60)
  653. );
  654. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete(xs, ys, zs)));
  655. res.Messages.AssertEqual(
  656. OnCompleted<Unit>(340)
  657. );
  658. xs.Subscriptions.AssertEqual(
  659. Subscribe(200, 250)
  660. );
  661. ys.Subscriptions.AssertEqual(
  662. Subscribe(250, 280)
  663. );
  664. zs.Subscriptions.AssertEqual(
  665. Subscribe(280, 340)
  666. );
  667. }
  668. private IEnumerable<IObservable<Object>> ToIterate_Void_Complete_Implicit(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  669. {
  670. yield return xs.Select(x => new Object());
  671. yield return ys.Select(x => new Object());
  672. yield return zs.Select(x => new Object());
  673. }
  674. [Fact]
  675. public void Iterate_Void_Complete_Implicit()
  676. {
  677. var scheduler = new TestScheduler();
  678. var xs = scheduler.CreateColdObservable(
  679. OnNext(10, 1),
  680. OnNext(20, 2),
  681. OnNext(30, 3),
  682. OnNext(40, 4),
  683. OnCompleted<int>(50)
  684. );
  685. var ys = scheduler.CreateColdObservable(
  686. OnNext(10, 1),
  687. OnNext(20, 2),
  688. OnCompleted<int>(30)
  689. );
  690. var zs = scheduler.CreateColdObservable(
  691. OnNext(10, 1),
  692. OnNext(20, 2),
  693. OnNext(30, 3),
  694. OnNext(40, 4),
  695. OnNext(50, 5),
  696. OnCompleted<int>(60)
  697. );
  698. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Implicit(xs, ys, zs)));
  699. res.Messages.AssertEqual(
  700. OnCompleted<Unit>(340)
  701. );
  702. xs.Subscriptions.AssertEqual(
  703. Subscribe(200, 250)
  704. );
  705. ys.Subscriptions.AssertEqual(
  706. Subscribe(250, 280)
  707. );
  708. zs.Subscriptions.AssertEqual(
  709. Subscribe(280, 340)
  710. );
  711. }
  712. private IEnumerable<IObservable<Object>> ToIterate_Void_Throw(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs, Exception ex)
  713. {
  714. yield return xs.Select(x => new Object());
  715. yield return ys.Select(x => new Object());
  716. if (xs != null)
  717. {
  718. throw ex;
  719. }
  720. yield return zs.Select(x => new Object());
  721. }
  722. [Fact]
  723. public void Iterate_Void_Iterator_Throw()
  724. {
  725. var scheduler = new TestScheduler();
  726. var xs = scheduler.CreateColdObservable(
  727. OnNext(10, 1),
  728. OnNext(20, 2),
  729. OnNext(30, 3),
  730. OnNext(40, 4),
  731. OnCompleted<int>(50)
  732. );
  733. var ys = scheduler.CreateColdObservable(
  734. OnNext(10, 1),
  735. OnNext(20, 2),
  736. OnCompleted<int>(30)
  737. );
  738. var zs = scheduler.CreateColdObservable(
  739. OnNext(10, 1),
  740. OnNext(20, 2),
  741. OnNext(30, 3),
  742. OnNext(40, 4),
  743. OnNext(50, 5),
  744. OnCompleted<int>(60)
  745. );
  746. var ex = new Exception();
  747. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Throw(xs, ys, zs, ex)));
  748. res.Messages.AssertEqual(
  749. OnError<Unit>(280, ex)
  750. );
  751. xs.Subscriptions.AssertEqual(
  752. Subscribe(200, 250)
  753. );
  754. ys.Subscriptions.AssertEqual(
  755. Subscribe(250, 280)
  756. );
  757. zs.Subscriptions.AssertEqual(
  758. );
  759. }
  760. private IEnumerable<IObservable<Object>> ToIterate_Void_Complete_Dispose(IObservable<int> xs, IObservable<int> ys, IObservable<int> zs)
  761. {
  762. yield return xs.Select(x => new Object());
  763. yield return ys.Select(x => new Object());
  764. yield return zs.Select(x => new Object());
  765. }
  766. [Fact]
  767. public void Iterate_Void_Complete_Dispose()
  768. {
  769. var scheduler = new TestScheduler();
  770. var xs = scheduler.CreateColdObservable(
  771. OnNext(10, 1),
  772. OnNext(20, 2),
  773. OnNext(30, 3),
  774. OnNext(40, 4),
  775. OnCompleted<int>(50)
  776. );
  777. var ys = scheduler.CreateColdObservable(
  778. OnNext(10, 1),
  779. OnNext(20, 2),
  780. OnCompleted<int>(30)
  781. );
  782. var zs = scheduler.CreateColdObservable(
  783. OnNext(100, 1),
  784. OnNext(200, 2),
  785. OnNext(300, 3),
  786. OnNext(400, 4),
  787. OnNext(500, 5),
  788. OnNext(600, 6),
  789. OnNext(700, 7),
  790. OnNext(800, 8),
  791. OnNext(900, 9),
  792. OnNext(1000, 10)
  793. );
  794. var res = scheduler.Start(() => ObservableEx.Create(() => ToIterate_Void_Complete_Dispose(xs, ys, zs)));
  795. res.Messages.AssertEqual(
  796. );
  797. xs.Subscriptions.AssertEqual(
  798. Subscribe(200, 250)
  799. );
  800. ys.Subscriptions.AssertEqual(
  801. Subscribe(250, 280)
  802. );
  803. zs.Subscriptions.AssertEqual(
  804. Subscribe(280, 1000)
  805. );
  806. }
  807. [Fact]
  808. public void Iterate_Void_Func_Throw()
  809. {
  810. var scheduler = new TestScheduler();
  811. var obs = scheduler.Start(() => ObservableEx.Create(() => { throw new InvalidOperationException(); }));
  812. Assert.Equal(1, obs.Messages.Count);
  813. var notification = obs.Messages[0].Value;
  814. Assert.Equal(NotificationKind.OnError, notification.Kind);
  815. Assert.IsType<InvalidOperationException>(notification.Exception);
  816. }
  817. private static IEnumerable<IObservable<Object>> _IteratorScenario_Void(int x, int y)
  818. {
  819. var xs = Observable.Range(1, x).ToListObservable();
  820. yield return xs;
  821. var ys = Observable.Range(1, y).ToListObservable();
  822. yield return ys;
  823. }
  824. [Fact]
  825. public void IteratorScenario_Void()
  826. {
  827. var xs = ObservableEx.Create(() => _IteratorScenario_Void(100, 1000));
  828. xs.AssertEqual(new Unit[] { }.ToObservable());
  829. }
  830. }
  831. }