ObservableSingleTest.cs 128 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using Microsoft.Reactive.Testing;
  9. using Xunit;
  10. using ReactiveTests.Dummies;
  11. namespace ReactiveTests.Tests
  12. {
  13. public class ObservableSingleTest : ReactiveTest
  14. {
  15. #region + AsObservable +
  16. [Fact]
  17. public void AsObservable_ArgumentChecking()
  18. {
  19. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.AsObservable<int>(null));
  20. }
  21. [Fact]
  22. public void AsObservable_AsObservable()
  23. {
  24. var scheduler = new TestScheduler();
  25. var xs = scheduler.CreateHotObservable(
  26. OnNext(150, 1),
  27. OnNext(220, 2),
  28. OnCompleted<int>(250)
  29. );
  30. var ys = xs.AsObservable().AsObservable();
  31. Assert.NotSame(xs, ys);
  32. var res = scheduler.Start(() =>
  33. ys
  34. );
  35. res.Messages.AssertEqual(
  36. OnNext(220, 2),
  37. OnCompleted<int>(250)
  38. );
  39. xs.Subscriptions.AssertEqual(
  40. Subscribe(200, 250)
  41. );
  42. }
  43. [Fact]
  44. public void AsObservable_Hides()
  45. {
  46. var xs = Observable.Empty<int>();
  47. var res = xs.AsObservable();
  48. Assert.NotSame(xs, res);
  49. }
  50. [Fact]
  51. public void AsObservable_Never()
  52. {
  53. var scheduler = new TestScheduler();
  54. var xs = Observable.Never<int>();
  55. var res = scheduler.Start(() =>
  56. xs.AsObservable()
  57. );
  58. res.Messages.AssertEqual(
  59. );
  60. }
  61. [Fact]
  62. public void AsObservable_Empty()
  63. {
  64. var scheduler = new TestScheduler();
  65. var xs = scheduler.CreateHotObservable(
  66. OnNext(150, 1),
  67. OnCompleted<int>(250)
  68. );
  69. var res = scheduler.Start(() =>
  70. xs.AsObservable()
  71. );
  72. res.Messages.AssertEqual(
  73. OnCompleted<int>(250)
  74. );
  75. xs.Subscriptions.AssertEqual(
  76. Subscribe(200, 250)
  77. );
  78. }
  79. [Fact]
  80. public void AsObservable_Throw()
  81. {
  82. var scheduler = new TestScheduler();
  83. var ex = new Exception();
  84. var xs = scheduler.CreateHotObservable(
  85. OnNext(150, 1),
  86. OnError<int>(250, ex)
  87. );
  88. var res = scheduler.Start(() =>
  89. xs.AsObservable()
  90. );
  91. res.Messages.AssertEqual(
  92. OnError<int>(250, ex)
  93. );
  94. xs.Subscriptions.AssertEqual(
  95. Subscribe(200, 250)
  96. );
  97. }
  98. [Fact]
  99. public void AsObservable_Return()
  100. {
  101. var scheduler = new TestScheduler();
  102. var xs = scheduler.CreateHotObservable(
  103. OnNext(150, 1),
  104. OnNext(220, 2),
  105. OnCompleted<int>(250)
  106. );
  107. var res = scheduler.Start(() =>
  108. xs.AsObservable()
  109. );
  110. res.Messages.AssertEqual(
  111. OnNext(220, 2),
  112. OnCompleted<int>(250)
  113. );
  114. xs.Subscriptions.AssertEqual(
  115. Subscribe(200, 250)
  116. );
  117. }
  118. [Fact]
  119. public void AsObservable_IsNotEager()
  120. {
  121. var scheduler = new TestScheduler();
  122. bool subscribed = false;
  123. var xs = Observable.Create<int>(obs =>
  124. {
  125. subscribed = true;
  126. var disp = scheduler.CreateHotObservable(
  127. OnNext(150, 1),
  128. OnNext(220, 2),
  129. OnCompleted<int>(250)
  130. ).Subscribe(obs);
  131. return disp.Dispose;
  132. });
  133. xs.AsObservable();
  134. Assert.False(subscribed);
  135. var res = scheduler.Start(() =>
  136. xs.AsObservable()
  137. );
  138. Assert.True(subscribed);
  139. }
  140. #endregion
  141. #region + Buffer +
  142. [Fact]
  143. public void Buffer_Single_ArgumentChecking()
  144. {
  145. var someObservable = Observable.Empty<int>();
  146. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1));
  147. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 0));
  148. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, -1));
  149. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1, 1));
  150. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 1, 0));
  151. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 0, 1));
  152. }
  153. [Fact]
  154. public void Buffer_Count_PartialWindow()
  155. {
  156. var scheduler = new TestScheduler();
  157. var xs = scheduler.CreateHotObservable(
  158. OnNext(150, 1),
  159. OnNext(210, 2),
  160. OnNext(220, 3),
  161. OnNext(230, 4),
  162. OnNext(240, 5),
  163. OnCompleted<int>(250)
  164. );
  165. var res = scheduler.Start(() =>
  166. xs.Buffer(5)
  167. );
  168. res.Messages.AssertEqual(
  169. OnNext<IList<int>>(250, l => l.SequenceEqual(new[] { 2, 3, 4, 5 })),
  170. OnCompleted<IList<int>>(250)
  171. );
  172. xs.Subscriptions.AssertEqual(
  173. Subscribe(200, 250)
  174. );
  175. }
  176. [Fact]
  177. public void Buffer_Count_FullWindows()
  178. {
  179. var scheduler = new TestScheduler();
  180. var xs = scheduler.CreateHotObservable(
  181. OnNext(150, 1),
  182. OnNext(210, 2),
  183. OnNext(220, 3),
  184. OnNext(230, 4),
  185. OnNext(240, 5),
  186. OnCompleted<int>(250)
  187. );
  188. var res = scheduler.Start(() =>
  189. xs.Buffer(2)
  190. );
  191. res.Messages.AssertEqual(
  192. OnNext<IList<int>>(220, l => l.SequenceEqual(new[] { 2, 3 })),
  193. OnNext<IList<int>>(240, l => l.SequenceEqual(new[] { 4, 5 })),
  194. OnCompleted<IList<int>>(250)
  195. );
  196. xs.Subscriptions.AssertEqual(
  197. Subscribe(200, 250)
  198. );
  199. }
  200. [Fact]
  201. public void Buffer_Count_FullAndPartialWindows()
  202. {
  203. var scheduler = new TestScheduler();
  204. var xs = scheduler.CreateHotObservable(
  205. OnNext(150, 1),
  206. OnNext(210, 2),
  207. OnNext(220, 3),
  208. OnNext(230, 4),
  209. OnNext(240, 5),
  210. OnCompleted<int>(250)
  211. );
  212. var res = scheduler.Start(() =>
  213. xs.Buffer(3)
  214. );
  215. res.Messages.AssertEqual(
  216. OnNext<IList<int>>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })),
  217. OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),
  218. OnCompleted<IList<int>>(250)
  219. );
  220. xs.Subscriptions.AssertEqual(
  221. Subscribe(200, 250)
  222. );
  223. }
  224. [Fact]
  225. public void Buffer_Count_Error()
  226. {
  227. var scheduler = new TestScheduler();
  228. var ex = new Exception();
  229. var xs = scheduler.CreateHotObservable(
  230. OnNext(150, 1),
  231. OnNext(210, 2),
  232. OnNext(220, 3),
  233. OnNext(230, 4),
  234. OnNext(240, 5),
  235. OnError<int>(250, ex)
  236. );
  237. var res = scheduler.Start(() =>
  238. xs.Buffer(5)
  239. );
  240. res.Messages.AssertEqual(
  241. OnError<IList<int>>(250, ex)
  242. );
  243. xs.Subscriptions.AssertEqual(
  244. Subscribe(200, 250)
  245. );
  246. }
  247. [Fact]
  248. public void Buffer_Count_Skip_Less()
  249. {
  250. var scheduler = new TestScheduler();
  251. var xs = scheduler.CreateHotObservable(
  252. OnNext(150, 1),
  253. OnNext(210, 2),
  254. OnNext(220, 3),
  255. OnNext(230, 4),
  256. OnNext(240, 5),
  257. OnCompleted<int>(250)
  258. );
  259. var res = scheduler.Start(() =>
  260. xs.Buffer(3, 1)
  261. );
  262. res.Messages.AssertEqual(
  263. OnNext<IList<int>>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })),
  264. OnNext<IList<int>>(240, l => l.SequenceEqual(new int[] { 3, 4, 5 })),
  265. OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 4, 5 })),
  266. OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),
  267. OnCompleted<IList<int>>(250)
  268. );
  269. xs.Subscriptions.AssertEqual(
  270. Subscribe(200, 250)
  271. );
  272. }
  273. [Fact]
  274. public void Buffer_Count_Skip_More()
  275. {
  276. var scheduler = new TestScheduler();
  277. var xs = scheduler.CreateHotObservable(
  278. OnNext(150, 1),
  279. OnNext(210, 2),
  280. OnNext(220, 3),
  281. OnNext(230, 4),
  282. OnNext(240, 5),
  283. OnCompleted<int>(250)
  284. );
  285. var res = scheduler.Start(() =>
  286. xs.Buffer(2, 3)
  287. );
  288. res.Messages.AssertEqual(
  289. OnNext<IList<int>>(220, l => l.SequenceEqual(new int[] { 2, 3 })),
  290. OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),
  291. OnCompleted<IList<int>>(250)
  292. );
  293. xs.Subscriptions.AssertEqual(
  294. Subscribe(200, 250)
  295. );
  296. }
  297. [Fact]
  298. public void BufferWithCount_ArgumentChecking()
  299. {
  300. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1, 1));
  301. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0, 1));
  302. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 1, 0));
  303. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1));
  304. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0));
  305. }
  306. [Fact]
  307. public void BufferWithCount_Basic()
  308. {
  309. var scheduler = new TestScheduler();
  310. var xs = scheduler.CreateHotObservable(
  311. OnNext(100, 1),
  312. OnNext(210, 2),
  313. OnNext(240, 3),
  314. OnNext(280, 4),
  315. OnNext(320, 5),
  316. OnNext(350, 6),
  317. OnNext(380, 7),
  318. OnNext(420, 8),
  319. OnNext(470, 9),
  320. OnCompleted<int>(600)
  321. );
  322. var res = scheduler.Start(() =>
  323. xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
  324. );
  325. res.Messages.AssertEqual(
  326. OnNext(280, "2,3,4"),
  327. OnNext(350, "4,5,6"),
  328. OnNext(420, "6,7,8"),
  329. OnNext(600, "8,9"),
  330. OnCompleted<string>(600)
  331. );
  332. xs.Subscriptions.AssertEqual(
  333. Subscribe(200, 600)
  334. );
  335. }
  336. [Fact]
  337. public void BufferWithCount_Disposed()
  338. {
  339. var scheduler = new TestScheduler();
  340. var xs = scheduler.CreateHotObservable(
  341. OnNext(100, 1),
  342. OnNext(210, 2),
  343. OnNext(240, 3),
  344. OnNext(280, 4),
  345. OnNext(320, 5),
  346. OnNext(350, 6),
  347. OnNext(380, 7),
  348. OnNext(420, 8),
  349. OnNext(470, 9),
  350. OnCompleted<int>(600)
  351. );
  352. var res = scheduler.Start(() =>
  353. xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())), 370
  354. );
  355. res.Messages.AssertEqual(
  356. OnNext(280, "2,3,4"),
  357. OnNext(350, "4,5,6")
  358. );
  359. xs.Subscriptions.AssertEqual(
  360. Subscribe(200, 370)
  361. );
  362. }
  363. [Fact]
  364. public void BufferWithCount_Error()
  365. {
  366. var scheduler = new TestScheduler();
  367. var ex = new Exception();
  368. var xs = scheduler.CreateHotObservable(
  369. OnNext(100, 1),
  370. OnNext(210, 2),
  371. OnNext(240, 3),
  372. OnNext(280, 4),
  373. OnNext(320, 5),
  374. OnNext(350, 6),
  375. OnNext(380, 7),
  376. OnNext(420, 8),
  377. OnNext(470, 9),
  378. OnError<int>(600, ex)
  379. );
  380. var res = scheduler.Start(() =>
  381. xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
  382. );
  383. res.Messages.AssertEqual(
  384. OnNext(280, "2,3,4"),
  385. OnNext(350, "4,5,6"),
  386. OnNext(420, "6,7,8"),
  387. OnError<string>(600, ex)
  388. );
  389. xs.Subscriptions.AssertEqual(
  390. Subscribe(200, 600)
  391. );
  392. }
  393. [Fact]
  394. public void BufferWithCount_Default()
  395. {
  396. Observable.Range(1, 10).Buffer(3).Skip(1).First().AssertEqual(4, 5, 6);
  397. Observable.Range(1, 10).Buffer(3, 2).Skip(1).First().AssertEqual(3, 4, 5);
  398. }
  399. #endregion
  400. #region + Dematerialize +
  401. [Fact]
  402. public void Dematerialize_ArgumentChecking()
  403. {
  404. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Dematerialize<int>(null));
  405. }
  406. [Fact]
  407. public void Dematerialize_Range1()
  408. {
  409. var scheduler = new TestScheduler();
  410. var xs = scheduler.CreateHotObservable(
  411. OnNext(150, Notification.CreateOnNext(41)),
  412. OnNext(210, Notification.CreateOnNext(42)),
  413. OnNext(220, Notification.CreateOnNext(43)),
  414. OnCompleted<Notification<int>>(250)
  415. );
  416. var res = scheduler.Start(() =>
  417. xs.Dematerialize()
  418. );
  419. res.Messages.AssertEqual(
  420. OnNext(210, 42),
  421. OnNext(220, 43),
  422. OnCompleted<int>(250)
  423. );
  424. xs.Subscriptions.AssertEqual(
  425. Subscribe(200, 250)
  426. );
  427. }
  428. [Fact]
  429. public void Dematerialize_Range2()
  430. {
  431. var scheduler = new TestScheduler();
  432. var xs = scheduler.CreateHotObservable(
  433. OnNext(150, Notification.CreateOnNext(41)),
  434. OnNext(210, Notification.CreateOnNext(42)),
  435. OnNext(220, Notification.CreateOnNext(43)),
  436. OnNext(230, Notification.CreateOnCompleted<int>())
  437. );
  438. var res = scheduler.Start(() =>
  439. xs.Dematerialize()
  440. );
  441. res.Messages.AssertEqual(
  442. OnNext(210, 42),
  443. OnNext(220, 43),
  444. OnCompleted<int>(230)
  445. );
  446. xs.Subscriptions.AssertEqual(
  447. Subscribe(200, 230)
  448. );
  449. }
  450. [Fact]
  451. public void Dematerialize_Error1()
  452. {
  453. var scheduler = new TestScheduler();
  454. var ex = new Exception();
  455. var xs = scheduler.CreateHotObservable(
  456. OnNext(150, Notification.CreateOnNext(41)),
  457. OnNext(210, Notification.CreateOnNext(42)),
  458. OnNext(220, Notification.CreateOnNext(43)),
  459. OnError<Notification<int>>(230, ex)
  460. );
  461. var res = scheduler.Start(() =>
  462. xs.Dematerialize()
  463. );
  464. res.Messages.AssertEqual(
  465. OnNext(210, 42),
  466. OnNext(220, 43),
  467. OnError<int>(230, ex)
  468. );
  469. xs.Subscriptions.AssertEqual(
  470. Subscribe(200, 230)
  471. );
  472. }
  473. [Fact]
  474. public void Dematerialize_Error2()
  475. {
  476. var scheduler = new TestScheduler();
  477. var ex = new Exception();
  478. var xs = scheduler.CreateHotObservable(
  479. OnNext(150, Notification.CreateOnNext(41)),
  480. OnNext(210, Notification.CreateOnNext(42)),
  481. OnNext(220, Notification.CreateOnNext(43)),
  482. OnNext(230, Notification.CreateOnError<int>(ex))
  483. );
  484. var res = scheduler.Start(() =>
  485. xs.Dematerialize()
  486. );
  487. res.Messages.AssertEqual(
  488. OnNext(210, 42),
  489. OnNext(220, 43),
  490. OnError<int>(230, ex)
  491. );
  492. xs.Subscriptions.AssertEqual(
  493. Subscribe(200, 230)
  494. );
  495. }
  496. [Fact]
  497. public void Materialize_Dematerialize_Never()
  498. {
  499. var scheduler = new TestScheduler();
  500. var xs = Observable.Never<int>();
  501. var res = scheduler.Start(() =>
  502. xs.Materialize().Dematerialize()
  503. );
  504. res.Messages.AssertEqual(
  505. );
  506. }
  507. [Fact]
  508. public void Materialize_Dematerialize_Empty()
  509. {
  510. var scheduler = new TestScheduler();
  511. var xs = scheduler.CreateHotObservable(
  512. OnNext(150, 1),
  513. OnCompleted<int>(250)
  514. );
  515. var res = scheduler.Start(() =>
  516. xs.Materialize().Dematerialize()
  517. );
  518. res.Messages.AssertEqual(
  519. OnCompleted<int>(250)
  520. );
  521. xs.Subscriptions.AssertEqual(
  522. Subscribe(200, 250)
  523. );
  524. }
  525. [Fact]
  526. public void Materialize_Dematerialize_Return()
  527. {
  528. var scheduler = new TestScheduler();
  529. var xs = scheduler.CreateHotObservable(
  530. OnNext(150, 1),
  531. OnNext(210, 2),
  532. OnCompleted<int>(250)
  533. );
  534. var res = scheduler.Start(() =>
  535. xs.Materialize().Dematerialize()
  536. );
  537. res.Messages.AssertEqual(
  538. OnNext(210, 2),
  539. OnCompleted<int>(250)
  540. );
  541. xs.Subscriptions.AssertEqual(
  542. Subscribe(200, 250)
  543. );
  544. }
  545. [Fact]
  546. public void Materialize_Dematerialize_Throw()
  547. {
  548. var scheduler = new TestScheduler();
  549. var ex = new Exception();
  550. var xs = scheduler.CreateHotObservable(
  551. OnNext(150, 1),
  552. OnError<int>(250, ex)
  553. );
  554. var res = scheduler.Start(() =>
  555. xs.Materialize().Dematerialize()
  556. );
  557. res.Messages.AssertEqual(
  558. OnError<int>(250, ex)
  559. );
  560. xs.Subscriptions.AssertEqual(
  561. Subscribe(200, 250)
  562. );
  563. }
  564. #endregion
  565. #region + DistinctUntilChanged +
  566. [Fact]
  567. public void DistinctUntilChanged_ArgumentChecking()
  568. {
  569. var someObservable = Observable.Empty<int>();
  570. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int>(null));
  571. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int>(null, EqualityComparer<int>.Default));
  572. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int>(someObservable, null));
  573. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(null, _ => _));
  574. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(someObservable, null));
  575. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(someObservable, _ => _, null));
  576. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(null, _ => _, EqualityComparer<int>.Default));
  577. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(someObservable, null, EqualityComparer<int>.Default));
  578. }
  579. [Fact]
  580. public void DistinctUntilChanged_Never()
  581. {
  582. var scheduler = new TestScheduler();
  583. var xs = Observable.Never<int>();
  584. var res = scheduler.Start(() =>
  585. xs.DistinctUntilChanged()
  586. );
  587. res.Messages.AssertEqual(
  588. );
  589. }
  590. [Fact]
  591. public void DistinctUntilChanged_Empty()
  592. {
  593. var scheduler = new TestScheduler();
  594. var xs = scheduler.CreateHotObservable(
  595. OnNext(150, 1),
  596. OnCompleted<int>(250)
  597. );
  598. var res = scheduler.Start(() =>
  599. xs.DistinctUntilChanged()
  600. );
  601. res.Messages.AssertEqual(
  602. OnCompleted<int>(250)
  603. );
  604. xs.Subscriptions.AssertEqual(
  605. Subscribe(200, 250)
  606. );
  607. }
  608. [Fact]
  609. public void DistinctUntilChanged_Return()
  610. {
  611. var scheduler = new TestScheduler();
  612. var xs = scheduler.CreateHotObservable(
  613. OnNext(150, 1),
  614. OnNext(220, 2),
  615. OnCompleted<int>(250)
  616. );
  617. var res = scheduler.Start(() =>
  618. xs.DistinctUntilChanged()
  619. );
  620. res.Messages.AssertEqual(
  621. OnNext(220, 2),
  622. OnCompleted<int>(250)
  623. );
  624. xs.Subscriptions.AssertEqual(
  625. Subscribe(200, 250)
  626. );
  627. }
  628. [Fact]
  629. public void DistinctUntilChanged_Throw()
  630. {
  631. var scheduler = new TestScheduler();
  632. var ex = new Exception();
  633. var xs = scheduler.CreateHotObservable(
  634. OnNext(150, 1),
  635. OnError<int>(250, ex)
  636. );
  637. var res = scheduler.Start(() =>
  638. xs.DistinctUntilChanged()
  639. );
  640. res.Messages.AssertEqual(
  641. OnError<int>(250, ex)
  642. );
  643. xs.Subscriptions.AssertEqual(
  644. Subscribe(200, 250)
  645. );
  646. }
  647. [Fact]
  648. public void DistinctUntilChanged_AllChanges()
  649. {
  650. var scheduler = new TestScheduler();
  651. var xs = scheduler.CreateHotObservable(
  652. OnNext(150, 1),
  653. OnNext(210, 2),
  654. OnNext(220, 3),
  655. OnNext(230, 4),
  656. OnNext(240, 5),
  657. OnCompleted<int>(250)
  658. );
  659. var res = scheduler.Start(() =>
  660. xs.DistinctUntilChanged()
  661. );
  662. res.Messages.AssertEqual(
  663. OnNext(210, 2),
  664. OnNext(220, 3),
  665. OnNext(230, 4),
  666. OnNext(240, 5),
  667. OnCompleted<int>(250)
  668. );
  669. xs.Subscriptions.AssertEqual(
  670. Subscribe(200, 250)
  671. );
  672. }
  673. [Fact]
  674. public void DistinctUntilChanged_AllSame()
  675. {
  676. var scheduler = new TestScheduler();
  677. var xs = scheduler.CreateHotObservable(
  678. OnNext(150, 1),
  679. OnNext(210, 2),
  680. OnNext(220, 2),
  681. OnNext(230, 2),
  682. OnNext(240, 2),
  683. OnCompleted<int>(250)
  684. );
  685. var res = scheduler.Start(() =>
  686. xs.DistinctUntilChanged()
  687. );
  688. res.Messages.AssertEqual(
  689. OnNext(210, 2),
  690. OnCompleted<int>(250)
  691. );
  692. xs.Subscriptions.AssertEqual(
  693. Subscribe(200, 250)
  694. );
  695. }
  696. [Fact]
  697. public void DistinctUntilChanged_SomeChanges()
  698. {
  699. var scheduler = new TestScheduler();
  700. var xs = scheduler.CreateHotObservable(
  701. OnNext(150, 1),
  702. OnNext(210, 2), //*
  703. OnNext(215, 3), //*
  704. OnNext(220, 3),
  705. OnNext(225, 2), //*
  706. OnNext(230, 2),
  707. OnNext(230, 1), //*
  708. OnNext(240, 2), //*
  709. OnCompleted<int>(250)
  710. );
  711. var res = scheduler.Start(() =>
  712. xs.DistinctUntilChanged()
  713. );
  714. res.Messages.AssertEqual(
  715. OnNext(210, 2),
  716. OnNext(215, 3),
  717. OnNext(225, 2),
  718. OnNext(230, 1),
  719. OnNext(240, 2),
  720. OnCompleted<int>(250)
  721. );
  722. xs.Subscriptions.AssertEqual(
  723. Subscribe(200, 250)
  724. );
  725. }
  726. [Fact]
  727. public void DistinctUntilChanged_Comparer_AllEqual()
  728. {
  729. var scheduler = new TestScheduler();
  730. var xs = scheduler.CreateHotObservable(
  731. OnNext(150, 1),
  732. OnNext(210, 2),
  733. OnNext(220, 3),
  734. OnNext(230, 4),
  735. OnNext(240, 5),
  736. OnCompleted<int>(250)
  737. );
  738. var res = scheduler.Start(() =>
  739. xs.DistinctUntilChanged(new FuncComparer<int>((x, y) => true))
  740. );
  741. res.Messages.AssertEqual(
  742. OnNext(210, 2),
  743. OnCompleted<int>(250)
  744. );
  745. xs.Subscriptions.AssertEqual(
  746. Subscribe(200, 250)
  747. );
  748. }
  749. [Fact]
  750. public void DistinctUntilChanged_Comparer_AllDifferent()
  751. {
  752. var scheduler = new TestScheduler();
  753. var xs = scheduler.CreateHotObservable(
  754. OnNext(150, 1),
  755. OnNext(210, 2),
  756. OnNext(220, 2),
  757. OnNext(230, 2),
  758. OnNext(240, 2),
  759. OnCompleted<int>(250)
  760. );
  761. var res = scheduler.Start(() =>
  762. xs.DistinctUntilChanged(new FuncComparer<int>((x, y) => false))
  763. );
  764. res.Messages.AssertEqual(
  765. OnNext(210, 2),
  766. OnNext(220, 2),
  767. OnNext(230, 2),
  768. OnNext(240, 2),
  769. OnCompleted<int>(250)
  770. );
  771. xs.Subscriptions.AssertEqual(
  772. Subscribe(200, 250)
  773. );
  774. }
  775. [Fact]
  776. public void DistinctUntilChanged_KeySelector_Div2()
  777. {
  778. var scheduler = new TestScheduler();
  779. var xs = scheduler.CreateHotObservable(
  780. OnNext(150, 1),
  781. OnNext(210, 2), //*
  782. OnNext(220, 4),
  783. OnNext(230, 3), //*
  784. OnNext(240, 5),
  785. OnCompleted<int>(250)
  786. );
  787. var res = scheduler.Start(() =>
  788. xs.DistinctUntilChanged(x => x % 2)
  789. );
  790. res.Messages.AssertEqual(
  791. OnNext(210, 2),
  792. OnNext(230, 3),
  793. OnCompleted<int>(250)
  794. );
  795. xs.Subscriptions.AssertEqual(
  796. Subscribe(200, 250)
  797. );
  798. }
  799. class FuncComparer<T> : IEqualityComparer<T>
  800. {
  801. private Func<T, T, bool> _equals;
  802. public FuncComparer(Func<T, T, bool> equals)
  803. {
  804. _equals = equals;
  805. }
  806. public bool Equals(T x, T y)
  807. {
  808. return _equals(x, y);
  809. }
  810. public int GetHashCode(T obj)
  811. {
  812. return 0;
  813. }
  814. }
  815. [Fact]
  816. public void DistinctUntilChanged_KeySelectorThrows()
  817. {
  818. var ex = new Exception();
  819. var scheduler = new TestScheduler();
  820. var xs = scheduler.CreateHotObservable(
  821. OnNext(150, 1),
  822. OnNext(210, 2),
  823. OnCompleted<int>(250)
  824. );
  825. var res = scheduler.Start(() =>
  826. xs.DistinctUntilChanged(new Func<int, int>(x => { throw ex; }))
  827. );
  828. res.Messages.AssertEqual(
  829. OnError<int>(210, ex)
  830. );
  831. }
  832. [Fact]
  833. public void DistinctUntilChanged_ComparerThrows()
  834. {
  835. var ex = new Exception();
  836. var scheduler = new TestScheduler();
  837. var xs = scheduler.CreateHotObservable(
  838. OnNext(150, 1),
  839. OnNext(210, 2),
  840. OnNext(220, 3),
  841. OnCompleted<int>(250)
  842. );
  843. var res = scheduler.Start(() =>
  844. xs.DistinctUntilChanged(new ThrowComparer<int>(ex))
  845. );
  846. res.Messages.AssertEqual(
  847. OnNext(210, 2),
  848. OnError<int>(220, ex)
  849. );
  850. }
  851. class ThrowComparer<T> : IEqualityComparer<T>
  852. {
  853. private Exception _ex;
  854. public ThrowComparer(Exception ex)
  855. {
  856. _ex = ex;
  857. }
  858. public bool Equals(T x, T y)
  859. {
  860. throw _ex;
  861. }
  862. public int GetHashCode(T obj)
  863. {
  864. return 0;
  865. }
  866. }
  867. [Fact]
  868. public void DistinctUntilChanged_KeySelector_Comparer()
  869. {
  870. var scheduler = new TestScheduler();
  871. var xs = scheduler.CreateHotObservable(
  872. OnNext(150, 1),
  873. OnNext(210, 2), // * key = 1 % 3 = 1
  874. OnNext(220, 8), // key = 4 % 3 = 1 same
  875. OnNext(230, 2), // key = 1 % 3 = 1 same
  876. OnNext(240, 5), // * key = 2 % 3 = 2
  877. OnCompleted<int>(250)
  878. );
  879. var res = scheduler.Start(() =>
  880. xs.DistinctUntilChanged(x => x / 2, new FuncComparer<int>((x, y) => x % 3 == y % 3))
  881. );
  882. res.Messages.AssertEqual(
  883. OnNext(210, 2),
  884. OnNext(240, 5),
  885. OnCompleted<int>(250)
  886. );
  887. xs.Subscriptions.AssertEqual(
  888. Subscribe(200, 250)
  889. );
  890. }
  891. #endregion
  892. #region + Do +
  893. [Fact]
  894. public void Do_ArgumentChecking()
  895. {
  896. var someObservable = Observable.Empty<int>();
  897. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, (Action<int>)null));
  898. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, _ => { }));
  899. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, x => { }, (Action)null));
  900. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, (Action<int>)null, () => { }));
  901. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, x => { }, () => { }));
  902. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, x => { }, (Action<Exception>)null));
  903. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, (Action<int>)null, (Exception _) => { }));
  904. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, x => { }, (Exception _) => { }));
  905. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, x => { }, (Exception _) => { }, null));
  906. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, x => { }, (Action<Exception>)null, () => { }));
  907. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, (Action<int>)null, (Exception _) => { }, () => { }));
  908. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, x => { }, (Exception _) => { }, () => { }));
  909. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, Observer.Create<int>(i => { })));
  910. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, default(IObserver<int>)));
  911. }
  912. [Fact]
  913. public void Do_ShouldSeeAllValues()
  914. {
  915. var scheduler = new TestScheduler();
  916. var xs = scheduler.CreateHotObservable(
  917. OnNext(150, 1),
  918. OnNext(210, 2),
  919. OnNext(220, 3),
  920. OnNext(230, 4),
  921. OnNext(240, 5),
  922. OnCompleted<int>(250)
  923. );
  924. int i = 0;
  925. int sum = 2 + 3 + 4 + 5;
  926. var res = scheduler.Start(() =>
  927. xs.Do(x => { i++; sum -= x; })
  928. );
  929. Assert.Equal(4, i);
  930. Assert.Equal(0, sum);
  931. res.Messages.AssertEqual(
  932. OnNext(210, 2),
  933. OnNext(220, 3),
  934. OnNext(230, 4),
  935. OnNext(240, 5),
  936. OnCompleted<int>(250)
  937. );
  938. xs.Subscriptions.AssertEqual(
  939. Subscribe(200, 250)
  940. );
  941. }
  942. [Fact]
  943. public void Do_PlainAction()
  944. {
  945. var scheduler = new TestScheduler();
  946. var xs = scheduler.CreateHotObservable(
  947. OnNext(150, 1),
  948. OnNext(210, 2),
  949. OnNext(220, 3),
  950. OnNext(230, 4),
  951. OnNext(240, 5),
  952. OnCompleted<int>(250)
  953. );
  954. int i = 0;
  955. var res = scheduler.Start(() =>
  956. xs.Do(_ => { i++; })
  957. );
  958. Assert.Equal(4, i);
  959. res.Messages.AssertEqual(
  960. OnNext(210, 2),
  961. OnNext(220, 3),
  962. OnNext(230, 4),
  963. OnNext(240, 5),
  964. OnCompleted<int>(250)
  965. );
  966. xs.Subscriptions.AssertEqual(
  967. Subscribe(200, 250)
  968. );
  969. }
  970. [Fact]
  971. public void Do_NextCompleted()
  972. {
  973. var scheduler = new TestScheduler();
  974. var xs = scheduler.CreateHotObservable(
  975. OnNext(150, 1),
  976. OnNext(210, 2),
  977. OnNext(220, 3),
  978. OnNext(230, 4),
  979. OnNext(240, 5),
  980. OnCompleted<int>(250)
  981. );
  982. int i = 0;
  983. int sum = 2 + 3 + 4 + 5;
  984. bool completed = false;
  985. var res = scheduler.Start(() =>
  986. xs.Do(x => { i++; sum -= x; }, () => { completed = true; })
  987. );
  988. Assert.Equal(4, i);
  989. Assert.Equal(0, sum);
  990. Assert.True(completed);
  991. res.Messages.AssertEqual(
  992. OnNext(210, 2),
  993. OnNext(220, 3),
  994. OnNext(230, 4),
  995. OnNext(240, 5),
  996. OnCompleted<int>(250)
  997. );
  998. xs.Subscriptions.AssertEqual(
  999. Subscribe(200, 250)
  1000. );
  1001. }
  1002. [Fact]
  1003. public void Do_NextCompleted_Never()
  1004. {
  1005. var scheduler = new TestScheduler();
  1006. var xs = scheduler.CreateHotObservable<int>();
  1007. int i = 0;
  1008. bool completed = false;
  1009. var res = scheduler.Start(() =>
  1010. xs.Do(x => { i++; }, () => { completed = true; })
  1011. );
  1012. Assert.Equal(0, i);
  1013. Assert.False(completed);
  1014. res.Messages.AssertEqual(
  1015. );
  1016. xs.Subscriptions.AssertEqual(
  1017. Subscribe(200, 1000)
  1018. );
  1019. }
  1020. [Fact]
  1021. public void Do_NextError()
  1022. {
  1023. var scheduler = new TestScheduler();
  1024. var ex = new Exception();
  1025. var xs = scheduler.CreateHotObservable(
  1026. OnNext(150, 1),
  1027. OnNext(210, 2),
  1028. OnNext(220, 3),
  1029. OnNext(230, 4),
  1030. OnNext(240, 5),
  1031. OnError<int>(250, ex)
  1032. );
  1033. int i = 0;
  1034. int sum = 2 + 3 + 4 + 5;
  1035. bool sawError = false;
  1036. var res = scheduler.Start(() =>
  1037. xs.Do(x => { i++; sum -= x; }, e => { sawError = e == ex; })
  1038. );
  1039. Assert.Equal(4, i);
  1040. Assert.Equal(0, sum);
  1041. Assert.True(sawError);
  1042. res.Messages.AssertEqual(
  1043. OnNext(210, 2),
  1044. OnNext(220, 3),
  1045. OnNext(230, 4),
  1046. OnNext(240, 5),
  1047. OnError<int>(250, ex)
  1048. );
  1049. xs.Subscriptions.AssertEqual(
  1050. Subscribe(200, 250)
  1051. );
  1052. }
  1053. [Fact]
  1054. public void Do_NextErrorNot()
  1055. {
  1056. var scheduler = new TestScheduler();
  1057. var xs = scheduler.CreateHotObservable(
  1058. OnNext(150, 1),
  1059. OnNext(210, 2),
  1060. OnNext(220, 3),
  1061. OnNext(230, 4),
  1062. OnNext(240, 5),
  1063. OnCompleted<int>(250)
  1064. );
  1065. int i = 0;
  1066. int sum = 2 + 3 + 4 + 5;
  1067. bool sawError = false;
  1068. var res = scheduler.Start(() =>
  1069. xs.Do(x => { i++; sum -= x; }, _ => { sawError = true; })
  1070. );
  1071. Assert.Equal(4, i);
  1072. Assert.Equal(0, sum);
  1073. Assert.False(sawError);
  1074. res.Messages.AssertEqual(
  1075. OnNext(210, 2),
  1076. OnNext(220, 3),
  1077. OnNext(230, 4),
  1078. OnNext(240, 5),
  1079. OnCompleted<int>(250)
  1080. );
  1081. xs.Subscriptions.AssertEqual(
  1082. Subscribe(200, 250)
  1083. );
  1084. }
  1085. [Fact]
  1086. public void Do_NextErrorCompleted()
  1087. {
  1088. var scheduler = new TestScheduler();
  1089. var xs = scheduler.CreateHotObservable(
  1090. OnNext(150, 1),
  1091. OnNext(210, 2),
  1092. OnNext(220, 3),
  1093. OnNext(230, 4),
  1094. OnNext(240, 5),
  1095. OnCompleted<int>(250)
  1096. );
  1097. int i = 0;
  1098. int sum = 2 + 3 + 4 + 5;
  1099. bool sawError = false;
  1100. bool hasCompleted = false;
  1101. var res = scheduler.Start(() =>
  1102. xs.Do(x => { i++; sum -= x; }, e => { sawError = true; }, () => { hasCompleted = true; })
  1103. );
  1104. Assert.Equal(4, i);
  1105. Assert.Equal(0, sum);
  1106. Assert.False(sawError);
  1107. Assert.True(hasCompleted);
  1108. res.Messages.AssertEqual(
  1109. OnNext(210, 2),
  1110. OnNext(220, 3),
  1111. OnNext(230, 4),
  1112. OnNext(240, 5),
  1113. OnCompleted<int>(250)
  1114. );
  1115. xs.Subscriptions.AssertEqual(
  1116. Subscribe(200, 250)
  1117. );
  1118. }
  1119. [Fact]
  1120. public void Do_NextErrorCompletedError()
  1121. {
  1122. var scheduler = new TestScheduler();
  1123. var ex = new Exception();
  1124. var xs = scheduler.CreateHotObservable(
  1125. OnNext(150, 1),
  1126. OnNext(210, 2),
  1127. OnNext(220, 3),
  1128. OnNext(230, 4),
  1129. OnNext(240, 5),
  1130. OnError<int>(250, ex)
  1131. );
  1132. int i = 0;
  1133. int sum = 2 + 3 + 4 + 5;
  1134. bool sawError = false;
  1135. bool hasCompleted = false;
  1136. var res = scheduler.Start(() =>
  1137. xs.Do(x => { i++; sum -= x; }, e => { sawError = e == ex; }, () => { hasCompleted = true; })
  1138. );
  1139. Assert.Equal(4, i);
  1140. Assert.Equal(0, sum);
  1141. Assert.True(sawError);
  1142. Assert.False(hasCompleted);
  1143. res.Messages.AssertEqual(
  1144. OnNext(210, 2),
  1145. OnNext(220, 3),
  1146. OnNext(230, 4),
  1147. OnNext(240, 5),
  1148. OnError<int>(250, ex)
  1149. );
  1150. xs.Subscriptions.AssertEqual(
  1151. Subscribe(200, 250)
  1152. );
  1153. }
  1154. [Fact]
  1155. public void Do_NextErrorCompletedNever()
  1156. {
  1157. var scheduler = new TestScheduler();
  1158. var xs = scheduler.CreateHotObservable<int>();
  1159. int i = 0;
  1160. bool sawError = false;
  1161. bool hasCompleted = false;
  1162. var res = scheduler.Start(() =>
  1163. xs.Do(x => { i++; }, e => { sawError = true; }, () => { hasCompleted = true; })
  1164. );
  1165. Assert.Equal(0, i);
  1166. Assert.False(sawError);
  1167. Assert.False(hasCompleted);
  1168. res.Messages.AssertEqual(
  1169. );
  1170. xs.Subscriptions.AssertEqual(
  1171. Subscribe(200, 1000)
  1172. );
  1173. }
  1174. [Fact]
  1175. public void Do_Observer_SomeDataWithError()
  1176. {
  1177. var scheduler = new TestScheduler();
  1178. var ex = new Exception();
  1179. var xs = scheduler.CreateHotObservable(
  1180. OnNext(150, 1),
  1181. OnNext(210, 2),
  1182. OnNext(220, 3),
  1183. OnNext(230, 4),
  1184. OnNext(240, 5),
  1185. OnError<int>(250, ex)
  1186. );
  1187. int i = 0;
  1188. int sum = 2 + 3 + 4 + 5;
  1189. bool sawError = false;
  1190. bool hasCompleted = false;
  1191. var res = scheduler.Start(() =>
  1192. xs.Do(Observer.Create<int>(x => { i++; sum -= x; }, e => { sawError = e == ex; }, () => { hasCompleted = true; }))
  1193. );
  1194. Assert.Equal(4, i);
  1195. Assert.Equal(0, sum);
  1196. Assert.True(sawError);
  1197. Assert.False(hasCompleted);
  1198. res.Messages.AssertEqual(
  1199. OnNext(210, 2),
  1200. OnNext(220, 3),
  1201. OnNext(230, 4),
  1202. OnNext(240, 5),
  1203. OnError<int>(250, ex)
  1204. );
  1205. xs.Subscriptions.AssertEqual(
  1206. Subscribe(200, 250)
  1207. );
  1208. }
  1209. [Fact]
  1210. public void Do_Observer_SomeDataWithoutError()
  1211. {
  1212. var scheduler = new TestScheduler();
  1213. var xs = scheduler.CreateHotObservable(
  1214. OnNext(150, 1),
  1215. OnNext(210, 2),
  1216. OnNext(220, 3),
  1217. OnNext(230, 4),
  1218. OnNext(240, 5),
  1219. OnCompleted<int>(250)
  1220. );
  1221. int i = 0;
  1222. int sum = 2 + 3 + 4 + 5;
  1223. bool sawError = false;
  1224. bool hasCompleted = false;
  1225. var res = scheduler.Start(() =>
  1226. xs.Do(Observer.Create<int>(x => { i++; sum -= x; }, e => { sawError = true; }, () => { hasCompleted = true; }))
  1227. );
  1228. Assert.Equal(4, i);
  1229. Assert.Equal(0, sum);
  1230. Assert.False(sawError);
  1231. Assert.True(hasCompleted);
  1232. res.Messages.AssertEqual(
  1233. OnNext(210, 2),
  1234. OnNext(220, 3),
  1235. OnNext(230, 4),
  1236. OnNext(240, 5),
  1237. OnCompleted<int>(250)
  1238. );
  1239. xs.Subscriptions.AssertEqual(
  1240. Subscribe(200, 250)
  1241. );
  1242. }
  1243. [Fact]
  1244. public void Do1422_Next_NextThrows()
  1245. {
  1246. var scheduler = new TestScheduler();
  1247. var ex = new Exception();
  1248. var xs = scheduler.CreateHotObservable(
  1249. OnNext(150, 1),
  1250. OnNext(210, 2),
  1251. OnCompleted<int>(250)
  1252. );
  1253. var res = scheduler.Start(() =>
  1254. xs.Do(x => { throw ex; })
  1255. );
  1256. res.Messages.AssertEqual(
  1257. OnError<int>(210, ex)
  1258. );
  1259. xs.Subscriptions.AssertEqual(
  1260. Subscribe(200, 210)
  1261. );
  1262. }
  1263. [Fact]
  1264. public void Do1422_NextCompleted_NextThrows()
  1265. {
  1266. var scheduler = new TestScheduler();
  1267. var ex = new Exception();
  1268. var xs = scheduler.CreateHotObservable(
  1269. OnNext(150, 1),
  1270. OnNext(210, 2),
  1271. OnCompleted<int>(250)
  1272. );
  1273. var res = scheduler.Start(() =>
  1274. xs.Do(x => { throw ex; }, () => { })
  1275. );
  1276. res.Messages.AssertEqual(
  1277. OnError<int>(210, ex)
  1278. );
  1279. xs.Subscriptions.AssertEqual(
  1280. Subscribe(200, 210)
  1281. );
  1282. }
  1283. [Fact]
  1284. public void Do1422_NextCompleted_CompletedThrows()
  1285. {
  1286. var scheduler = new TestScheduler();
  1287. var ex = new Exception();
  1288. var xs = scheduler.CreateHotObservable(
  1289. OnNext(150, 1),
  1290. OnNext(210, 2),
  1291. OnCompleted<int>(250)
  1292. );
  1293. var res = scheduler.Start(() =>
  1294. xs.Do(x => { }, () => { throw ex; })
  1295. );
  1296. res.Messages.AssertEqual(
  1297. OnNext(210, 2),
  1298. OnError<int>(250, ex)
  1299. );
  1300. xs.Subscriptions.AssertEqual(
  1301. Subscribe(200, 250)
  1302. );
  1303. }
  1304. [Fact]
  1305. public void Do1422_NextError_NextThrows()
  1306. {
  1307. var scheduler = new TestScheduler();
  1308. var ex = new Exception();
  1309. var xs = scheduler.CreateHotObservable(
  1310. OnNext(150, 1),
  1311. OnNext(210, 2),
  1312. OnCompleted<int>(250)
  1313. );
  1314. var res = scheduler.Start(() =>
  1315. xs.Do(x => { throw ex; }, _ => { })
  1316. );
  1317. res.Messages.AssertEqual(
  1318. OnError<int>(210, ex)
  1319. );
  1320. xs.Subscriptions.AssertEqual(
  1321. Subscribe(200, 210)
  1322. );
  1323. }
  1324. [Fact]
  1325. public void Do1422_NextError_ErrorThrows()
  1326. {
  1327. var scheduler = new TestScheduler();
  1328. var ex1 = new Exception();
  1329. var ex2 = new Exception();
  1330. var xs = scheduler.CreateHotObservable(
  1331. OnNext(150, 1),
  1332. OnError<int>(210, ex1)
  1333. );
  1334. var res = scheduler.Start(() =>
  1335. xs.Do(x => { }, _ => { throw ex2; })
  1336. );
  1337. res.Messages.AssertEqual(
  1338. OnError<int>(210, ex2)
  1339. );
  1340. xs.Subscriptions.AssertEqual(
  1341. Subscribe(200, 210)
  1342. );
  1343. }
  1344. [Fact]
  1345. public void Do1422_NextErrorCompleted_NextThrows()
  1346. {
  1347. var scheduler = new TestScheduler();
  1348. var ex = new Exception();
  1349. var xs = scheduler.CreateHotObservable(
  1350. OnNext(150, 1),
  1351. OnNext(210, 2),
  1352. OnCompleted<int>(250)
  1353. );
  1354. var res = scheduler.Start(() =>
  1355. xs.Do(x => { throw ex; }, _ => { }, () => { })
  1356. );
  1357. res.Messages.AssertEqual(
  1358. OnError<int>(210, ex)
  1359. );
  1360. xs.Subscriptions.AssertEqual(
  1361. Subscribe(200, 210)
  1362. );
  1363. }
  1364. [Fact]
  1365. public void Do1422_NextErrorCompleted_ErrorThrows()
  1366. {
  1367. var scheduler = new TestScheduler();
  1368. var ex1 = new Exception();
  1369. var ex2 = new Exception();
  1370. var xs = scheduler.CreateHotObservable(
  1371. OnNext(150, 1),
  1372. OnError<int>(210, ex1)
  1373. );
  1374. var res = scheduler.Start(() =>
  1375. xs.Do(x => { }, _ => { throw ex2; }, () => { })
  1376. );
  1377. res.Messages.AssertEqual(
  1378. OnError<int>(210, ex2)
  1379. );
  1380. xs.Subscriptions.AssertEqual(
  1381. Subscribe(200, 210)
  1382. );
  1383. }
  1384. [Fact]
  1385. public void Do1422_NextErrorCompleted_CompletedThrows()
  1386. {
  1387. var scheduler = new TestScheduler();
  1388. var ex = new Exception();
  1389. var xs = scheduler.CreateHotObservable(
  1390. OnNext(150, 1),
  1391. OnNext(210, 2),
  1392. OnCompleted<int>(250)
  1393. );
  1394. var res = scheduler.Start(() =>
  1395. xs.Do(x => { }, _ => { }, () => { throw ex; })
  1396. );
  1397. res.Messages.AssertEqual(
  1398. OnNext(210, 2),
  1399. OnError<int>(250, ex)
  1400. );
  1401. xs.Subscriptions.AssertEqual(
  1402. Subscribe(200, 250)
  1403. );
  1404. }
  1405. [Fact]
  1406. public void Do1422_Observer_NextThrows()
  1407. {
  1408. var scheduler = new TestScheduler();
  1409. var ex = new Exception();
  1410. var xs = scheduler.CreateHotObservable(
  1411. OnNext(150, 1),
  1412. OnNext(210, 2),
  1413. OnCompleted<int>(250)
  1414. );
  1415. var res = scheduler.Start(() =>
  1416. xs.Do(Observer.Create<int>(x => { throw ex; }, _ => { }, () => { }))
  1417. );
  1418. res.Messages.AssertEqual(
  1419. OnError<int>(210, ex)
  1420. );
  1421. xs.Subscriptions.AssertEqual(
  1422. Subscribe(200, 210)
  1423. );
  1424. }
  1425. [Fact]
  1426. public void Do1422_Observer_ErrorThrows()
  1427. {
  1428. var scheduler = new TestScheduler();
  1429. var ex1 = new Exception();
  1430. var ex2 = new Exception();
  1431. var xs = scheduler.CreateHotObservable(
  1432. OnNext(150, 1),
  1433. OnError<int>(210, ex1)
  1434. );
  1435. var res = scheduler.Start(() =>
  1436. xs.Do(Observer.Create<int>(x => { }, _ => { throw ex2; }, () => { }))
  1437. );
  1438. res.Messages.AssertEqual(
  1439. OnError<int>(210, ex2)
  1440. );
  1441. xs.Subscriptions.AssertEqual(
  1442. Subscribe(200, 210)
  1443. );
  1444. }
  1445. [Fact]
  1446. public void Do1422_Observer_CompletedThrows()
  1447. {
  1448. var scheduler = new TestScheduler();
  1449. var ex = new Exception();
  1450. var xs = scheduler.CreateHotObservable(
  1451. OnNext(150, 1),
  1452. OnNext(210, 2),
  1453. OnCompleted<int>(250)
  1454. );
  1455. var res = scheduler.Start(() =>
  1456. xs.Do(Observer.Create<int>(x => { }, _ => { }, () => { throw ex; }))
  1457. );
  1458. res.Messages.AssertEqual(
  1459. OnNext(210, 2),
  1460. OnError<int>(250, ex)
  1461. );
  1462. xs.Subscriptions.AssertEqual(
  1463. Subscribe(200, 250)
  1464. );
  1465. }
  1466. #endregion
  1467. #region + Finally +
  1468. [Fact]
  1469. public void Finally_ArgumentChecking()
  1470. {
  1471. var someObservable = Observable.Empty<int>();
  1472. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Finally<int>(null, () => { }));
  1473. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Finally<int>(someObservable, null));
  1474. }
  1475. [Fact]
  1476. public void Finally_Never()
  1477. {
  1478. var scheduler = new TestScheduler();
  1479. bool invoked = false;
  1480. var res = scheduler.Start(() =>
  1481. Observable.Never<int>().Finally(() => { invoked = true; })
  1482. );
  1483. res.Messages.AssertEqual(
  1484. );
  1485. Assert.True(invoked); // due to unsubscribe; see 1356
  1486. }
  1487. [Fact]
  1488. public void Finally_OnlyCalledOnce_Never()
  1489. {
  1490. int invokeCount = 0;
  1491. var someObservable = Observable.Never<int>().Finally(() => { invokeCount++; });
  1492. var d = someObservable.Subscribe();
  1493. d.Dispose();
  1494. d.Dispose();
  1495. Assert.Equal(1, invokeCount);
  1496. }
  1497. [Fact]
  1498. public void Finally_OnlyCalledOnce_Empty()
  1499. {
  1500. var invokeCount = 0;
  1501. var someObservable = Observable.Empty<int>().Finally(() => { invokeCount++; });
  1502. var d = someObservable.Subscribe();
  1503. d.Dispose();
  1504. d.Dispose();
  1505. Assert.Equal(1, invokeCount);
  1506. }
  1507. [Fact]
  1508. public void Finally_Empty()
  1509. {
  1510. var scheduler = new TestScheduler();
  1511. var xs = scheduler.CreateHotObservable(
  1512. OnNext(150, 1),
  1513. OnCompleted<int>(250)
  1514. );
  1515. var invoked = false;
  1516. var res = scheduler.Start(() =>
  1517. xs.Finally(() => { invoked = true; })
  1518. );
  1519. Assert.True(invoked);
  1520. res.Messages.AssertEqual(
  1521. OnCompleted<int>(250)
  1522. );
  1523. xs.Subscriptions.AssertEqual(
  1524. Subscribe(200, 250)
  1525. );
  1526. }
  1527. [Fact]
  1528. public void Finally_Return()
  1529. {
  1530. var scheduler = new TestScheduler();
  1531. var xs = scheduler.CreateHotObservable(
  1532. OnNext(150, 1),
  1533. OnNext(210, 2),
  1534. OnCompleted<int>(250)
  1535. );
  1536. var invoked = false;
  1537. var res = scheduler.Start(() =>
  1538. xs.Finally(() => { invoked = true; })
  1539. );
  1540. Assert.True(invoked);
  1541. res.Messages.AssertEqual(
  1542. OnNext(210, 2),
  1543. OnCompleted<int>(250)
  1544. );
  1545. xs.Subscriptions.AssertEqual(
  1546. Subscribe(200, 250)
  1547. );
  1548. }
  1549. [Fact]
  1550. public void Finally_Throw()
  1551. {
  1552. var scheduler = new TestScheduler();
  1553. var ex = new Exception();
  1554. var xs = scheduler.CreateHotObservable(
  1555. OnNext(150, 1),
  1556. OnError<int>(250, ex)
  1557. );
  1558. var invoked = false;
  1559. var res = scheduler.Start(() =>
  1560. xs.Finally(() => { invoked = true; })
  1561. );
  1562. Assert.True(invoked);
  1563. res.Messages.AssertEqual(
  1564. OnError<int>(250, ex)
  1565. );
  1566. xs.Subscriptions.AssertEqual(
  1567. Subscribe(200, 250)
  1568. );
  1569. }
  1570. #endregion
  1571. #region + IgnoreElements +
  1572. [Fact]
  1573. public void IgnoreElements_ArgumentChecking()
  1574. {
  1575. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.IgnoreElements<int>(null));
  1576. }
  1577. [Fact]
  1578. public void IgnoreElements_IgnoreElements()
  1579. {
  1580. var scheduler = new TestScheduler();
  1581. var xs = scheduler.CreateHotObservable(
  1582. OnNext(180, 1),
  1583. OnNext(210, 2),
  1584. OnNext(250, 3),
  1585. OnNext(270, 4),
  1586. OnCompleted<int>(300)
  1587. );
  1588. var res = scheduler.Start(() =>
  1589. xs.IgnoreElements().IgnoreElements()
  1590. );
  1591. res.Messages.AssertEqual(
  1592. OnCompleted<int>(300)
  1593. );
  1594. xs.Subscriptions.AssertEqual(
  1595. Subscribe(200, 300)
  1596. );
  1597. }
  1598. [Fact]
  1599. public void IgnoreElements_Basic()
  1600. {
  1601. var scheduler = new TestScheduler();
  1602. var xs = scheduler.CreateHotObservable(
  1603. OnNext(180, 1),
  1604. OnNext(210, 2),
  1605. OnNext(250, 3),
  1606. OnNext(270, 4),
  1607. OnNext(310, 5),
  1608. OnNext(360, 6),
  1609. OnNext(380, 7),
  1610. OnNext(410, 8),
  1611. OnNext(590, 9)
  1612. );
  1613. var res = scheduler.Start(() =>
  1614. xs.IgnoreElements()
  1615. );
  1616. res.Messages.AssertEqual(
  1617. );
  1618. xs.Subscriptions.AssertEqual(
  1619. Subscribe(200, 1000)
  1620. );
  1621. }
  1622. [Fact]
  1623. public void IgnoreElements_Completed()
  1624. {
  1625. var scheduler = new TestScheduler();
  1626. var xs = scheduler.CreateHotObservable(
  1627. OnNext(180, 1),
  1628. OnNext(210, 2),
  1629. OnNext(250, 3),
  1630. OnNext(270, 4),
  1631. OnNext(310, 5),
  1632. OnNext(360, 6),
  1633. OnNext(380, 7),
  1634. OnNext(410, 8),
  1635. OnNext(590, 9),
  1636. OnCompleted<int>(610)
  1637. );
  1638. var res = scheduler.Start(() =>
  1639. xs.IgnoreElements()
  1640. );
  1641. res.Messages.AssertEqual(
  1642. OnCompleted<int>(610)
  1643. );
  1644. xs.Subscriptions.AssertEqual(
  1645. Subscribe(200, 610)
  1646. );
  1647. }
  1648. [Fact]
  1649. public void IgnoreElements_Error()
  1650. {
  1651. var scheduler = new TestScheduler();
  1652. var ex = new Exception();
  1653. var xs = scheduler.CreateHotObservable(
  1654. OnNext(180, 1),
  1655. OnNext(210, 2),
  1656. OnNext(250, 3),
  1657. OnNext(270, 4),
  1658. OnNext(310, 5),
  1659. OnNext(360, 6),
  1660. OnNext(380, 7),
  1661. OnNext(410, 8),
  1662. OnNext(590, 9),
  1663. OnError<int>(610, ex)
  1664. );
  1665. var res = scheduler.Start(() =>
  1666. xs.IgnoreElements()
  1667. );
  1668. res.Messages.AssertEqual(
  1669. OnError<int>(610, ex)
  1670. );
  1671. xs.Subscriptions.AssertEqual(
  1672. Subscribe(200, 610)
  1673. );
  1674. }
  1675. #endregion
  1676. #region + Materialize +
  1677. [Fact]
  1678. public void Materialize_ArgumentChecking()
  1679. {
  1680. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Materialize<int>(null));
  1681. }
  1682. [Fact]
  1683. public void Materialize_Never()
  1684. {
  1685. var scheduler = new TestScheduler();
  1686. var res = scheduler.Start(() =>
  1687. Observable.Never<int>().Materialize()
  1688. );
  1689. res.Messages.AssertEqual(
  1690. );
  1691. }
  1692. [Fact]
  1693. public void Materialize_Empty()
  1694. {
  1695. var scheduler = new TestScheduler();
  1696. var xs = scheduler.CreateHotObservable(
  1697. OnNext(150, 1),
  1698. OnCompleted<int>(250)
  1699. );
  1700. var res = scheduler.Start(() =>
  1701. xs.Materialize()
  1702. );
  1703. res.Messages.AssertEqual(
  1704. OnNext(250, Notification.CreateOnCompleted<int>()),
  1705. OnCompleted<Notification<int>>(250)
  1706. );
  1707. xs.Subscriptions.AssertEqual(
  1708. Subscribe(200, 250)
  1709. );
  1710. }
  1711. [Fact]
  1712. public void Materialize_Return()
  1713. {
  1714. var scheduler = new TestScheduler();
  1715. var xs = scheduler.CreateHotObservable(
  1716. OnNext(150, 1),
  1717. OnNext(210, 2),
  1718. OnCompleted<int>(250)
  1719. );
  1720. var res = scheduler.Start(() =>
  1721. xs.Materialize()
  1722. );
  1723. res.Messages.AssertEqual(
  1724. OnNext(210, Notification.CreateOnNext(2)),
  1725. OnNext(250, Notification.CreateOnCompleted<int>()),
  1726. OnCompleted<Notification<int>>(250)
  1727. );
  1728. xs.Subscriptions.AssertEqual(
  1729. Subscribe(200, 250)
  1730. );
  1731. }
  1732. [Fact]
  1733. public void Materialize_Throw()
  1734. {
  1735. var scheduler = new TestScheduler();
  1736. var ex = new Exception();
  1737. var xs = scheduler.CreateHotObservable(
  1738. OnNext(150, 1),
  1739. OnError<int>(250, ex)
  1740. );
  1741. var res = scheduler.Start(() =>
  1742. xs.Materialize()
  1743. );
  1744. res.Messages.AssertEqual(
  1745. OnNext(250, Notification.CreateOnError<int>(ex)),
  1746. OnCompleted<Notification<int>>(250)
  1747. );
  1748. xs.Subscriptions.AssertEqual(
  1749. Subscribe(200, 250)
  1750. );
  1751. }
  1752. #endregion
  1753. #region - Repeat -
  1754. [Fact]
  1755. public void Repeat_Observable_ArgumentChecking()
  1756. {
  1757. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat<int>(null));
  1758. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Repeat().Subscribe(null));
  1759. }
  1760. [Fact]
  1761. public void Repeat_Observable_Basic()
  1762. {
  1763. var scheduler = new TestScheduler();
  1764. var xs = scheduler.CreateColdObservable(
  1765. OnNext(100, 1),
  1766. OnNext(150, 2),
  1767. OnNext(200, 3),
  1768. OnCompleted<int>(250)
  1769. );
  1770. var res = scheduler.Start(() =>
  1771. xs.Repeat()
  1772. );
  1773. res.Messages.AssertEqual(
  1774. OnNext(300, 1),
  1775. OnNext(350, 2),
  1776. OnNext(400, 3),
  1777. OnNext(550, 1),
  1778. OnNext(600, 2),
  1779. OnNext(650, 3),
  1780. OnNext(800, 1),
  1781. OnNext(850, 2),
  1782. OnNext(900, 3)
  1783. );
  1784. xs.Subscriptions.AssertEqual(
  1785. Subscribe(200, 450),
  1786. Subscribe(450, 700),
  1787. Subscribe(700, 950),
  1788. Subscribe(950, 1000)
  1789. );
  1790. }
  1791. [Fact]
  1792. public void Repeat_Observable_Infinite()
  1793. {
  1794. var scheduler = new TestScheduler();
  1795. var xs = scheduler.CreateColdObservable(
  1796. OnNext(100, 1),
  1797. OnNext(150, 2),
  1798. OnNext(200, 3)
  1799. );
  1800. var res = scheduler.Start(() =>
  1801. xs.Repeat()
  1802. );
  1803. res.Messages.AssertEqual(
  1804. OnNext(300, 1),
  1805. OnNext(350, 2),
  1806. OnNext(400, 3)
  1807. );
  1808. xs.Subscriptions.AssertEqual(
  1809. Subscribe(200, 1000)
  1810. );
  1811. }
  1812. [Fact]
  1813. public void Repeat_Observable_Error()
  1814. {
  1815. var scheduler = new TestScheduler();
  1816. var ex = new Exception();
  1817. var xs = scheduler.CreateColdObservable(
  1818. OnNext(100, 1),
  1819. OnNext(150, 2),
  1820. OnNext(200, 3),
  1821. OnError<int>(250, ex)
  1822. );
  1823. var res = scheduler.Start(() =>
  1824. xs.Repeat()
  1825. );
  1826. res.Messages.AssertEqual(
  1827. OnNext(300, 1),
  1828. OnNext(350, 2),
  1829. OnNext(400, 3),
  1830. OnError<int>(450, ex)
  1831. );
  1832. xs.Subscriptions.AssertEqual(
  1833. Subscribe(200, 450)
  1834. );
  1835. }
  1836. [Fact]
  1837. public void Repeat_Observable_Throws()
  1838. {
  1839. var scheduler1 = new TestScheduler();
  1840. var xs = Observable.Return(1, scheduler1).Repeat();
  1841. xs.Subscribe(x => { throw new InvalidOperationException(); });
  1842. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  1843. var scheduler2 = new TestScheduler();
  1844. var ys = Observable.Throw<int>(new Exception(), scheduler2).Repeat();
  1845. ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  1846. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
  1847. var scheduler3 = new TestScheduler();
  1848. var zs = Observable.Return(1, scheduler3).Repeat();
  1849. var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  1850. scheduler3.ScheduleAbsolute(210, () => d.Dispose());
  1851. scheduler3.Start();
  1852. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Repeat();
  1853. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  1854. }
  1855. [Fact]
  1856. public void Repeat_Observable_Default_ArgumentChecking()
  1857. {
  1858. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat<int>((IObservable<int>)null));
  1859. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Repeat().Subscribe(null));
  1860. }
  1861. [Fact]
  1862. public void Repeat_Observable_RepeatCount_ArgumentChecking()
  1863. {
  1864. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat<int>(null, 0));
  1865. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Repeat(-1));
  1866. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Repeat(0).Subscribe(null));
  1867. }
  1868. [Fact]
  1869. public void Repeat_Observable_RepeatCount_Basic()
  1870. {
  1871. var scheduler = new TestScheduler();
  1872. var xs = scheduler.CreateColdObservable(
  1873. OnNext(5, 1),
  1874. OnNext(10, 2),
  1875. OnNext(15, 3),
  1876. OnCompleted<int>(20)
  1877. );
  1878. var res = scheduler.Start(() =>
  1879. xs.Repeat(3)
  1880. );
  1881. res.Messages.AssertEqual(
  1882. OnNext(205, 1),
  1883. OnNext(210, 2),
  1884. OnNext(215, 3),
  1885. OnNext(225, 1),
  1886. OnNext(230, 2),
  1887. OnNext(235, 3),
  1888. OnNext(245, 1),
  1889. OnNext(250, 2),
  1890. OnNext(255, 3),
  1891. OnCompleted<int>(260)
  1892. );
  1893. xs.Subscriptions.AssertEqual(
  1894. Subscribe(200, 220),
  1895. Subscribe(220, 240),
  1896. Subscribe(240, 260)
  1897. );
  1898. }
  1899. [Fact]
  1900. public void Repeat_Observable_RepeatCount_Dispose()
  1901. {
  1902. var scheduler = new TestScheduler();
  1903. var xs = scheduler.CreateColdObservable(
  1904. OnNext(5, 1),
  1905. OnNext(10, 2),
  1906. OnNext(15, 3),
  1907. OnCompleted<int>(20)
  1908. );
  1909. var res = scheduler.Start(() =>
  1910. xs.Repeat(3), 231
  1911. );
  1912. res.Messages.AssertEqual(
  1913. OnNext(205, 1),
  1914. OnNext(210, 2),
  1915. OnNext(215, 3),
  1916. OnNext(225, 1),
  1917. OnNext(230, 2)
  1918. );
  1919. xs.Subscriptions.AssertEqual(
  1920. Subscribe(200, 220),
  1921. Subscribe(220, 231)
  1922. );
  1923. }
  1924. [Fact]
  1925. public void Repeat_Observable_RepeatCount_Infinite()
  1926. {
  1927. var scheduler = new TestScheduler();
  1928. var xs = scheduler.CreateColdObservable(
  1929. OnNext(100, 1),
  1930. OnNext(150, 2),
  1931. OnNext(200, 3)
  1932. );
  1933. var res = scheduler.Start(() =>
  1934. xs.Repeat(3)
  1935. );
  1936. res.Messages.AssertEqual(
  1937. OnNext(300, 1),
  1938. OnNext(350, 2),
  1939. OnNext(400, 3)
  1940. );
  1941. xs.Subscriptions.AssertEqual(
  1942. Subscribe(200, 1000)
  1943. );
  1944. }
  1945. [Fact]
  1946. public void Repeat_Observable_RepeatCount_Error()
  1947. {
  1948. var scheduler = new TestScheduler();
  1949. var ex = new Exception();
  1950. var xs = scheduler.CreateColdObservable(
  1951. OnNext(100, 1),
  1952. OnNext(150, 2),
  1953. OnNext(200, 3),
  1954. OnError<int>(250, ex)
  1955. );
  1956. var res = scheduler.Start(() =>
  1957. xs.Repeat(3)
  1958. );
  1959. res.Messages.AssertEqual(
  1960. OnNext(300, 1),
  1961. OnNext(350, 2),
  1962. OnNext(400, 3),
  1963. OnError<int>(450, ex)
  1964. );
  1965. xs.Subscriptions.AssertEqual(
  1966. Subscribe(200, 450)
  1967. );
  1968. }
  1969. [Fact]
  1970. public void Repeat_Observable_RepeatCount_Throws()
  1971. {
  1972. var scheduler1 = new TestScheduler();
  1973. var xs = Observable.Return(1, scheduler1).Repeat(3);
  1974. xs.Subscribe(x => { throw new InvalidOperationException(); });
  1975. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  1976. var scheduler2 = new TestScheduler();
  1977. var ys = Observable.Throw<int>(new Exception(), scheduler2).Repeat(3);
  1978. ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  1979. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
  1980. var scheduler3 = new TestScheduler();
  1981. var zs = Observable.Return(1, scheduler3).Repeat(100);
  1982. var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  1983. scheduler3.ScheduleAbsolute(10, () => d.Dispose());
  1984. scheduler3.Start();
  1985. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Repeat(3);
  1986. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  1987. }
  1988. [Fact]
  1989. public void Repeat_Observable_RepeatCount_Default_ArgumentChecking()
  1990. {
  1991. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat<int>(default(IObservable<int>), 0));
  1992. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Repeat(-1));
  1993. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Repeat(0).Subscribe(null));
  1994. }
  1995. #endregion
  1996. #region - Retry -
  1997. [Fact]
  1998. public void Retry_Observable_ArgumentChecking()
  1999. {
  2000. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Retry<int>(null));
  2001. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Retry().Subscribe(null));
  2002. }
  2003. [Fact]
  2004. public void Retry_Observable_Basic()
  2005. {
  2006. var scheduler = new TestScheduler();
  2007. var xs = scheduler.CreateColdObservable(
  2008. OnNext(100, 1),
  2009. OnNext(150, 2),
  2010. OnNext(200, 3),
  2011. OnCompleted<int>(250)
  2012. );
  2013. var res = scheduler.Start(() =>
  2014. xs.Retry()
  2015. );
  2016. res.Messages.AssertEqual(
  2017. OnNext(300, 1),
  2018. OnNext(350, 2),
  2019. OnNext(400, 3),
  2020. OnCompleted<int>(450)
  2021. );
  2022. xs.Subscriptions.AssertEqual(
  2023. Subscribe(200, 450)
  2024. );
  2025. }
  2026. [Fact]
  2027. public void Retry_Observable_Infinite()
  2028. {
  2029. var scheduler = new TestScheduler();
  2030. var xs = scheduler.CreateColdObservable(
  2031. OnNext(100, 1),
  2032. OnNext(150, 2),
  2033. OnNext(200, 3)
  2034. );
  2035. var res = scheduler.Start(() =>
  2036. xs.Retry()
  2037. );
  2038. res.Messages.AssertEqual(
  2039. OnNext(300, 1),
  2040. OnNext(350, 2),
  2041. OnNext(400, 3)
  2042. );
  2043. xs.Subscriptions.AssertEqual(
  2044. Subscribe(200, 1000)
  2045. );
  2046. }
  2047. [Fact]
  2048. public void Retry_Observable_Error()
  2049. {
  2050. var scheduler = new TestScheduler();
  2051. var ex = new Exception();
  2052. var xs = scheduler.CreateColdObservable(
  2053. OnNext(100, 1),
  2054. OnNext(150, 2),
  2055. OnNext(200, 3),
  2056. OnError<int>(250, ex)
  2057. );
  2058. var res = scheduler.Start(() =>
  2059. xs.Retry(), 1100
  2060. );
  2061. res.Messages.AssertEqual(
  2062. OnNext(300, 1),
  2063. OnNext(350, 2),
  2064. OnNext(400, 3),
  2065. OnNext(550, 1),
  2066. OnNext(600, 2),
  2067. OnNext(650, 3),
  2068. OnNext(800, 1),
  2069. OnNext(850, 2),
  2070. OnNext(900, 3),
  2071. OnNext(1050, 1)
  2072. );
  2073. xs.Subscriptions.AssertEqual(
  2074. Subscribe(200, 450),
  2075. Subscribe(450, 700),
  2076. Subscribe(700, 950),
  2077. Subscribe(950, 1100)
  2078. );
  2079. }
  2080. [Fact]
  2081. public void Retry_Observable_Throws1()
  2082. {
  2083. var scheduler1 = new TestScheduler();
  2084. var xs = Observable.Return(1, scheduler1).Retry();
  2085. xs.Subscribe(x => { throw new InvalidOperationException(); });
  2086. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  2087. }
  2088. [Fact]
  2089. public void Retry_Observable_Throws2()
  2090. {
  2091. var scheduler2 = new TestScheduler();
  2092. var ys = Observable.Throw<int>(new Exception(), scheduler2).Retry();
  2093. var d = ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  2094. scheduler2.ScheduleAbsolute(210, () => d.Dispose());
  2095. scheduler2.Start();
  2096. }
  2097. [Fact]
  2098. public void Retry_Observable_Throws3()
  2099. {
  2100. var scheduler3 = new TestScheduler();
  2101. var zs = Observable.Return(1, scheduler3).Retry();
  2102. zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  2103. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler3.Start());
  2104. }
  2105. /*
  2106. * BREAKING CHANGE v2.0 > v1.x - The code below will loop endlessly, trying to repeat the failing subscription,
  2107. * whose exception is propagated through OnError starting from v2.0.
  2108. *
  2109. [Fact]
  2110. public void Retry_Observable_Throws4()
  2111. {
  2112. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Retry();
  2113. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  2114. }
  2115. */
  2116. [Fact]
  2117. public void Retry_Observable_Default_ArgumentChecking()
  2118. {
  2119. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Retry<int>((IObservable<int>)null));
  2120. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Retry().Subscribe(null));
  2121. }
  2122. [Fact]
  2123. public void Retry_Observable_RetryCount_ArgumentChecking()
  2124. {
  2125. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Retry<int>(null, 0));
  2126. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Retry(-1));
  2127. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Retry(0).Subscribe(null));
  2128. }
  2129. [Fact]
  2130. public void Retry_Observable_RetryCount_Basic()
  2131. {
  2132. var scheduler = new TestScheduler();
  2133. var ex = new Exception();
  2134. var xs = scheduler.CreateColdObservable(
  2135. OnNext(5, 1),
  2136. OnNext(10, 2),
  2137. OnNext(15, 3),
  2138. OnError<int>(20, ex)
  2139. );
  2140. var res = scheduler.Start(() =>
  2141. xs.Retry(3)
  2142. );
  2143. res.Messages.AssertEqual(
  2144. OnNext(205, 1),
  2145. OnNext(210, 2),
  2146. OnNext(215, 3),
  2147. OnNext(225, 1),
  2148. OnNext(230, 2),
  2149. OnNext(235, 3),
  2150. OnNext(245, 1),
  2151. OnNext(250, 2),
  2152. OnNext(255, 3),
  2153. OnError<int>(260, ex)
  2154. );
  2155. xs.Subscriptions.AssertEqual(
  2156. Subscribe(200, 220),
  2157. Subscribe(220, 240),
  2158. Subscribe(240, 260)
  2159. );
  2160. }
  2161. [Fact]
  2162. public void Retry_Observable_RetryCount_Dispose()
  2163. {
  2164. var scheduler = new TestScheduler();
  2165. var xs = scheduler.CreateColdObservable(
  2166. OnNext(5, 1),
  2167. OnNext(10, 2),
  2168. OnNext(15, 3),
  2169. OnError<int>(20, new Exception())
  2170. );
  2171. var res = scheduler.Start(() =>
  2172. xs.Retry(3), 231
  2173. );
  2174. res.Messages.AssertEqual(
  2175. OnNext(205, 1),
  2176. OnNext(210, 2),
  2177. OnNext(215, 3),
  2178. OnNext(225, 1),
  2179. OnNext(230, 2)
  2180. );
  2181. xs.Subscriptions.AssertEqual(
  2182. Subscribe(200, 220),
  2183. Subscribe(220, 231)
  2184. );
  2185. }
  2186. [Fact]
  2187. public void Retry_Observable_RetryCount_Infinite()
  2188. {
  2189. var scheduler = new TestScheduler();
  2190. var xs = scheduler.CreateColdObservable(
  2191. OnNext(100, 1),
  2192. OnNext(150, 2),
  2193. OnNext(200, 3)
  2194. );
  2195. var res = scheduler.Start(() =>
  2196. xs.Retry(3)
  2197. );
  2198. res.Messages.AssertEqual(
  2199. OnNext(300, 1),
  2200. OnNext(350, 2),
  2201. OnNext(400, 3)
  2202. );
  2203. xs.Subscriptions.AssertEqual(
  2204. Subscribe(200, 1000)
  2205. );
  2206. }
  2207. [Fact]
  2208. public void Retry_Observable_RetryCount_Completed()
  2209. {
  2210. var scheduler = new TestScheduler();
  2211. var xs = scheduler.CreateColdObservable(
  2212. OnNext(100, 1),
  2213. OnNext(150, 2),
  2214. OnNext(200, 3),
  2215. OnCompleted<int>(250)
  2216. );
  2217. var res = scheduler.Start(() =>
  2218. xs.Retry(3)
  2219. );
  2220. res.Messages.AssertEqual(
  2221. OnNext(300, 1),
  2222. OnNext(350, 2),
  2223. OnNext(400, 3),
  2224. OnCompleted<int>(450)
  2225. );
  2226. xs.Subscriptions.AssertEqual(
  2227. Subscribe(200, 450)
  2228. );
  2229. }
  2230. [Fact]
  2231. public void Retry_Observable_RetryCount_Throws()
  2232. {
  2233. var scheduler1 = new TestScheduler();
  2234. var xs = Observable.Return(1, scheduler1).Retry(3);
  2235. xs.Subscribe(x => { throw new InvalidOperationException(); });
  2236. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  2237. var scheduler2 = new TestScheduler();
  2238. var ys = Observable.Throw<int>(new Exception(), scheduler2).Retry(100);
  2239. var d = ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  2240. scheduler2.ScheduleAbsolute(10, () => d.Dispose());
  2241. scheduler2.Start();
  2242. var scheduler3 = new TestScheduler();
  2243. var zs = Observable.Return(1, scheduler3).Retry(100);
  2244. zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  2245. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler3.Start());
  2246. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Retry(3);
  2247. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  2248. }
  2249. [Fact]
  2250. public void Retry_Observable_RetryCount_Default_ArgumentChecking()
  2251. {
  2252. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Retry<int>(default(IObservable<int>), 0));
  2253. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Retry(-1));
  2254. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Retry(0).Subscribe(null));
  2255. }
  2256. [Fact]
  2257. public void Retry_Observable_RetryCount_Default()
  2258. {
  2259. Observable.Range(1, 3).Retry(3).AssertEqual(Observable.Range(1, 3).Retry(3));
  2260. }
  2261. #endregion
  2262. #region + Scan +
  2263. [Fact]
  2264. public void Scan_ArgumentChecking()
  2265. {
  2266. var someObservable = Observable.Empty<int>();
  2267. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Scan<int>(null, (_, __) => 0));
  2268. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Scan<int>(someObservable, null));
  2269. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Scan<int, int>(null, 0, (_, __) => 0));
  2270. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Scan<int, int>(someObservable, 0, null));
  2271. }
  2272. [Fact]
  2273. public void Scan_Seed_Never()
  2274. {
  2275. var scheduler = new TestScheduler();
  2276. var xs = scheduler.CreateHotObservable<int>();
  2277. var seed = 42;
  2278. var res = scheduler.Start(() =>
  2279. xs.Scan(seed, (acc, x) => acc + x)
  2280. );
  2281. res.Messages.AssertEqual(
  2282. );
  2283. xs.Subscriptions.AssertEqual(
  2284. Subscribe(200, 1000)
  2285. );
  2286. }
  2287. [Fact]
  2288. public void Scan_Seed_Empty()
  2289. {
  2290. var scheduler = new TestScheduler();
  2291. var xs = scheduler.CreateHotObservable(
  2292. OnNext(150, 1),
  2293. OnCompleted<int>(250)
  2294. );
  2295. var seed = 42;
  2296. var res = scheduler.Start(() =>
  2297. xs.Scan(seed, (acc, x) => acc + x)
  2298. );
  2299. res.Messages.AssertEqual(
  2300. OnCompleted<int>(250)
  2301. );
  2302. xs.Subscriptions.AssertEqual(
  2303. Subscribe(200, 250)
  2304. );
  2305. }
  2306. [Fact]
  2307. public void Scan_Seed_Return()
  2308. {
  2309. var scheduler = new TestScheduler();
  2310. var xs = scheduler.CreateHotObservable(
  2311. OnNext(150, 1),
  2312. OnNext(220, 2),
  2313. OnCompleted<int>(250)
  2314. );
  2315. var seed = 42;
  2316. var res = scheduler.Start(() =>
  2317. xs.Scan(seed, (acc, x) => acc + x)
  2318. );
  2319. res.Messages.AssertEqual(
  2320. OnNext(220, seed + 2),
  2321. OnCompleted<int>(250)
  2322. );
  2323. xs.Subscriptions.AssertEqual(
  2324. Subscribe(200, 250)
  2325. );
  2326. }
  2327. [Fact]
  2328. public void Scan_Seed_Throw()
  2329. {
  2330. var scheduler = new TestScheduler();
  2331. var ex = new Exception();
  2332. var xs = scheduler.CreateHotObservable(
  2333. OnNext(150, 1),
  2334. OnError<int>(250, ex)
  2335. );
  2336. var seed = 42;
  2337. var res = scheduler.Start(() =>
  2338. xs.Scan(seed, (acc, x) => acc + x)
  2339. );
  2340. res.Messages.AssertEqual(
  2341. OnError<int>(250, ex)
  2342. );
  2343. xs.Subscriptions.AssertEqual(
  2344. Subscribe(200, 250)
  2345. );
  2346. }
  2347. [Fact]
  2348. public void Scan_Seed_SomeData()
  2349. {
  2350. var scheduler = new TestScheduler();
  2351. var xs = scheduler.CreateHotObservable(
  2352. OnNext(150, 1),
  2353. OnNext(210, 2),
  2354. OnNext(220, 3),
  2355. OnNext(230, 4),
  2356. OnNext(240, 5),
  2357. OnCompleted<int>(250)
  2358. );
  2359. var seed = 1;
  2360. var res = scheduler.Start(() =>
  2361. xs.Scan(seed, (acc, x) => acc + x)
  2362. );
  2363. res.Messages.AssertEqual(
  2364. OnNext(210, seed + 2),
  2365. OnNext(220, seed + 2 + 3),
  2366. OnNext(230, seed + 2 + 3 + 4),
  2367. OnNext(240, seed + 2 + 3 + 4 + 5),
  2368. OnCompleted<int>(250)
  2369. );
  2370. xs.Subscriptions.AssertEqual(
  2371. Subscribe(200, 250)
  2372. );
  2373. }
  2374. [Fact]
  2375. public void Scan_Seed_AccumulatorThrows()
  2376. {
  2377. var scheduler = new TestScheduler();
  2378. var xs = scheduler.CreateHotObservable(
  2379. OnNext(150, 1),
  2380. OnNext(210, 2),
  2381. OnNext(220, 3),
  2382. OnNext(230, 4),
  2383. OnNext(240, 5),
  2384. OnCompleted<int>(250)
  2385. );
  2386. var ex = new Exception();
  2387. var seed = 1;
  2388. var res = scheduler.Start(() =>
  2389. xs.Scan(seed, (acc, x) => { if (x == 4) throw ex; return acc + x; })
  2390. );
  2391. res.Messages.AssertEqual(
  2392. OnNext(210, seed + 2),
  2393. OnNext(220, seed + 2 + 3),
  2394. OnError<int>(230, ex)
  2395. );
  2396. xs.Subscriptions.AssertEqual(
  2397. Subscribe(200, 230)
  2398. );
  2399. }
  2400. [Fact]
  2401. public void Scan_NoSeed_Never()
  2402. {
  2403. var scheduler = new TestScheduler();
  2404. var xs = scheduler.CreateHotObservable<int>();
  2405. var res = scheduler.Start(() =>
  2406. xs.Scan((acc, x) => acc + x)
  2407. );
  2408. res.Messages.AssertEqual(
  2409. );
  2410. xs.Subscriptions.AssertEqual(
  2411. Subscribe(200, 1000)
  2412. );
  2413. }
  2414. [Fact]
  2415. public void Scan_NoSeed_Empty()
  2416. {
  2417. var scheduler = new TestScheduler();
  2418. var xs = scheduler.CreateHotObservable(
  2419. OnNext(150, 1),
  2420. OnCompleted<int>(250)
  2421. );
  2422. var res = scheduler.Start(() =>
  2423. xs.Scan((acc, x) => acc + x)
  2424. );
  2425. res.Messages.AssertEqual(
  2426. OnCompleted<int>(250)
  2427. );
  2428. xs.Subscriptions.AssertEqual(
  2429. Subscribe(200, 250)
  2430. );
  2431. }
  2432. [Fact]
  2433. public void Scan_NoSeed_Return()
  2434. {
  2435. var scheduler = new TestScheduler();
  2436. var xs = scheduler.CreateHotObservable(
  2437. OnNext(150, 1),
  2438. OnNext(220, 2),
  2439. OnCompleted<int>(250)
  2440. );
  2441. var res = scheduler.Start(() =>
  2442. xs.Scan((acc, x) => acc + x)
  2443. );
  2444. res.Messages.AssertEqual(
  2445. OnNext(220, 2),
  2446. OnCompleted<int>(250)
  2447. );
  2448. xs.Subscriptions.AssertEqual(
  2449. Subscribe(200, 250)
  2450. );
  2451. }
  2452. [Fact]
  2453. public void Scan_NoSeed_Throw()
  2454. {
  2455. var scheduler = new TestScheduler();
  2456. var ex = new Exception();
  2457. var xs = scheduler.CreateHotObservable(
  2458. OnNext(150, 1),
  2459. OnError<int>(250, ex)
  2460. );
  2461. var res = scheduler.Start(() =>
  2462. xs.Scan((acc, x) => acc + x)
  2463. );
  2464. res.Messages.AssertEqual(
  2465. OnError<int>(250, ex)
  2466. );
  2467. xs.Subscriptions.AssertEqual(
  2468. Subscribe(200, 250)
  2469. );
  2470. }
  2471. [Fact]
  2472. public void Scan_NoSeed_SomeData()
  2473. {
  2474. var scheduler = new TestScheduler();
  2475. var xs = scheduler.CreateHotObservable(
  2476. OnNext(150, 1),
  2477. OnNext(210, 2),
  2478. OnNext(220, 3),
  2479. OnNext(230, 4),
  2480. OnNext(240, 5),
  2481. OnCompleted<int>(250)
  2482. );
  2483. var res = scheduler.Start(() =>
  2484. xs.Scan((acc, x) => acc + x)
  2485. );
  2486. res.Messages.AssertEqual(
  2487. OnNext(210, 2),
  2488. OnNext(220, 2 + 3),
  2489. OnNext(230, 2 + 3 + 4),
  2490. OnNext(240, 2 + 3 + 4 + 5),
  2491. OnCompleted<int>(250)
  2492. );
  2493. xs.Subscriptions.AssertEqual(
  2494. Subscribe(200, 250)
  2495. );
  2496. }
  2497. [Fact]
  2498. public void Scan_NoSeed_AccumulatorThrows()
  2499. {
  2500. var scheduler = new TestScheduler();
  2501. var xs = scheduler.CreateHotObservable(
  2502. OnNext(150, 1),
  2503. OnNext(210, 2),
  2504. OnNext(220, 3),
  2505. OnNext(230, 4),
  2506. OnNext(240, 5),
  2507. OnCompleted<int>(250)
  2508. );
  2509. var ex = new Exception();
  2510. var res = scheduler.Start(() =>
  2511. xs.Scan((acc, x) => { if (x == 4) throw ex; return acc + x; })
  2512. );
  2513. res.Messages.AssertEqual(
  2514. OnNext(210, 2),
  2515. OnNext(220, 2 + 3),
  2516. OnError<int>(230, ex)
  2517. );
  2518. xs.Subscriptions.AssertEqual(
  2519. Subscribe(200, 230)
  2520. );
  2521. }
  2522. #endregion
  2523. #region + SkipLast +
  2524. [Fact]
  2525. public void SkipLast_ArgumentChecking()
  2526. {
  2527. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipLast<int>(null, 0));
  2528. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.SkipLast(DummyObservable<int>.Instance, -1));
  2529. }
  2530. [Fact]
  2531. public void SkipLast_Zero_Completed()
  2532. {
  2533. var scheduler = new TestScheduler();
  2534. var xs = scheduler.CreateHotObservable(
  2535. OnNext(180, 1),
  2536. OnNext(210, 2),
  2537. OnNext(250, 3),
  2538. OnNext(270, 4),
  2539. OnNext(310, 5),
  2540. OnNext(360, 6),
  2541. OnNext(380, 7),
  2542. OnNext(410, 8),
  2543. OnNext(590, 9),
  2544. OnCompleted<int>(650)
  2545. );
  2546. var res = scheduler.Start(() =>
  2547. xs.SkipLast(0)
  2548. );
  2549. res.Messages.AssertEqual(
  2550. OnNext(210, 2),
  2551. OnNext(250, 3),
  2552. OnNext(270, 4),
  2553. OnNext(310, 5),
  2554. OnNext(360, 6),
  2555. OnNext(380, 7),
  2556. OnNext(410, 8),
  2557. OnNext(590, 9),
  2558. OnCompleted<int>(650)
  2559. );
  2560. xs.Subscriptions.AssertEqual(
  2561. Subscribe(200, 650)
  2562. );
  2563. }
  2564. [Fact]
  2565. public void SkipLast_Zero_Error()
  2566. {
  2567. var scheduler = new TestScheduler();
  2568. var ex = new Exception();
  2569. var xs = scheduler.CreateHotObservable(
  2570. OnNext(180, 1),
  2571. OnNext(210, 2),
  2572. OnNext(250, 3),
  2573. OnNext(270, 4),
  2574. OnNext(310, 5),
  2575. OnNext(360, 6),
  2576. OnNext(380, 7),
  2577. OnNext(410, 8),
  2578. OnNext(590, 9),
  2579. OnError<int>(650, ex)
  2580. );
  2581. var res = scheduler.Start(() =>
  2582. xs.SkipLast(0)
  2583. );
  2584. res.Messages.AssertEqual(
  2585. OnNext(210, 2),
  2586. OnNext(250, 3),
  2587. OnNext(270, 4),
  2588. OnNext(310, 5),
  2589. OnNext(360, 6),
  2590. OnNext(380, 7),
  2591. OnNext(410, 8),
  2592. OnNext(590, 9),
  2593. OnError<int>(650, ex)
  2594. );
  2595. xs.Subscriptions.AssertEqual(
  2596. Subscribe(200, 650)
  2597. );
  2598. }
  2599. [Fact]
  2600. public void SkipLast_Zero_Disposed()
  2601. {
  2602. var scheduler = new TestScheduler();
  2603. var xs = scheduler.CreateHotObservable(
  2604. OnNext(180, 1),
  2605. OnNext(210, 2),
  2606. OnNext(250, 3),
  2607. OnNext(270, 4),
  2608. OnNext(310, 5),
  2609. OnNext(360, 6),
  2610. OnNext(380, 7),
  2611. OnNext(410, 8),
  2612. OnNext(590, 9)
  2613. );
  2614. var res = scheduler.Start(() =>
  2615. xs.SkipLast(0)
  2616. );
  2617. res.Messages.AssertEqual(
  2618. OnNext(210, 2),
  2619. OnNext(250, 3),
  2620. OnNext(270, 4),
  2621. OnNext(310, 5),
  2622. OnNext(360, 6),
  2623. OnNext(380, 7),
  2624. OnNext(410, 8),
  2625. OnNext(590, 9)
  2626. );
  2627. xs.Subscriptions.AssertEqual(
  2628. Subscribe(200, 1000)
  2629. );
  2630. }
  2631. [Fact]
  2632. public void SkipLast_One_Completed()
  2633. {
  2634. var scheduler = new TestScheduler();
  2635. var xs = scheduler.CreateHotObservable(
  2636. OnNext(180, 1),
  2637. OnNext(210, 2),
  2638. OnNext(250, 3),
  2639. OnNext(270, 4),
  2640. OnNext(310, 5),
  2641. OnNext(360, 6),
  2642. OnNext(380, 7),
  2643. OnNext(410, 8),
  2644. OnNext(590, 9),
  2645. OnCompleted<int>(650)
  2646. );
  2647. var res = scheduler.Start(() =>
  2648. xs.SkipLast(1)
  2649. );
  2650. res.Messages.AssertEqual(
  2651. OnNext(250, 2),
  2652. OnNext(270, 3),
  2653. OnNext(310, 4),
  2654. OnNext(360, 5),
  2655. OnNext(380, 6),
  2656. OnNext(410, 7),
  2657. OnNext(590, 8),
  2658. OnCompleted<int>(650)
  2659. );
  2660. xs.Subscriptions.AssertEqual(
  2661. Subscribe(200, 650)
  2662. );
  2663. }
  2664. [Fact]
  2665. public void SkipLast_One_Error()
  2666. {
  2667. var scheduler = new TestScheduler();
  2668. var ex = new Exception();
  2669. var xs = scheduler.CreateHotObservable(
  2670. OnNext(180, 1),
  2671. OnNext(210, 2),
  2672. OnNext(250, 3),
  2673. OnNext(270, 4),
  2674. OnNext(310, 5),
  2675. OnNext(360, 6),
  2676. OnNext(380, 7),
  2677. OnNext(410, 8),
  2678. OnNext(590, 9),
  2679. OnError<int>(650, ex)
  2680. );
  2681. var res = scheduler.Start(() =>
  2682. xs.SkipLast(1)
  2683. );
  2684. res.Messages.AssertEqual(
  2685. OnNext(250, 2),
  2686. OnNext(270, 3),
  2687. OnNext(310, 4),
  2688. OnNext(360, 5),
  2689. OnNext(380, 6),
  2690. OnNext(410, 7),
  2691. OnNext(590, 8),
  2692. OnError<int>(650, ex)
  2693. );
  2694. xs.Subscriptions.AssertEqual(
  2695. Subscribe(200, 650)
  2696. );
  2697. }
  2698. [Fact]
  2699. public void SkipLast_One_Disposed()
  2700. {
  2701. var scheduler = new TestScheduler();
  2702. var xs = scheduler.CreateHotObservable(
  2703. OnNext(180, 1),
  2704. OnNext(210, 2),
  2705. OnNext(250, 3),
  2706. OnNext(270, 4),
  2707. OnNext(310, 5),
  2708. OnNext(360, 6),
  2709. OnNext(380, 7),
  2710. OnNext(410, 8),
  2711. OnNext(590, 9)
  2712. );
  2713. var res = scheduler.Start(() =>
  2714. xs.SkipLast(1)
  2715. );
  2716. res.Messages.AssertEqual(
  2717. OnNext(250, 2),
  2718. OnNext(270, 3),
  2719. OnNext(310, 4),
  2720. OnNext(360, 5),
  2721. OnNext(380, 6),
  2722. OnNext(410, 7),
  2723. OnNext(590, 8)
  2724. );
  2725. xs.Subscriptions.AssertEqual(
  2726. Subscribe(200, 1000)
  2727. );
  2728. }
  2729. [Fact]
  2730. public void SkipLast_Three_Completed()
  2731. {
  2732. var scheduler = new TestScheduler();
  2733. var xs = scheduler.CreateHotObservable(
  2734. OnNext(180, 1),
  2735. OnNext(210, 2),
  2736. OnNext(250, 3),
  2737. OnNext(270, 4),
  2738. OnNext(310, 5),
  2739. OnNext(360, 6),
  2740. OnNext(380, 7),
  2741. OnNext(410, 8),
  2742. OnNext(590, 9),
  2743. OnCompleted<int>(650)
  2744. );
  2745. var res = scheduler.Start(() =>
  2746. xs.SkipLast(3)
  2747. );
  2748. res.Messages.AssertEqual(
  2749. OnNext(310, 2),
  2750. OnNext(360, 3),
  2751. OnNext(380, 4),
  2752. OnNext(410, 5),
  2753. OnNext(590, 6),
  2754. OnCompleted<int>(650)
  2755. );
  2756. xs.Subscriptions.AssertEqual(
  2757. Subscribe(200, 650)
  2758. );
  2759. }
  2760. [Fact]
  2761. public void SkipLast_Three_Error()
  2762. {
  2763. var scheduler = new TestScheduler();
  2764. var ex = new Exception();
  2765. var xs = scheduler.CreateHotObservable(
  2766. OnNext(180, 1),
  2767. OnNext(210, 2),
  2768. OnNext(250, 3),
  2769. OnNext(270, 4),
  2770. OnNext(310, 5),
  2771. OnNext(360, 6),
  2772. OnNext(380, 7),
  2773. OnNext(410, 8),
  2774. OnNext(590, 9),
  2775. OnError<int>(650, ex)
  2776. );
  2777. var res = scheduler.Start(() =>
  2778. xs.SkipLast(3)
  2779. );
  2780. res.Messages.AssertEqual(
  2781. OnNext(310, 2),
  2782. OnNext(360, 3),
  2783. OnNext(380, 4),
  2784. OnNext(410, 5),
  2785. OnNext(590, 6),
  2786. OnError<int>(650, ex)
  2787. );
  2788. xs.Subscriptions.AssertEqual(
  2789. Subscribe(200, 650)
  2790. );
  2791. }
  2792. [Fact]
  2793. public void SkipLast_Three_Disposed()
  2794. {
  2795. var scheduler = new TestScheduler();
  2796. var xs = scheduler.CreateHotObservable(
  2797. OnNext(180, 1),
  2798. OnNext(210, 2),
  2799. OnNext(250, 3),
  2800. OnNext(270, 4),
  2801. OnNext(310, 5),
  2802. OnNext(360, 6),
  2803. OnNext(380, 7),
  2804. OnNext(410, 8),
  2805. OnNext(590, 9)
  2806. );
  2807. var res = scheduler.Start(() =>
  2808. xs.SkipLast(3)
  2809. );
  2810. res.Messages.AssertEqual(
  2811. OnNext(310, 2),
  2812. OnNext(360, 3),
  2813. OnNext(380, 4),
  2814. OnNext(410, 5),
  2815. OnNext(590, 6)
  2816. );
  2817. xs.Subscriptions.AssertEqual(
  2818. Subscribe(200, 1000)
  2819. );
  2820. }
  2821. #endregion
  2822. #region StartWith
  2823. [Fact]
  2824. public void StartWith_ArgumentChecking()
  2825. {
  2826. var values = (IEnumerable<int>)new[] { 1, 2, 3 };
  2827. var scheduler = new TestScheduler();
  2828. var someObservable = Observable.Empty<int>();
  2829. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(default(IObservable<int>), 1));
  2830. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, default(int[])));
  2831. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(default(IObservable<int>), values));
  2832. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, default(IEnumerable<int>)));
  2833. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(default(IObservable<int>), scheduler, 1));
  2834. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, default(IScheduler), 1));
  2835. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, scheduler, default(int[])));
  2836. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(default(IObservable<int>), scheduler, values));
  2837. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, default(IScheduler), values));
  2838. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, scheduler, default(IEnumerable<int>)));
  2839. }
  2840. [Fact]
  2841. public void StartWith()
  2842. {
  2843. var scheduler = new TestScheduler();
  2844. var xs = scheduler.CreateHotObservable(
  2845. OnNext(150, 1),
  2846. OnNext(220, 2),
  2847. OnCompleted<int>(250)
  2848. );
  2849. var res = scheduler.Start(() =>
  2850. xs.StartWith(1)
  2851. );
  2852. res.Messages.AssertEqual(
  2853. OnNext(200, 1),
  2854. OnNext(220, 2),
  2855. OnCompleted<int>(250)
  2856. );
  2857. }
  2858. [Fact]
  2859. public void StartWith_Scheduler()
  2860. {
  2861. var scheduler = new TestScheduler();
  2862. var xs = scheduler.CreateHotObservable(
  2863. OnNext(150, 1),
  2864. OnNext(220, 4),
  2865. OnCompleted<int>(250)
  2866. );
  2867. var res = scheduler.Start(() =>
  2868. xs.StartWith(scheduler, 1, 2, 3)
  2869. );
  2870. res.Messages.AssertEqual(
  2871. OnNext(201, 1),
  2872. OnNext(202, 2),
  2873. OnNext(203, 3),
  2874. OnNext(220, 4),
  2875. OnCompleted<int>(250)
  2876. );
  2877. }
  2878. [Fact]
  2879. public void StartWith_Enumerable()
  2880. {
  2881. var scheduler = new TestScheduler();
  2882. var xs = scheduler.CreateHotObservable(
  2883. OnNext(150, 1),
  2884. OnNext(220, 4),
  2885. OnCompleted<int>(250)
  2886. );
  2887. List<int> data = new List<int>(new[] { 1, 2, 3 });
  2888. var res = scheduler.Start(() =>
  2889. xs.StartWith(data)
  2890. );
  2891. res.Messages.AssertEqual(
  2892. OnNext(200, 1),
  2893. OnNext(200, 2),
  2894. OnNext(200, 3),
  2895. OnNext(220, 4),
  2896. OnCompleted<int>(250)
  2897. );
  2898. }
  2899. [Fact]
  2900. public void StartWith_Enumerable_Scheduler()
  2901. {
  2902. var scheduler = new TestScheduler();
  2903. var xs = scheduler.CreateHotObservable(
  2904. OnNext(150, 1),
  2905. OnNext(220, 4),
  2906. OnCompleted<int>(250)
  2907. );
  2908. List<int> data = new List<int>(new[] { 1, 2, 3 });
  2909. var res = scheduler.Start(() =>
  2910. xs.StartWith(scheduler, data)
  2911. );
  2912. res.Messages.AssertEqual(
  2913. OnNext(201, 1),
  2914. OnNext(202, 2),
  2915. OnNext(203, 3),
  2916. OnNext(220, 4),
  2917. OnCompleted<int>(250)
  2918. );
  2919. }
  2920. #endregion
  2921. #region + TakeLast +
  2922. [Fact]
  2923. public void TakeLast_ArgumentChecking()
  2924. {
  2925. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast<int>(null, 0));
  2926. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(DummyObservable<int>.Instance, -1));
  2927. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast<int>(null, 0, Scheduler.Default));
  2928. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(DummyObservable<int>.Instance, -1, Scheduler.Default));
  2929. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(DummyObservable<int>.Instance, 0, default(IScheduler)));
  2930. }
  2931. [Fact]
  2932. public void TakeLast_Zero_Completed()
  2933. {
  2934. var scheduler = new TestScheduler();
  2935. var xs = scheduler.CreateHotObservable(
  2936. OnNext(180, 1),
  2937. OnNext(210, 2),
  2938. OnNext(250, 3),
  2939. OnNext(270, 4),
  2940. OnNext(310, 5),
  2941. OnNext(360, 6),
  2942. OnNext(380, 7),
  2943. OnNext(410, 8),
  2944. OnNext(590, 9),
  2945. OnCompleted<int>(650)
  2946. );
  2947. var res = scheduler.Start(() =>
  2948. xs.TakeLast(0)
  2949. );
  2950. res.Messages.AssertEqual(
  2951. OnCompleted<int>(650)
  2952. );
  2953. xs.Subscriptions.AssertEqual(
  2954. Subscribe(200, 650)
  2955. );
  2956. }
  2957. [Fact]
  2958. public void TakeLast_Zero_Error()
  2959. {
  2960. var scheduler = new TestScheduler();
  2961. var ex = new Exception();
  2962. var xs = scheduler.CreateHotObservable(
  2963. OnNext(180, 1),
  2964. OnNext(210, 2),
  2965. OnNext(250, 3),
  2966. OnNext(270, 4),
  2967. OnNext(310, 5),
  2968. OnNext(360, 6),
  2969. OnNext(380, 7),
  2970. OnNext(410, 8),
  2971. OnNext(590, 9),
  2972. OnError<int>(650, ex)
  2973. );
  2974. var res = scheduler.Start(() =>
  2975. xs.TakeLast(0)
  2976. );
  2977. res.Messages.AssertEqual(
  2978. OnError<int>(650, ex)
  2979. );
  2980. xs.Subscriptions.AssertEqual(
  2981. Subscribe(200, 650)
  2982. );
  2983. }
  2984. [Fact]
  2985. public void TakeLast_Zero_Disposed()
  2986. {
  2987. var scheduler = new TestScheduler();
  2988. var xs = scheduler.CreateHotObservable(
  2989. OnNext(180, 1),
  2990. OnNext(210, 2),
  2991. OnNext(250, 3),
  2992. OnNext(270, 4),
  2993. OnNext(310, 5),
  2994. OnNext(360, 6),
  2995. OnNext(380, 7),
  2996. OnNext(410, 8),
  2997. OnNext(590, 9)
  2998. );
  2999. var res = scheduler.Start(() =>
  3000. xs.TakeLast(0)
  3001. );
  3002. res.Messages.AssertEqual(
  3003. );
  3004. xs.Subscriptions.AssertEqual(
  3005. Subscribe(200, 1000)
  3006. );
  3007. }
  3008. [Fact]
  3009. public void TakeLast_One_Completed()
  3010. {
  3011. var scheduler = new TestScheduler();
  3012. var xs = scheduler.CreateHotObservable(
  3013. OnNext(180, 1),
  3014. OnNext(210, 2),
  3015. OnNext(250, 3),
  3016. OnNext(270, 4),
  3017. OnNext(310, 5),
  3018. OnNext(360, 6),
  3019. OnNext(380, 7),
  3020. OnNext(410, 8),
  3021. OnNext(590, 9),
  3022. OnCompleted<int>(650)
  3023. );
  3024. var res = scheduler.Start(() =>
  3025. xs.TakeLast(1)
  3026. );
  3027. res.Messages.AssertEqual(
  3028. OnNext(650, 9),
  3029. OnCompleted<int>(650)
  3030. );
  3031. xs.Subscriptions.AssertEqual(
  3032. Subscribe(200, 650)
  3033. );
  3034. }
  3035. [Fact]
  3036. public void TakeLast_One_Error()
  3037. {
  3038. var scheduler = new TestScheduler();
  3039. var ex = new Exception();
  3040. var xs = scheduler.CreateHotObservable(
  3041. OnNext(180, 1),
  3042. OnNext(210, 2),
  3043. OnNext(250, 3),
  3044. OnNext(270, 4),
  3045. OnNext(310, 5),
  3046. OnNext(360, 6),
  3047. OnNext(380, 7),
  3048. OnNext(410, 8),
  3049. OnNext(590, 9),
  3050. OnError<int>(650, ex)
  3051. );
  3052. var res = scheduler.Start(() =>
  3053. xs.TakeLast(1)
  3054. );
  3055. res.Messages.AssertEqual(
  3056. OnError<int>(650, ex)
  3057. );
  3058. xs.Subscriptions.AssertEqual(
  3059. Subscribe(200, 650)
  3060. );
  3061. }
  3062. [Fact]
  3063. public void TakeLast_One_Disposed()
  3064. {
  3065. var scheduler = new TestScheduler();
  3066. var xs = scheduler.CreateHotObservable(
  3067. OnNext(180, 1),
  3068. OnNext(210, 2),
  3069. OnNext(250, 3),
  3070. OnNext(270, 4),
  3071. OnNext(310, 5),
  3072. OnNext(360, 6),
  3073. OnNext(380, 7),
  3074. OnNext(410, 8),
  3075. OnNext(590, 9)
  3076. );
  3077. var res = scheduler.Start(() =>
  3078. xs.TakeLast(1)
  3079. );
  3080. res.Messages.AssertEqual(
  3081. );
  3082. xs.Subscriptions.AssertEqual(
  3083. Subscribe(200, 1000)
  3084. );
  3085. }
  3086. [Fact]
  3087. public void TakeLast_Three_Completed()
  3088. {
  3089. var scheduler = new TestScheduler();
  3090. var xs = scheduler.CreateHotObservable(
  3091. OnNext(180, 1),
  3092. OnNext(210, 2),
  3093. OnNext(250, 3),
  3094. OnNext(270, 4),
  3095. OnNext(310, 5),
  3096. OnNext(360, 6),
  3097. OnNext(380, 7),
  3098. OnNext(410, 8),
  3099. OnNext(590, 9),
  3100. OnCompleted<int>(650)
  3101. );
  3102. var res = scheduler.Start(() =>
  3103. xs.TakeLast(3)
  3104. );
  3105. res.Messages.AssertEqual(
  3106. OnNext(650, 7),
  3107. OnNext(650, 8),
  3108. OnNext(650, 9),
  3109. OnCompleted<int>(650)
  3110. );
  3111. xs.Subscriptions.AssertEqual(
  3112. Subscribe(200, 650)
  3113. );
  3114. }
  3115. [Fact]
  3116. public void TakeLast_Three_Error()
  3117. {
  3118. var scheduler = new TestScheduler();
  3119. var ex = new Exception();
  3120. var xs = scheduler.CreateHotObservable(
  3121. OnNext(180, 1),
  3122. OnNext(210, 2),
  3123. OnNext(250, 3),
  3124. OnNext(270, 4),
  3125. OnNext(310, 5),
  3126. OnNext(360, 6),
  3127. OnNext(380, 7),
  3128. OnNext(410, 8),
  3129. OnNext(590, 9),
  3130. OnError<int>(650, ex)
  3131. );
  3132. var res = scheduler.Start(() =>
  3133. xs.TakeLast(3)
  3134. );
  3135. res.Messages.AssertEqual(
  3136. OnError<int>(650, ex)
  3137. );
  3138. xs.Subscriptions.AssertEqual(
  3139. Subscribe(200, 650)
  3140. );
  3141. }
  3142. [Fact]
  3143. public void TakeLast_Three_Disposed()
  3144. {
  3145. var scheduler = new TestScheduler();
  3146. var xs = scheduler.CreateHotObservable(
  3147. OnNext(180, 1),
  3148. OnNext(210, 2),
  3149. OnNext(250, 3),
  3150. OnNext(270, 4),
  3151. OnNext(310, 5),
  3152. OnNext(360, 6),
  3153. OnNext(380, 7),
  3154. OnNext(410, 8),
  3155. OnNext(590, 9)
  3156. );
  3157. var res = scheduler.Start(() =>
  3158. xs.TakeLast(3)
  3159. );
  3160. res.Messages.AssertEqual(
  3161. );
  3162. xs.Subscriptions.AssertEqual(
  3163. Subscribe(200, 1000)
  3164. );
  3165. }
  3166. [Fact]
  3167. public void TakeLast_LongRunning_Regular()
  3168. {
  3169. var res = Observable.Range(0, 100, Scheduler.Default).TakeLast(10, NewThreadScheduler.Default);
  3170. var lst = new List<int>();
  3171. res.ForEach(lst.Add);
  3172. Assert.True(Enumerable.Range(90, 10).SequenceEqual(lst));
  3173. }
  3174. #endregion
  3175. #region + TakeLastBuffer +
  3176. [Fact]
  3177. public void TakeLastBuffer_ArgumentChecking()
  3178. {
  3179. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer<int>(null, 0));
  3180. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(DummyObservable<int>.Instance, -1));
  3181. }
  3182. [Fact]
  3183. public void TakeLastBuffer_Zero_Completed()
  3184. {
  3185. var scheduler = new TestScheduler();
  3186. var xs = scheduler.CreateHotObservable(
  3187. OnNext(180, 1),
  3188. OnNext(210, 2),
  3189. OnNext(250, 3),
  3190. OnNext(270, 4),
  3191. OnNext(310, 5),
  3192. OnNext(360, 6),
  3193. OnNext(380, 7),
  3194. OnNext(410, 8),
  3195. OnNext(590, 9),
  3196. OnCompleted<int>(650)
  3197. );
  3198. var res = scheduler.Start(() =>
  3199. xs.TakeLastBuffer(0)
  3200. );
  3201. res.Messages.AssertEqual(
  3202. OnNext<IList<int>>(650, lst => lst.Count == 0),
  3203. OnCompleted<IList<int>>(650)
  3204. );
  3205. xs.Subscriptions.AssertEqual(
  3206. Subscribe(200, 650)
  3207. );
  3208. }
  3209. [Fact]
  3210. public void TakeLastBuffer_Zero_Error()
  3211. {
  3212. var scheduler = new TestScheduler();
  3213. var ex = new Exception();
  3214. var xs = scheduler.CreateHotObservable(
  3215. OnNext(180, 1),
  3216. OnNext(210, 2),
  3217. OnNext(250, 3),
  3218. OnNext(270, 4),
  3219. OnNext(310, 5),
  3220. OnNext(360, 6),
  3221. OnNext(380, 7),
  3222. OnNext(410, 8),
  3223. OnNext(590, 9),
  3224. OnError<int>(650, ex)
  3225. );
  3226. var res = scheduler.Start(() =>
  3227. xs.TakeLastBuffer(0)
  3228. );
  3229. res.Messages.AssertEqual(
  3230. OnError<IList<int>>(650, ex)
  3231. );
  3232. xs.Subscriptions.AssertEqual(
  3233. Subscribe(200, 650)
  3234. );
  3235. }
  3236. [Fact]
  3237. public void TakeLastBuffer_Zero_Disposed()
  3238. {
  3239. var scheduler = new TestScheduler();
  3240. var xs = scheduler.CreateHotObservable(
  3241. OnNext(180, 1),
  3242. OnNext(210, 2),
  3243. OnNext(250, 3),
  3244. OnNext(270, 4),
  3245. OnNext(310, 5),
  3246. OnNext(360, 6),
  3247. OnNext(380, 7),
  3248. OnNext(410, 8),
  3249. OnNext(590, 9)
  3250. );
  3251. var res = scheduler.Start(() =>
  3252. xs.TakeLastBuffer(0)
  3253. );
  3254. res.Messages.AssertEqual(
  3255. );
  3256. xs.Subscriptions.AssertEqual(
  3257. Subscribe(200, 1000)
  3258. );
  3259. }
  3260. [Fact]
  3261. public void TakeLastBuffer_One_Completed()
  3262. {
  3263. var scheduler = new TestScheduler();
  3264. var xs = scheduler.CreateHotObservable(
  3265. OnNext(180, 1),
  3266. OnNext(210, 2),
  3267. OnNext(250, 3),
  3268. OnNext(270, 4),
  3269. OnNext(310, 5),
  3270. OnNext(360, 6),
  3271. OnNext(380, 7),
  3272. OnNext(410, 8),
  3273. OnNext(590, 9),
  3274. OnCompleted<int>(650)
  3275. );
  3276. var res = scheduler.Start(() =>
  3277. xs.TakeLastBuffer(1)
  3278. );
  3279. res.Messages.AssertEqual(
  3280. OnNext<IList<int>>(650, lst => lst.SequenceEqual(new[] { 9 })),
  3281. OnCompleted<IList<int>>(650)
  3282. );
  3283. xs.Subscriptions.AssertEqual(
  3284. Subscribe(200, 650)
  3285. );
  3286. }
  3287. [Fact]
  3288. public void TakeLastBuffer_One_Error()
  3289. {
  3290. var scheduler = new TestScheduler();
  3291. var ex = new Exception();
  3292. var xs = scheduler.CreateHotObservable(
  3293. OnNext(180, 1),
  3294. OnNext(210, 2),
  3295. OnNext(250, 3),
  3296. OnNext(270, 4),
  3297. OnNext(310, 5),
  3298. OnNext(360, 6),
  3299. OnNext(380, 7),
  3300. OnNext(410, 8),
  3301. OnNext(590, 9),
  3302. OnError<int>(650, ex)
  3303. );
  3304. var res = scheduler.Start(() =>
  3305. xs.TakeLastBuffer(1)
  3306. );
  3307. res.Messages.AssertEqual(
  3308. OnError<IList<int>>(650, ex)
  3309. );
  3310. xs.Subscriptions.AssertEqual(
  3311. Subscribe(200, 650)
  3312. );
  3313. }
  3314. [Fact]
  3315. public void TakeLastBuffer_One_Disposed()
  3316. {
  3317. var scheduler = new TestScheduler();
  3318. var xs = scheduler.CreateHotObservable(
  3319. OnNext(180, 1),
  3320. OnNext(210, 2),
  3321. OnNext(250, 3),
  3322. OnNext(270, 4),
  3323. OnNext(310, 5),
  3324. OnNext(360, 6),
  3325. OnNext(380, 7),
  3326. OnNext(410, 8),
  3327. OnNext(590, 9)
  3328. );
  3329. var res = scheduler.Start(() =>
  3330. xs.TakeLastBuffer(1)
  3331. );
  3332. res.Messages.AssertEqual(
  3333. );
  3334. xs.Subscriptions.AssertEqual(
  3335. Subscribe(200, 1000)
  3336. );
  3337. }
  3338. [Fact]
  3339. public void TakeLastBuffer_Three_Completed()
  3340. {
  3341. var scheduler = new TestScheduler();
  3342. var xs = scheduler.CreateHotObservable(
  3343. OnNext(180, 1),
  3344. OnNext(210, 2),
  3345. OnNext(250, 3),
  3346. OnNext(270, 4),
  3347. OnNext(310, 5),
  3348. OnNext(360, 6),
  3349. OnNext(380, 7),
  3350. OnNext(410, 8),
  3351. OnNext(590, 9),
  3352. OnCompleted<int>(650)
  3353. );
  3354. var res = scheduler.Start(() =>
  3355. xs.TakeLastBuffer(3)
  3356. );
  3357. res.Messages.AssertEqual(
  3358. OnNext<IList<int>>(650, lst => lst.SequenceEqual(new[] { 7, 8, 9 })),
  3359. OnCompleted<IList<int>>(650)
  3360. );
  3361. xs.Subscriptions.AssertEqual(
  3362. Subscribe(200, 650)
  3363. );
  3364. }
  3365. [Fact]
  3366. public void TakeLastBuffer_Three_Error()
  3367. {
  3368. var scheduler = new TestScheduler();
  3369. var ex = new Exception();
  3370. var xs = scheduler.CreateHotObservable(
  3371. OnNext(180, 1),
  3372. OnNext(210, 2),
  3373. OnNext(250, 3),
  3374. OnNext(270, 4),
  3375. OnNext(310, 5),
  3376. OnNext(360, 6),
  3377. OnNext(380, 7),
  3378. OnNext(410, 8),
  3379. OnNext(590, 9),
  3380. OnError<int>(650, ex)
  3381. );
  3382. var res = scheduler.Start(() =>
  3383. xs.TakeLastBuffer(3)
  3384. );
  3385. res.Messages.AssertEqual(
  3386. OnError<IList<int>>(650, ex)
  3387. );
  3388. xs.Subscriptions.AssertEqual(
  3389. Subscribe(200, 650)
  3390. );
  3391. }
  3392. [Fact]
  3393. public void TakeLastBuffer_Three_Disposed()
  3394. {
  3395. var scheduler = new TestScheduler();
  3396. var xs = scheduler.CreateHotObservable(
  3397. OnNext(180, 1),
  3398. OnNext(210, 2),
  3399. OnNext(250, 3),
  3400. OnNext(270, 4),
  3401. OnNext(310, 5),
  3402. OnNext(360, 6),
  3403. OnNext(380, 7),
  3404. OnNext(410, 8),
  3405. OnNext(590, 9)
  3406. );
  3407. var res = scheduler.Start(() =>
  3408. xs.TakeLastBuffer(3)
  3409. );
  3410. res.Messages.AssertEqual(
  3411. );
  3412. xs.Subscriptions.AssertEqual(
  3413. Subscribe(200, 1000)
  3414. );
  3415. }
  3416. #endregion
  3417. #region + Window +
  3418. [Fact]
  3419. public void WindowWithCount_ArgumentChecking()
  3420. {
  3421. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), 1, 1));
  3422. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, 0, 1));
  3423. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, 1, 0));
  3424. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), 1));
  3425. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, 0));
  3426. }
  3427. [Fact]
  3428. public void WindowWithCount_Basic()
  3429. {
  3430. var scheduler = new TestScheduler();
  3431. var xs = scheduler.CreateHotObservable(
  3432. OnNext(100, 1),
  3433. OnNext(210, 2),
  3434. OnNext(240, 3),
  3435. OnNext(280, 4),
  3436. OnNext(320, 5),
  3437. OnNext(350, 6),
  3438. OnNext(380, 7),
  3439. OnNext(420, 8),
  3440. OnNext(470, 9),
  3441. OnCompleted<int>(600)
  3442. );
  3443. var res = scheduler.Start(() =>
  3444. xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  3445. );
  3446. res.Messages.AssertEqual(
  3447. OnNext(210, "0 2"),
  3448. OnNext(240, "0 3"),
  3449. OnNext(280, "0 4"),
  3450. OnNext(280, "1 4"),
  3451. OnNext(320, "1 5"),
  3452. OnNext(350, "1 6"),
  3453. OnNext(350, "2 6"),
  3454. OnNext(380, "2 7"),
  3455. OnNext(420, "2 8"),
  3456. OnNext(420, "3 8"),
  3457. OnNext(470, "3 9"),
  3458. OnCompleted<string>(600)
  3459. );
  3460. xs.Subscriptions.AssertEqual(
  3461. Subscribe(200, 600)
  3462. );
  3463. }
  3464. [Fact]
  3465. public void WindowWithCount_InnerTimings()
  3466. {
  3467. var scheduler = new TestScheduler();
  3468. var xs = scheduler.CreateHotObservable(
  3469. OnNext(100, 1),
  3470. OnNext(210, 2),
  3471. OnNext(240, 3),
  3472. OnNext(280, 4),
  3473. OnNext(320, 5),
  3474. OnNext(350, 6),
  3475. OnNext(380, 7),
  3476. OnNext(420, 8),
  3477. OnNext(470, 9),
  3478. OnCompleted<int>(600)
  3479. );
  3480. var res = default(IObservable<IObservable<int>>);
  3481. var outerSubscription = default(IDisposable);
  3482. var innerSubscriptions = new List<IDisposable>();
  3483. var windows = new List<IObservable<int>>();
  3484. var observers = new List<ITestableObserver<int>>();
  3485. scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2));
  3486. scheduler.ScheduleAbsolute(Subscribed, () =>
  3487. {
  3488. outerSubscription = res.Subscribe(
  3489. window =>
  3490. {
  3491. var result = scheduler.CreateObserver<int>();
  3492. windows.Add(window);
  3493. observers.Add(result);
  3494. scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result)));
  3495. }
  3496. );
  3497. });
  3498. scheduler.Start();
  3499. Assert.Equal(5, observers.Count);
  3500. observers[0].Messages.AssertEqual(
  3501. OnNext(210, 2),
  3502. OnNext(240, 3),
  3503. OnNext(280, 4),
  3504. OnCompleted<int>(280)
  3505. );
  3506. observers[1].Messages.AssertEqual(
  3507. OnNext(280, 4),
  3508. OnNext(320, 5),
  3509. OnNext(350, 6),
  3510. OnCompleted<int>(350)
  3511. );
  3512. observers[2].Messages.AssertEqual(
  3513. OnNext(350, 6),
  3514. OnNext(380, 7),
  3515. OnNext(420, 8),
  3516. OnCompleted<int>(420)
  3517. );
  3518. observers[3].Messages.AssertEqual(
  3519. OnNext(420, 8),
  3520. OnNext(470, 9),
  3521. OnCompleted<int>(600)
  3522. );
  3523. observers[4].Messages.AssertEqual(
  3524. OnCompleted<int>(600)
  3525. );
  3526. xs.Subscriptions.AssertEqual(
  3527. Subscribe(200, 600)
  3528. );
  3529. }
  3530. [Fact]
  3531. public void WindowWithCount_InnerTimings_DisposeOuter()
  3532. {
  3533. var scheduler = new TestScheduler();
  3534. var xs = scheduler.CreateHotObservable(
  3535. OnNext(100, 1),
  3536. OnNext(210, 2),
  3537. OnNext(240, 3),
  3538. OnNext(280, 4),
  3539. OnNext(320, 5),
  3540. OnNext(350, 6),
  3541. OnNext(380, 7),
  3542. OnNext(420, 8),
  3543. OnNext(470, 9),
  3544. OnCompleted<int>(600)
  3545. );
  3546. var res = default(IObservable<IObservable<int>>);
  3547. var outerSubscription = default(IDisposable);
  3548. var innerSubscriptions = new List<IDisposable>();
  3549. var windows = new List<IObservable<int>>();
  3550. var observers = new List<ITestableObserver<int>>();
  3551. var windowCreationTimes = new List<long>();
  3552. scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2));
  3553. scheduler.ScheduleAbsolute(Subscribed, () =>
  3554. {
  3555. outerSubscription = res.Subscribe(
  3556. window =>
  3557. {
  3558. windowCreationTimes.Add(scheduler.Clock);
  3559. var result = scheduler.CreateObserver<int>();
  3560. windows.Add(window);
  3561. observers.Add(result);
  3562. scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result)));
  3563. }
  3564. );
  3565. });
  3566. scheduler.ScheduleAbsolute(400, () =>
  3567. {
  3568. outerSubscription.Dispose();
  3569. });
  3570. scheduler.Start();
  3571. Assert.True(windowCreationTimes.Last() < 400);
  3572. Assert.Equal(4, observers.Count);
  3573. observers[0].Messages.AssertEqual(
  3574. OnNext(210, 2),
  3575. OnNext(240, 3),
  3576. OnNext(280, 4),
  3577. OnCompleted<int>(280)
  3578. );
  3579. observers[1].Messages.AssertEqual(
  3580. OnNext(280, 4),
  3581. OnNext(320, 5),
  3582. OnNext(350, 6),
  3583. OnCompleted<int>(350)
  3584. );
  3585. observers[2].Messages.AssertEqual(
  3586. OnNext(350, 6),
  3587. OnNext(380, 7),
  3588. OnNext(420, 8),
  3589. OnCompleted<int>(420)
  3590. );
  3591. observers[3].Messages.AssertEqual(
  3592. OnNext(420, 8),
  3593. OnNext(470, 9),
  3594. OnCompleted<int>(600)
  3595. );
  3596. xs.Subscriptions.AssertEqual(
  3597. Subscribe(200, 600)
  3598. );
  3599. }
  3600. [Fact]
  3601. public void WindowWithCount_InnerTimings_DisposeOuterAndInners()
  3602. {
  3603. var scheduler = new TestScheduler();
  3604. var xs = scheduler.CreateHotObservable(
  3605. OnNext(100, 1),
  3606. OnNext(210, 2),
  3607. OnNext(240, 3),
  3608. OnNext(280, 4),
  3609. OnNext(320, 5),
  3610. OnNext(350, 6),
  3611. OnNext(380, 7),
  3612. OnNext(420, 8),
  3613. OnNext(470, 9),
  3614. OnCompleted<int>(600)
  3615. );
  3616. var res = default(IObservable<IObservable<int>>);
  3617. var outerSubscription = default(IDisposable);
  3618. var innerSubscriptions = new List<IDisposable>();
  3619. var windows = new List<IObservable<int>>();
  3620. var observers = new List<ITestableObserver<int>>();
  3621. var windowCreationTimes = new List<long>();
  3622. scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2));
  3623. scheduler.ScheduleAbsolute(Subscribed, () =>
  3624. {
  3625. outerSubscription = res.Subscribe(
  3626. window =>
  3627. {
  3628. windowCreationTimes.Add(scheduler.Clock);
  3629. var result = scheduler.CreateObserver<int>();
  3630. windows.Add(window);
  3631. observers.Add(result);
  3632. scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result)));
  3633. }
  3634. );
  3635. });
  3636. scheduler.ScheduleAbsolute(400, () =>
  3637. {
  3638. outerSubscription.Dispose();
  3639. foreach (var d in innerSubscriptions)
  3640. d.Dispose();
  3641. });
  3642. scheduler.Start();
  3643. Assert.True(windowCreationTimes.Last() < 400);
  3644. Assert.Equal(4, observers.Count);
  3645. observers[0].Messages.AssertEqual(
  3646. OnNext(210, 2),
  3647. OnNext(240, 3),
  3648. OnNext(280, 4),
  3649. OnCompleted<int>(280)
  3650. );
  3651. observers[1].Messages.AssertEqual(
  3652. OnNext(280, 4),
  3653. OnNext(320, 5),
  3654. OnNext(350, 6),
  3655. OnCompleted<int>(350)
  3656. );
  3657. observers[2].Messages.AssertEqual(
  3658. OnNext(350, 6),
  3659. OnNext(380, 7)
  3660. );
  3661. observers[3].Messages.AssertEqual(
  3662. );
  3663. xs.Subscriptions.AssertEqual(
  3664. Subscribe(200, 400)
  3665. );
  3666. }
  3667. [Fact]
  3668. public void WindowWithCount_Disposed()
  3669. {
  3670. var scheduler = new TestScheduler();
  3671. var xs = scheduler.CreateHotObservable(
  3672. OnNext(100, 1),
  3673. OnNext(210, 2),
  3674. OnNext(240, 3),
  3675. OnNext(280, 4),
  3676. OnNext(320, 5),
  3677. OnNext(350, 6),
  3678. OnNext(380, 7),
  3679. OnNext(420, 8),
  3680. OnNext(470, 9),
  3681. OnCompleted<int>(600)
  3682. );
  3683. var res = scheduler.Start(() =>
  3684. xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(), 370
  3685. );
  3686. res.Messages.AssertEqual(
  3687. OnNext(210, "0 2"),
  3688. OnNext(240, "0 3"),
  3689. OnNext(280, "0 4"),
  3690. OnNext(280, "1 4"),
  3691. OnNext(320, "1 5"),
  3692. OnNext(350, "1 6"),
  3693. OnNext(350, "2 6")
  3694. );
  3695. xs.Subscriptions.AssertEqual(
  3696. Subscribe(200, 370)
  3697. );
  3698. }
  3699. [Fact]
  3700. public void WindowWithCount_Error()
  3701. {
  3702. var scheduler = new TestScheduler();
  3703. var ex = new Exception();
  3704. var xs = scheduler.CreateHotObservable(
  3705. OnNext(100, 1),
  3706. OnNext(210, 2),
  3707. OnNext(240, 3),
  3708. OnNext(280, 4),
  3709. OnNext(320, 5),
  3710. OnNext(350, 6),
  3711. OnNext(380, 7),
  3712. OnNext(420, 8),
  3713. OnNext(470, 9),
  3714. OnError<int>(600, ex)
  3715. );
  3716. var res = scheduler.Start(() =>
  3717. xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  3718. );
  3719. res.Messages.AssertEqual(
  3720. OnNext(210, "0 2"),
  3721. OnNext(240, "0 3"),
  3722. OnNext(280, "0 4"),
  3723. OnNext(280, "1 4"),
  3724. OnNext(320, "1 5"),
  3725. OnNext(350, "1 6"),
  3726. OnNext(350, "2 6"),
  3727. OnNext(380, "2 7"),
  3728. OnNext(420, "2 8"),
  3729. OnNext(420, "3 8"),
  3730. OnNext(470, "3 9"),
  3731. OnError<string>(600, ex)
  3732. );
  3733. xs.Subscriptions.AssertEqual(
  3734. Subscribe(200, 600)
  3735. );
  3736. }
  3737. [Fact]
  3738. public void WindowWithCount_Default()
  3739. {
  3740. Observable.Range(1, 10).Window(3).Skip(1).First().SequenceEqual(new[] { 4, 5, 6 }.ToObservable());
  3741. Observable.Range(1, 10).Window(3).Skip(1).First().SequenceEqual(new[] { 4, 5, 6 }.ToObservable());
  3742. Observable.Range(1, 10).Window(3, 2).Skip(1).First().SequenceEqual(new[] { 3, 4, 5 }.ToObservable());
  3743. }
  3744. #endregion
  3745. }
  3746. }