ObservableSingleTest.cs 128 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Linq;
  10. using Microsoft.Reactive.Testing;
  11. using Xunit;
  12. using ReactiveTests.Dummies;
  13. namespace ReactiveTests.Tests
  14. {
  15. public class ObservableSingleTest : ReactiveTest
  16. {
  17. #region + AsObservable +
  18. [Fact]
  19. public void AsObservable_ArgumentChecking()
  20. {
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.AsObservable<int>(null));
  22. }
  23. [Fact]
  24. public void AsObservable_AsObservable()
  25. {
  26. var scheduler = new TestScheduler();
  27. var xs = scheduler.CreateHotObservable(
  28. OnNext(150, 1),
  29. OnNext(220, 2),
  30. OnCompleted<int>(250)
  31. );
  32. var ys = xs.AsObservable().AsObservable();
  33. Assert.NotSame(xs, ys);
  34. var res = scheduler.Start(() =>
  35. ys
  36. );
  37. res.Messages.AssertEqual(
  38. OnNext(220, 2),
  39. OnCompleted<int>(250)
  40. );
  41. xs.Subscriptions.AssertEqual(
  42. Subscribe(200, 250)
  43. );
  44. }
  45. [Fact]
  46. public void AsObservable_Hides()
  47. {
  48. var xs = Observable.Empty<int>();
  49. var res = xs.AsObservable();
  50. Assert.NotSame(xs, res);
  51. }
  52. [Fact]
  53. public void AsObservable_Never()
  54. {
  55. var scheduler = new TestScheduler();
  56. var xs = Observable.Never<int>();
  57. var res = scheduler.Start(() =>
  58. xs.AsObservable()
  59. );
  60. res.Messages.AssertEqual(
  61. );
  62. }
  63. [Fact]
  64. public void AsObservable_Empty()
  65. {
  66. var scheduler = new TestScheduler();
  67. var xs = scheduler.CreateHotObservable(
  68. OnNext(150, 1),
  69. OnCompleted<int>(250)
  70. );
  71. var res = scheduler.Start(() =>
  72. xs.AsObservable()
  73. );
  74. res.Messages.AssertEqual(
  75. OnCompleted<int>(250)
  76. );
  77. xs.Subscriptions.AssertEqual(
  78. Subscribe(200, 250)
  79. );
  80. }
  81. [Fact]
  82. public void AsObservable_Throw()
  83. {
  84. var scheduler = new TestScheduler();
  85. var ex = new Exception();
  86. var xs = scheduler.CreateHotObservable(
  87. OnNext(150, 1),
  88. OnError<int>(250, ex)
  89. );
  90. var res = scheduler.Start(() =>
  91. xs.AsObservable()
  92. );
  93. res.Messages.AssertEqual(
  94. OnError<int>(250, ex)
  95. );
  96. xs.Subscriptions.AssertEqual(
  97. Subscribe(200, 250)
  98. );
  99. }
  100. [Fact]
  101. public void AsObservable_Return()
  102. {
  103. var scheduler = new TestScheduler();
  104. var xs = scheduler.CreateHotObservable(
  105. OnNext(150, 1),
  106. OnNext(220, 2),
  107. OnCompleted<int>(250)
  108. );
  109. var res = scheduler.Start(() =>
  110. xs.AsObservable()
  111. );
  112. res.Messages.AssertEqual(
  113. OnNext(220, 2),
  114. OnCompleted<int>(250)
  115. );
  116. xs.Subscriptions.AssertEqual(
  117. Subscribe(200, 250)
  118. );
  119. }
  120. [Fact]
  121. public void AsObservable_IsNotEager()
  122. {
  123. var scheduler = new TestScheduler();
  124. bool subscribed = false;
  125. var xs = Observable.Create<int>(obs =>
  126. {
  127. subscribed = true;
  128. var disp = scheduler.CreateHotObservable(
  129. OnNext(150, 1),
  130. OnNext(220, 2),
  131. OnCompleted<int>(250)
  132. ).Subscribe(obs);
  133. return disp.Dispose;
  134. });
  135. xs.AsObservable();
  136. Assert.False(subscribed);
  137. var res = scheduler.Start(() =>
  138. xs.AsObservable()
  139. );
  140. Assert.True(subscribed);
  141. }
  142. #endregion
  143. #region + Buffer +
  144. [Fact]
  145. public void Buffer_Single_ArgumentChecking()
  146. {
  147. var someObservable = Observable.Empty<int>();
  148. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1));
  149. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 0));
  150. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, -1));
  151. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1, 1));
  152. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 1, 0));
  153. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 0, 1));
  154. }
  155. [Fact]
  156. public void Buffer_Count_PartialWindow()
  157. {
  158. var scheduler = new TestScheduler();
  159. var xs = scheduler.CreateHotObservable(
  160. OnNext(150, 1),
  161. OnNext(210, 2),
  162. OnNext(220, 3),
  163. OnNext(230, 4),
  164. OnNext(240, 5),
  165. OnCompleted<int>(250)
  166. );
  167. var res = scheduler.Start(() =>
  168. xs.Buffer(5)
  169. );
  170. res.Messages.AssertEqual(
  171. OnNext<IList<int>>(250, l => l.SequenceEqual(new[] { 2, 3, 4, 5 })),
  172. OnCompleted<IList<int>>(250)
  173. );
  174. xs.Subscriptions.AssertEqual(
  175. Subscribe(200, 250)
  176. );
  177. }
  178. [Fact]
  179. public void Buffer_Count_FullWindows()
  180. {
  181. var scheduler = new TestScheduler();
  182. var xs = scheduler.CreateHotObservable(
  183. OnNext(150, 1),
  184. OnNext(210, 2),
  185. OnNext(220, 3),
  186. OnNext(230, 4),
  187. OnNext(240, 5),
  188. OnCompleted<int>(250)
  189. );
  190. var res = scheduler.Start(() =>
  191. xs.Buffer(2)
  192. );
  193. res.Messages.AssertEqual(
  194. OnNext<IList<int>>(220, l => l.SequenceEqual(new[] { 2, 3 })),
  195. OnNext<IList<int>>(240, l => l.SequenceEqual(new[] { 4, 5 })),
  196. OnCompleted<IList<int>>(250)
  197. );
  198. xs.Subscriptions.AssertEqual(
  199. Subscribe(200, 250)
  200. );
  201. }
  202. [Fact]
  203. public void Buffer_Count_FullAndPartialWindows()
  204. {
  205. var scheduler = new TestScheduler();
  206. var xs = scheduler.CreateHotObservable(
  207. OnNext(150, 1),
  208. OnNext(210, 2),
  209. OnNext(220, 3),
  210. OnNext(230, 4),
  211. OnNext(240, 5),
  212. OnCompleted<int>(250)
  213. );
  214. var res = scheduler.Start(() =>
  215. xs.Buffer(3)
  216. );
  217. res.Messages.AssertEqual(
  218. OnNext<IList<int>>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })),
  219. OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),
  220. OnCompleted<IList<int>>(250)
  221. );
  222. xs.Subscriptions.AssertEqual(
  223. Subscribe(200, 250)
  224. );
  225. }
  226. [Fact]
  227. public void Buffer_Count_Error()
  228. {
  229. var scheduler = new TestScheduler();
  230. var ex = new Exception();
  231. var xs = scheduler.CreateHotObservable(
  232. OnNext(150, 1),
  233. OnNext(210, 2),
  234. OnNext(220, 3),
  235. OnNext(230, 4),
  236. OnNext(240, 5),
  237. OnError<int>(250, ex)
  238. );
  239. var res = scheduler.Start(() =>
  240. xs.Buffer(5)
  241. );
  242. res.Messages.AssertEqual(
  243. OnError<IList<int>>(250, ex)
  244. );
  245. xs.Subscriptions.AssertEqual(
  246. Subscribe(200, 250)
  247. );
  248. }
  249. [Fact]
  250. public void Buffer_Count_Skip_Less()
  251. {
  252. var scheduler = new TestScheduler();
  253. var xs = scheduler.CreateHotObservable(
  254. OnNext(150, 1),
  255. OnNext(210, 2),
  256. OnNext(220, 3),
  257. OnNext(230, 4),
  258. OnNext(240, 5),
  259. OnCompleted<int>(250)
  260. );
  261. var res = scheduler.Start(() =>
  262. xs.Buffer(3, 1)
  263. );
  264. res.Messages.AssertEqual(
  265. OnNext<IList<int>>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })),
  266. OnNext<IList<int>>(240, l => l.SequenceEqual(new int[] { 3, 4, 5 })),
  267. OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 4, 5 })),
  268. OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),
  269. OnCompleted<IList<int>>(250)
  270. );
  271. xs.Subscriptions.AssertEqual(
  272. Subscribe(200, 250)
  273. );
  274. }
  275. [Fact]
  276. public void Buffer_Count_Skip_More()
  277. {
  278. var scheduler = new TestScheduler();
  279. var xs = scheduler.CreateHotObservable(
  280. OnNext(150, 1),
  281. OnNext(210, 2),
  282. OnNext(220, 3),
  283. OnNext(230, 4),
  284. OnNext(240, 5),
  285. OnCompleted<int>(250)
  286. );
  287. var res = scheduler.Start(() =>
  288. xs.Buffer(2, 3)
  289. );
  290. res.Messages.AssertEqual(
  291. OnNext<IList<int>>(220, l => l.SequenceEqual(new int[] { 2, 3 })),
  292. OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),
  293. OnCompleted<IList<int>>(250)
  294. );
  295. xs.Subscriptions.AssertEqual(
  296. Subscribe(200, 250)
  297. );
  298. }
  299. [Fact]
  300. public void BufferWithCount_ArgumentChecking()
  301. {
  302. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1, 1));
  303. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0, 1));
  304. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 1, 0));
  305. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1));
  306. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0));
  307. }
  308. [Fact]
  309. public void BufferWithCount_Basic()
  310. {
  311. var scheduler = new TestScheduler();
  312. var xs = scheduler.CreateHotObservable(
  313. OnNext(100, 1),
  314. OnNext(210, 2),
  315. OnNext(240, 3),
  316. OnNext(280, 4),
  317. OnNext(320, 5),
  318. OnNext(350, 6),
  319. OnNext(380, 7),
  320. OnNext(420, 8),
  321. OnNext(470, 9),
  322. OnCompleted<int>(600)
  323. );
  324. var res = scheduler.Start(() =>
  325. xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
  326. );
  327. res.Messages.AssertEqual(
  328. OnNext(280, "2,3,4"),
  329. OnNext(350, "4,5,6"),
  330. OnNext(420, "6,7,8"),
  331. OnNext(600, "8,9"),
  332. OnCompleted<string>(600)
  333. );
  334. xs.Subscriptions.AssertEqual(
  335. Subscribe(200, 600)
  336. );
  337. }
  338. [Fact]
  339. public void BufferWithCount_Disposed()
  340. {
  341. var scheduler = new TestScheduler();
  342. var xs = scheduler.CreateHotObservable(
  343. OnNext(100, 1),
  344. OnNext(210, 2),
  345. OnNext(240, 3),
  346. OnNext(280, 4),
  347. OnNext(320, 5),
  348. OnNext(350, 6),
  349. OnNext(380, 7),
  350. OnNext(420, 8),
  351. OnNext(470, 9),
  352. OnCompleted<int>(600)
  353. );
  354. var res = scheduler.Start(() =>
  355. xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())), 370
  356. );
  357. res.Messages.AssertEqual(
  358. OnNext(280, "2,3,4"),
  359. OnNext(350, "4,5,6")
  360. );
  361. xs.Subscriptions.AssertEqual(
  362. Subscribe(200, 370)
  363. );
  364. }
  365. [Fact]
  366. public void BufferWithCount_Error()
  367. {
  368. var scheduler = new TestScheduler();
  369. var ex = new Exception();
  370. var xs = scheduler.CreateHotObservable(
  371. OnNext(100, 1),
  372. OnNext(210, 2),
  373. OnNext(240, 3),
  374. OnNext(280, 4),
  375. OnNext(320, 5),
  376. OnNext(350, 6),
  377. OnNext(380, 7),
  378. OnNext(420, 8),
  379. OnNext(470, 9),
  380. OnError<int>(600, ex)
  381. );
  382. var res = scheduler.Start(() =>
  383. xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
  384. );
  385. res.Messages.AssertEqual(
  386. OnNext(280, "2,3,4"),
  387. OnNext(350, "4,5,6"),
  388. OnNext(420, "6,7,8"),
  389. OnError<string>(600, ex)
  390. );
  391. xs.Subscriptions.AssertEqual(
  392. Subscribe(200, 600)
  393. );
  394. }
  395. [Fact]
  396. public void BufferWithCount_Default()
  397. {
  398. Observable.Range(1, 10).Buffer(3).Skip(1).First().AssertEqual(4, 5, 6);
  399. Observable.Range(1, 10).Buffer(3, 2).Skip(1).First().AssertEqual(3, 4, 5);
  400. }
  401. #endregion
  402. #region + Dematerialize +
  403. [Fact]
  404. public void Dematerialize_ArgumentChecking()
  405. {
  406. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Dematerialize<int>(null));
  407. }
  408. [Fact]
  409. public void Dematerialize_Range1()
  410. {
  411. var scheduler = new TestScheduler();
  412. var xs = scheduler.CreateHotObservable(
  413. OnNext(150, Notification.CreateOnNext(41)),
  414. OnNext(210, Notification.CreateOnNext(42)),
  415. OnNext(220, Notification.CreateOnNext(43)),
  416. OnCompleted<Notification<int>>(250)
  417. );
  418. var res = scheduler.Start(() =>
  419. xs.Dematerialize()
  420. );
  421. res.Messages.AssertEqual(
  422. OnNext(210, 42),
  423. OnNext(220, 43),
  424. OnCompleted<int>(250)
  425. );
  426. xs.Subscriptions.AssertEqual(
  427. Subscribe(200, 250)
  428. );
  429. }
  430. [Fact]
  431. public void Dematerialize_Range2()
  432. {
  433. var scheduler = new TestScheduler();
  434. var xs = scheduler.CreateHotObservable(
  435. OnNext(150, Notification.CreateOnNext(41)),
  436. OnNext(210, Notification.CreateOnNext(42)),
  437. OnNext(220, Notification.CreateOnNext(43)),
  438. OnNext(230, Notification.CreateOnCompleted<int>())
  439. );
  440. var res = scheduler.Start(() =>
  441. xs.Dematerialize()
  442. );
  443. res.Messages.AssertEqual(
  444. OnNext(210, 42),
  445. OnNext(220, 43),
  446. OnCompleted<int>(230)
  447. );
  448. xs.Subscriptions.AssertEqual(
  449. Subscribe(200, 230)
  450. );
  451. }
  452. [Fact]
  453. public void Dematerialize_Error1()
  454. {
  455. var scheduler = new TestScheduler();
  456. var ex = new Exception();
  457. var xs = scheduler.CreateHotObservable(
  458. OnNext(150, Notification.CreateOnNext(41)),
  459. OnNext(210, Notification.CreateOnNext(42)),
  460. OnNext(220, Notification.CreateOnNext(43)),
  461. OnError<Notification<int>>(230, ex)
  462. );
  463. var res = scheduler.Start(() =>
  464. xs.Dematerialize()
  465. );
  466. res.Messages.AssertEqual(
  467. OnNext(210, 42),
  468. OnNext(220, 43),
  469. OnError<int>(230, ex)
  470. );
  471. xs.Subscriptions.AssertEqual(
  472. Subscribe(200, 230)
  473. );
  474. }
  475. [Fact]
  476. public void Dematerialize_Error2()
  477. {
  478. var scheduler = new TestScheduler();
  479. var ex = new Exception();
  480. var xs = scheduler.CreateHotObservable(
  481. OnNext(150, Notification.CreateOnNext(41)),
  482. OnNext(210, Notification.CreateOnNext(42)),
  483. OnNext(220, Notification.CreateOnNext(43)),
  484. OnNext(230, Notification.CreateOnError<int>(ex))
  485. );
  486. var res = scheduler.Start(() =>
  487. xs.Dematerialize()
  488. );
  489. res.Messages.AssertEqual(
  490. OnNext(210, 42),
  491. OnNext(220, 43),
  492. OnError<int>(230, ex)
  493. );
  494. xs.Subscriptions.AssertEqual(
  495. Subscribe(200, 230)
  496. );
  497. }
  498. [Fact]
  499. public void Materialize_Dematerialize_Never()
  500. {
  501. var scheduler = new TestScheduler();
  502. var xs = Observable.Never<int>();
  503. var res = scheduler.Start(() =>
  504. xs.Materialize().Dematerialize()
  505. );
  506. res.Messages.AssertEqual(
  507. );
  508. }
  509. [Fact]
  510. public void Materialize_Dematerialize_Empty()
  511. {
  512. var scheduler = new TestScheduler();
  513. var xs = scheduler.CreateHotObservable(
  514. OnNext(150, 1),
  515. OnCompleted<int>(250)
  516. );
  517. var res = scheduler.Start(() =>
  518. xs.Materialize().Dematerialize()
  519. );
  520. res.Messages.AssertEqual(
  521. OnCompleted<int>(250)
  522. );
  523. xs.Subscriptions.AssertEqual(
  524. Subscribe(200, 250)
  525. );
  526. }
  527. [Fact]
  528. public void Materialize_Dematerialize_Return()
  529. {
  530. var scheduler = new TestScheduler();
  531. var xs = scheduler.CreateHotObservable(
  532. OnNext(150, 1),
  533. OnNext(210, 2),
  534. OnCompleted<int>(250)
  535. );
  536. var res = scheduler.Start(() =>
  537. xs.Materialize().Dematerialize()
  538. );
  539. res.Messages.AssertEqual(
  540. OnNext(210, 2),
  541. OnCompleted<int>(250)
  542. );
  543. xs.Subscriptions.AssertEqual(
  544. Subscribe(200, 250)
  545. );
  546. }
  547. [Fact]
  548. public void Materialize_Dematerialize_Throw()
  549. {
  550. var scheduler = new TestScheduler();
  551. var ex = new Exception();
  552. var xs = scheduler.CreateHotObservable(
  553. OnNext(150, 1),
  554. OnError<int>(250, ex)
  555. );
  556. var res = scheduler.Start(() =>
  557. xs.Materialize().Dematerialize()
  558. );
  559. res.Messages.AssertEqual(
  560. OnError<int>(250, ex)
  561. );
  562. xs.Subscriptions.AssertEqual(
  563. Subscribe(200, 250)
  564. );
  565. }
  566. #endregion
  567. #region + DistinctUntilChanged +
  568. [Fact]
  569. public void DistinctUntilChanged_ArgumentChecking()
  570. {
  571. var someObservable = Observable.Empty<int>();
  572. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int>(null));
  573. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int>(null, EqualityComparer<int>.Default));
  574. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int>(someObservable, null));
  575. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(null, _ => _));
  576. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(someObservable, null));
  577. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(someObservable, _ => _, null));
  578. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(null, _ => _, EqualityComparer<int>.Default));
  579. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DistinctUntilChanged<int, int>(someObservable, null, EqualityComparer<int>.Default));
  580. }
  581. [Fact]
  582. public void DistinctUntilChanged_Never()
  583. {
  584. var scheduler = new TestScheduler();
  585. var xs = Observable.Never<int>();
  586. var res = scheduler.Start(() =>
  587. xs.DistinctUntilChanged()
  588. );
  589. res.Messages.AssertEqual(
  590. );
  591. }
  592. [Fact]
  593. public void DistinctUntilChanged_Empty()
  594. {
  595. var scheduler = new TestScheduler();
  596. var xs = scheduler.CreateHotObservable(
  597. OnNext(150, 1),
  598. OnCompleted<int>(250)
  599. );
  600. var res = scheduler.Start(() =>
  601. xs.DistinctUntilChanged()
  602. );
  603. res.Messages.AssertEqual(
  604. OnCompleted<int>(250)
  605. );
  606. xs.Subscriptions.AssertEqual(
  607. Subscribe(200, 250)
  608. );
  609. }
  610. [Fact]
  611. public void DistinctUntilChanged_Return()
  612. {
  613. var scheduler = new TestScheduler();
  614. var xs = scheduler.CreateHotObservable(
  615. OnNext(150, 1),
  616. OnNext(220, 2),
  617. OnCompleted<int>(250)
  618. );
  619. var res = scheduler.Start(() =>
  620. xs.DistinctUntilChanged()
  621. );
  622. res.Messages.AssertEqual(
  623. OnNext(220, 2),
  624. OnCompleted<int>(250)
  625. );
  626. xs.Subscriptions.AssertEqual(
  627. Subscribe(200, 250)
  628. );
  629. }
  630. [Fact]
  631. public void DistinctUntilChanged_Throw()
  632. {
  633. var scheduler = new TestScheduler();
  634. var ex = new Exception();
  635. var xs = scheduler.CreateHotObservable(
  636. OnNext(150, 1),
  637. OnError<int>(250, ex)
  638. );
  639. var res = scheduler.Start(() =>
  640. xs.DistinctUntilChanged()
  641. );
  642. res.Messages.AssertEqual(
  643. OnError<int>(250, ex)
  644. );
  645. xs.Subscriptions.AssertEqual(
  646. Subscribe(200, 250)
  647. );
  648. }
  649. [Fact]
  650. public void DistinctUntilChanged_AllChanges()
  651. {
  652. var scheduler = new TestScheduler();
  653. var xs = scheduler.CreateHotObservable(
  654. OnNext(150, 1),
  655. OnNext(210, 2),
  656. OnNext(220, 3),
  657. OnNext(230, 4),
  658. OnNext(240, 5),
  659. OnCompleted<int>(250)
  660. );
  661. var res = scheduler.Start(() =>
  662. xs.DistinctUntilChanged()
  663. );
  664. res.Messages.AssertEqual(
  665. OnNext(210, 2),
  666. OnNext(220, 3),
  667. OnNext(230, 4),
  668. OnNext(240, 5),
  669. OnCompleted<int>(250)
  670. );
  671. xs.Subscriptions.AssertEqual(
  672. Subscribe(200, 250)
  673. );
  674. }
  675. [Fact]
  676. public void DistinctUntilChanged_AllSame()
  677. {
  678. var scheduler = new TestScheduler();
  679. var xs = scheduler.CreateHotObservable(
  680. OnNext(150, 1),
  681. OnNext(210, 2),
  682. OnNext(220, 2),
  683. OnNext(230, 2),
  684. OnNext(240, 2),
  685. OnCompleted<int>(250)
  686. );
  687. var res = scheduler.Start(() =>
  688. xs.DistinctUntilChanged()
  689. );
  690. res.Messages.AssertEqual(
  691. OnNext(210, 2),
  692. OnCompleted<int>(250)
  693. );
  694. xs.Subscriptions.AssertEqual(
  695. Subscribe(200, 250)
  696. );
  697. }
  698. [Fact]
  699. public void DistinctUntilChanged_SomeChanges()
  700. {
  701. var scheduler = new TestScheduler();
  702. var xs = scheduler.CreateHotObservable(
  703. OnNext(150, 1),
  704. OnNext(210, 2), //*
  705. OnNext(215, 3), //*
  706. OnNext(220, 3),
  707. OnNext(225, 2), //*
  708. OnNext(230, 2),
  709. OnNext(230, 1), //*
  710. OnNext(240, 2), //*
  711. OnCompleted<int>(250)
  712. );
  713. var res = scheduler.Start(() =>
  714. xs.DistinctUntilChanged()
  715. );
  716. res.Messages.AssertEqual(
  717. OnNext(210, 2),
  718. OnNext(215, 3),
  719. OnNext(225, 2),
  720. OnNext(230, 1),
  721. OnNext(240, 2),
  722. OnCompleted<int>(250)
  723. );
  724. xs.Subscriptions.AssertEqual(
  725. Subscribe(200, 250)
  726. );
  727. }
  728. [Fact]
  729. public void DistinctUntilChanged_Comparer_AllEqual()
  730. {
  731. var scheduler = new TestScheduler();
  732. var xs = scheduler.CreateHotObservable(
  733. OnNext(150, 1),
  734. OnNext(210, 2),
  735. OnNext(220, 3),
  736. OnNext(230, 4),
  737. OnNext(240, 5),
  738. OnCompleted<int>(250)
  739. );
  740. var res = scheduler.Start(() =>
  741. xs.DistinctUntilChanged(new FuncComparer<int>((x, y) => true))
  742. );
  743. res.Messages.AssertEqual(
  744. OnNext(210, 2),
  745. OnCompleted<int>(250)
  746. );
  747. xs.Subscriptions.AssertEqual(
  748. Subscribe(200, 250)
  749. );
  750. }
  751. [Fact]
  752. public void DistinctUntilChanged_Comparer_AllDifferent()
  753. {
  754. var scheduler = new TestScheduler();
  755. var xs = scheduler.CreateHotObservable(
  756. OnNext(150, 1),
  757. OnNext(210, 2),
  758. OnNext(220, 2),
  759. OnNext(230, 2),
  760. OnNext(240, 2),
  761. OnCompleted<int>(250)
  762. );
  763. var res = scheduler.Start(() =>
  764. xs.DistinctUntilChanged(new FuncComparer<int>((x, y) => false))
  765. );
  766. res.Messages.AssertEqual(
  767. OnNext(210, 2),
  768. OnNext(220, 2),
  769. OnNext(230, 2),
  770. OnNext(240, 2),
  771. OnCompleted<int>(250)
  772. );
  773. xs.Subscriptions.AssertEqual(
  774. Subscribe(200, 250)
  775. );
  776. }
  777. [Fact]
  778. public void DistinctUntilChanged_KeySelector_Div2()
  779. {
  780. var scheduler = new TestScheduler();
  781. var xs = scheduler.CreateHotObservable(
  782. OnNext(150, 1),
  783. OnNext(210, 2), //*
  784. OnNext(220, 4),
  785. OnNext(230, 3), //*
  786. OnNext(240, 5),
  787. OnCompleted<int>(250)
  788. );
  789. var res = scheduler.Start(() =>
  790. xs.DistinctUntilChanged(x => x % 2)
  791. );
  792. res.Messages.AssertEqual(
  793. OnNext(210, 2),
  794. OnNext(230, 3),
  795. OnCompleted<int>(250)
  796. );
  797. xs.Subscriptions.AssertEqual(
  798. Subscribe(200, 250)
  799. );
  800. }
  801. class FuncComparer<T> : IEqualityComparer<T>
  802. {
  803. private Func<T, T, bool> _equals;
  804. public FuncComparer(Func<T, T, bool> equals)
  805. {
  806. _equals = equals;
  807. }
  808. public bool Equals(T x, T y)
  809. {
  810. return _equals(x, y);
  811. }
  812. public int GetHashCode(T obj)
  813. {
  814. return 0;
  815. }
  816. }
  817. [Fact]
  818. public void DistinctUntilChanged_KeySelectorThrows()
  819. {
  820. var ex = new Exception();
  821. var scheduler = new TestScheduler();
  822. var xs = scheduler.CreateHotObservable(
  823. OnNext(150, 1),
  824. OnNext(210, 2),
  825. OnCompleted<int>(250)
  826. );
  827. var res = scheduler.Start(() =>
  828. xs.DistinctUntilChanged(new Func<int, int>(x => { throw ex; }))
  829. );
  830. res.Messages.AssertEqual(
  831. OnError<int>(210, ex)
  832. );
  833. }
  834. [Fact]
  835. public void DistinctUntilChanged_ComparerThrows()
  836. {
  837. var ex = new Exception();
  838. var scheduler = new TestScheduler();
  839. var xs = scheduler.CreateHotObservable(
  840. OnNext(150, 1),
  841. OnNext(210, 2),
  842. OnNext(220, 3),
  843. OnCompleted<int>(250)
  844. );
  845. var res = scheduler.Start(() =>
  846. xs.DistinctUntilChanged(new ThrowComparer<int>(ex))
  847. );
  848. res.Messages.AssertEqual(
  849. OnNext(210, 2),
  850. OnError<int>(220, ex)
  851. );
  852. }
  853. class ThrowComparer<T> : IEqualityComparer<T>
  854. {
  855. private Exception _ex;
  856. public ThrowComparer(Exception ex)
  857. {
  858. _ex = ex;
  859. }
  860. public bool Equals(T x, T y)
  861. {
  862. throw _ex;
  863. }
  864. public int GetHashCode(T obj)
  865. {
  866. return 0;
  867. }
  868. }
  869. [Fact]
  870. public void DistinctUntilChanged_KeySelector_Comparer()
  871. {
  872. var scheduler = new TestScheduler();
  873. var xs = scheduler.CreateHotObservable(
  874. OnNext(150, 1),
  875. OnNext(210, 2), // * key = 1 % 3 = 1
  876. OnNext(220, 8), // key = 4 % 3 = 1 same
  877. OnNext(230, 2), // key = 1 % 3 = 1 same
  878. OnNext(240, 5), // * key = 2 % 3 = 2
  879. OnCompleted<int>(250)
  880. );
  881. var res = scheduler.Start(() =>
  882. xs.DistinctUntilChanged(x => x / 2, new FuncComparer<int>((x, y) => x % 3 == y % 3))
  883. );
  884. res.Messages.AssertEqual(
  885. OnNext(210, 2),
  886. OnNext(240, 5),
  887. OnCompleted<int>(250)
  888. );
  889. xs.Subscriptions.AssertEqual(
  890. Subscribe(200, 250)
  891. );
  892. }
  893. #endregion
  894. #region + Do +
  895. [Fact]
  896. public void Do_ArgumentChecking()
  897. {
  898. var someObservable = Observable.Empty<int>();
  899. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, (Action<int>)null));
  900. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, _ => { }));
  901. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, x => { }, (Action)null));
  902. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, (Action<int>)null, () => { }));
  903. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, x => { }, () => { }));
  904. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, x => { }, (Action<Exception>)null));
  905. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, (Action<int>)null, (Exception _) => { }));
  906. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, x => { }, (Exception _) => { }));
  907. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, x => { }, (Exception _) => { }, null));
  908. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, x => { }, (Action<Exception>)null, () => { }));
  909. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, (Action<int>)null, (Exception _) => { }, () => { }));
  910. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, x => { }, (Exception _) => { }, () => { }));
  911. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(null, Observer.Create<int>(i => { })));
  912. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Do<int>(someObservable, default(IObserver<int>)));
  913. }
  914. [Fact]
  915. public void Do_ShouldSeeAllValues()
  916. {
  917. var scheduler = new TestScheduler();
  918. var xs = scheduler.CreateHotObservable(
  919. OnNext(150, 1),
  920. OnNext(210, 2),
  921. OnNext(220, 3),
  922. OnNext(230, 4),
  923. OnNext(240, 5),
  924. OnCompleted<int>(250)
  925. );
  926. int i = 0;
  927. int sum = 2 + 3 + 4 + 5;
  928. var res = scheduler.Start(() =>
  929. xs.Do(x => { i++; sum -= x; })
  930. );
  931. Assert.Equal(4, i);
  932. Assert.Equal(0, sum);
  933. res.Messages.AssertEqual(
  934. OnNext(210, 2),
  935. OnNext(220, 3),
  936. OnNext(230, 4),
  937. OnNext(240, 5),
  938. OnCompleted<int>(250)
  939. );
  940. xs.Subscriptions.AssertEqual(
  941. Subscribe(200, 250)
  942. );
  943. }
  944. [Fact]
  945. public void Do_PlainAction()
  946. {
  947. var scheduler = new TestScheduler();
  948. var xs = scheduler.CreateHotObservable(
  949. OnNext(150, 1),
  950. OnNext(210, 2),
  951. OnNext(220, 3),
  952. OnNext(230, 4),
  953. OnNext(240, 5),
  954. OnCompleted<int>(250)
  955. );
  956. int i = 0;
  957. var res = scheduler.Start(() =>
  958. xs.Do(_ => { i++; })
  959. );
  960. Assert.Equal(4, i);
  961. res.Messages.AssertEqual(
  962. OnNext(210, 2),
  963. OnNext(220, 3),
  964. OnNext(230, 4),
  965. OnNext(240, 5),
  966. OnCompleted<int>(250)
  967. );
  968. xs.Subscriptions.AssertEqual(
  969. Subscribe(200, 250)
  970. );
  971. }
  972. [Fact]
  973. public void Do_NextCompleted()
  974. {
  975. var scheduler = new TestScheduler();
  976. var xs = scheduler.CreateHotObservable(
  977. OnNext(150, 1),
  978. OnNext(210, 2),
  979. OnNext(220, 3),
  980. OnNext(230, 4),
  981. OnNext(240, 5),
  982. OnCompleted<int>(250)
  983. );
  984. int i = 0;
  985. int sum = 2 + 3 + 4 + 5;
  986. bool completed = false;
  987. var res = scheduler.Start(() =>
  988. xs.Do(x => { i++; sum -= x; }, () => { completed = true; })
  989. );
  990. Assert.Equal(4, i);
  991. Assert.Equal(0, sum);
  992. Assert.True(completed);
  993. res.Messages.AssertEqual(
  994. OnNext(210, 2),
  995. OnNext(220, 3),
  996. OnNext(230, 4),
  997. OnNext(240, 5),
  998. OnCompleted<int>(250)
  999. );
  1000. xs.Subscriptions.AssertEqual(
  1001. Subscribe(200, 250)
  1002. );
  1003. }
  1004. [Fact]
  1005. public void Do_NextCompleted_Never()
  1006. {
  1007. var scheduler = new TestScheduler();
  1008. var xs = scheduler.CreateHotObservable<int>();
  1009. int i = 0;
  1010. bool completed = false;
  1011. var res = scheduler.Start(() =>
  1012. xs.Do(x => { i++; }, () => { completed = true; })
  1013. );
  1014. Assert.Equal(0, i);
  1015. Assert.False(completed);
  1016. res.Messages.AssertEqual(
  1017. );
  1018. xs.Subscriptions.AssertEqual(
  1019. Subscribe(200, 1000)
  1020. );
  1021. }
  1022. [Fact]
  1023. public void Do_NextError()
  1024. {
  1025. var scheduler = new TestScheduler();
  1026. var ex = new Exception();
  1027. var xs = scheduler.CreateHotObservable(
  1028. OnNext(150, 1),
  1029. OnNext(210, 2),
  1030. OnNext(220, 3),
  1031. OnNext(230, 4),
  1032. OnNext(240, 5),
  1033. OnError<int>(250, ex)
  1034. );
  1035. int i = 0;
  1036. int sum = 2 + 3 + 4 + 5;
  1037. bool sawError = false;
  1038. var res = scheduler.Start(() =>
  1039. xs.Do(x => { i++; sum -= x; }, e => { sawError = e == ex; })
  1040. );
  1041. Assert.Equal(4, i);
  1042. Assert.Equal(0, sum);
  1043. Assert.True(sawError);
  1044. res.Messages.AssertEqual(
  1045. OnNext(210, 2),
  1046. OnNext(220, 3),
  1047. OnNext(230, 4),
  1048. OnNext(240, 5),
  1049. OnError<int>(250, ex)
  1050. );
  1051. xs.Subscriptions.AssertEqual(
  1052. Subscribe(200, 250)
  1053. );
  1054. }
  1055. [Fact]
  1056. public void Do_NextErrorNot()
  1057. {
  1058. var scheduler = new TestScheduler();
  1059. var xs = scheduler.CreateHotObservable(
  1060. OnNext(150, 1),
  1061. OnNext(210, 2),
  1062. OnNext(220, 3),
  1063. OnNext(230, 4),
  1064. OnNext(240, 5),
  1065. OnCompleted<int>(250)
  1066. );
  1067. int i = 0;
  1068. int sum = 2 + 3 + 4 + 5;
  1069. bool sawError = false;
  1070. var res = scheduler.Start(() =>
  1071. xs.Do(x => { i++; sum -= x; }, _ => { sawError = true; })
  1072. );
  1073. Assert.Equal(4, i);
  1074. Assert.Equal(0, sum);
  1075. Assert.False(sawError);
  1076. res.Messages.AssertEqual(
  1077. OnNext(210, 2),
  1078. OnNext(220, 3),
  1079. OnNext(230, 4),
  1080. OnNext(240, 5),
  1081. OnCompleted<int>(250)
  1082. );
  1083. xs.Subscriptions.AssertEqual(
  1084. Subscribe(200, 250)
  1085. );
  1086. }
  1087. [Fact]
  1088. public void Do_NextErrorCompleted()
  1089. {
  1090. var scheduler = new TestScheduler();
  1091. var xs = scheduler.CreateHotObservable(
  1092. OnNext(150, 1),
  1093. OnNext(210, 2),
  1094. OnNext(220, 3),
  1095. OnNext(230, 4),
  1096. OnNext(240, 5),
  1097. OnCompleted<int>(250)
  1098. );
  1099. int i = 0;
  1100. int sum = 2 + 3 + 4 + 5;
  1101. bool sawError = false;
  1102. bool hasCompleted = false;
  1103. var res = scheduler.Start(() =>
  1104. xs.Do(x => { i++; sum -= x; }, e => { sawError = true; }, () => { hasCompleted = true; })
  1105. );
  1106. Assert.Equal(4, i);
  1107. Assert.Equal(0, sum);
  1108. Assert.False(sawError);
  1109. Assert.True(hasCompleted);
  1110. res.Messages.AssertEqual(
  1111. OnNext(210, 2),
  1112. OnNext(220, 3),
  1113. OnNext(230, 4),
  1114. OnNext(240, 5),
  1115. OnCompleted<int>(250)
  1116. );
  1117. xs.Subscriptions.AssertEqual(
  1118. Subscribe(200, 250)
  1119. );
  1120. }
  1121. [Fact]
  1122. public void Do_NextErrorCompletedError()
  1123. {
  1124. var scheduler = new TestScheduler();
  1125. var ex = new Exception();
  1126. var xs = scheduler.CreateHotObservable(
  1127. OnNext(150, 1),
  1128. OnNext(210, 2),
  1129. OnNext(220, 3),
  1130. OnNext(230, 4),
  1131. OnNext(240, 5),
  1132. OnError<int>(250, ex)
  1133. );
  1134. int i = 0;
  1135. int sum = 2 + 3 + 4 + 5;
  1136. bool sawError = false;
  1137. bool hasCompleted = false;
  1138. var res = scheduler.Start(() =>
  1139. xs.Do(x => { i++; sum -= x; }, e => { sawError = e == ex; }, () => { hasCompleted = true; })
  1140. );
  1141. Assert.Equal(4, i);
  1142. Assert.Equal(0, sum);
  1143. Assert.True(sawError);
  1144. Assert.False(hasCompleted);
  1145. res.Messages.AssertEqual(
  1146. OnNext(210, 2),
  1147. OnNext(220, 3),
  1148. OnNext(230, 4),
  1149. OnNext(240, 5),
  1150. OnError<int>(250, ex)
  1151. );
  1152. xs.Subscriptions.AssertEqual(
  1153. Subscribe(200, 250)
  1154. );
  1155. }
  1156. [Fact]
  1157. public void Do_NextErrorCompletedNever()
  1158. {
  1159. var scheduler = new TestScheduler();
  1160. var xs = scheduler.CreateHotObservable<int>();
  1161. int i = 0;
  1162. bool sawError = false;
  1163. bool hasCompleted = false;
  1164. var res = scheduler.Start(() =>
  1165. xs.Do(x => { i++; }, e => { sawError = true; }, () => { hasCompleted = true; })
  1166. );
  1167. Assert.Equal(0, i);
  1168. Assert.False(sawError);
  1169. Assert.False(hasCompleted);
  1170. res.Messages.AssertEqual(
  1171. );
  1172. xs.Subscriptions.AssertEqual(
  1173. Subscribe(200, 1000)
  1174. );
  1175. }
  1176. [Fact]
  1177. public void Do_Observer_SomeDataWithError()
  1178. {
  1179. var scheduler = new TestScheduler();
  1180. var ex = new Exception();
  1181. var xs = scheduler.CreateHotObservable(
  1182. OnNext(150, 1),
  1183. OnNext(210, 2),
  1184. OnNext(220, 3),
  1185. OnNext(230, 4),
  1186. OnNext(240, 5),
  1187. OnError<int>(250, ex)
  1188. );
  1189. int i = 0;
  1190. int sum = 2 + 3 + 4 + 5;
  1191. bool sawError = false;
  1192. bool hasCompleted = false;
  1193. var res = scheduler.Start(() =>
  1194. xs.Do(Observer.Create<int>(x => { i++; sum -= x; }, e => { sawError = e == ex; }, () => { hasCompleted = true; }))
  1195. );
  1196. Assert.Equal(4, i);
  1197. Assert.Equal(0, sum);
  1198. Assert.True(sawError);
  1199. Assert.False(hasCompleted);
  1200. res.Messages.AssertEqual(
  1201. OnNext(210, 2),
  1202. OnNext(220, 3),
  1203. OnNext(230, 4),
  1204. OnNext(240, 5),
  1205. OnError<int>(250, ex)
  1206. );
  1207. xs.Subscriptions.AssertEqual(
  1208. Subscribe(200, 250)
  1209. );
  1210. }
  1211. [Fact]
  1212. public void Do_Observer_SomeDataWithoutError()
  1213. {
  1214. var scheduler = new TestScheduler();
  1215. var xs = scheduler.CreateHotObservable(
  1216. OnNext(150, 1),
  1217. OnNext(210, 2),
  1218. OnNext(220, 3),
  1219. OnNext(230, 4),
  1220. OnNext(240, 5),
  1221. OnCompleted<int>(250)
  1222. );
  1223. int i = 0;
  1224. int sum = 2 + 3 + 4 + 5;
  1225. bool sawError = false;
  1226. bool hasCompleted = false;
  1227. var res = scheduler.Start(() =>
  1228. xs.Do(Observer.Create<int>(x => { i++; sum -= x; }, e => { sawError = true; }, () => { hasCompleted = true; }))
  1229. );
  1230. Assert.Equal(4, i);
  1231. Assert.Equal(0, sum);
  1232. Assert.False(sawError);
  1233. Assert.True(hasCompleted);
  1234. res.Messages.AssertEqual(
  1235. OnNext(210, 2),
  1236. OnNext(220, 3),
  1237. OnNext(230, 4),
  1238. OnNext(240, 5),
  1239. OnCompleted<int>(250)
  1240. );
  1241. xs.Subscriptions.AssertEqual(
  1242. Subscribe(200, 250)
  1243. );
  1244. }
  1245. [Fact]
  1246. public void Do1422_Next_NextThrows()
  1247. {
  1248. var scheduler = new TestScheduler();
  1249. var ex = new Exception();
  1250. var xs = scheduler.CreateHotObservable(
  1251. OnNext(150, 1),
  1252. OnNext(210, 2),
  1253. OnCompleted<int>(250)
  1254. );
  1255. var res = scheduler.Start(() =>
  1256. xs.Do(x => { throw ex; })
  1257. );
  1258. res.Messages.AssertEqual(
  1259. OnError<int>(210, ex)
  1260. );
  1261. xs.Subscriptions.AssertEqual(
  1262. Subscribe(200, 210)
  1263. );
  1264. }
  1265. [Fact]
  1266. public void Do1422_NextCompleted_NextThrows()
  1267. {
  1268. var scheduler = new TestScheduler();
  1269. var ex = new Exception();
  1270. var xs = scheduler.CreateHotObservable(
  1271. OnNext(150, 1),
  1272. OnNext(210, 2),
  1273. OnCompleted<int>(250)
  1274. );
  1275. var res = scheduler.Start(() =>
  1276. xs.Do(x => { throw ex; }, () => { })
  1277. );
  1278. res.Messages.AssertEqual(
  1279. OnError<int>(210, ex)
  1280. );
  1281. xs.Subscriptions.AssertEqual(
  1282. Subscribe(200, 210)
  1283. );
  1284. }
  1285. [Fact]
  1286. public void Do1422_NextCompleted_CompletedThrows()
  1287. {
  1288. var scheduler = new TestScheduler();
  1289. var ex = new Exception();
  1290. var xs = scheduler.CreateHotObservable(
  1291. OnNext(150, 1),
  1292. OnNext(210, 2),
  1293. OnCompleted<int>(250)
  1294. );
  1295. var res = scheduler.Start(() =>
  1296. xs.Do(x => { }, () => { throw ex; })
  1297. );
  1298. res.Messages.AssertEqual(
  1299. OnNext(210, 2),
  1300. OnError<int>(250, ex)
  1301. );
  1302. xs.Subscriptions.AssertEqual(
  1303. Subscribe(200, 250)
  1304. );
  1305. }
  1306. [Fact]
  1307. public void Do1422_NextError_NextThrows()
  1308. {
  1309. var scheduler = new TestScheduler();
  1310. var ex = new Exception();
  1311. var xs = scheduler.CreateHotObservable(
  1312. OnNext(150, 1),
  1313. OnNext(210, 2),
  1314. OnCompleted<int>(250)
  1315. );
  1316. var res = scheduler.Start(() =>
  1317. xs.Do(x => { throw ex; }, _ => { })
  1318. );
  1319. res.Messages.AssertEqual(
  1320. OnError<int>(210, ex)
  1321. );
  1322. xs.Subscriptions.AssertEqual(
  1323. Subscribe(200, 210)
  1324. );
  1325. }
  1326. [Fact]
  1327. public void Do1422_NextError_ErrorThrows()
  1328. {
  1329. var scheduler = new TestScheduler();
  1330. var ex1 = new Exception();
  1331. var ex2 = new Exception();
  1332. var xs = scheduler.CreateHotObservable(
  1333. OnNext(150, 1),
  1334. OnError<int>(210, ex1)
  1335. );
  1336. var res = scheduler.Start(() =>
  1337. xs.Do(x => { }, _ => { throw ex2; })
  1338. );
  1339. res.Messages.AssertEqual(
  1340. OnError<int>(210, ex2)
  1341. );
  1342. xs.Subscriptions.AssertEqual(
  1343. Subscribe(200, 210)
  1344. );
  1345. }
  1346. [Fact]
  1347. public void Do1422_NextErrorCompleted_NextThrows()
  1348. {
  1349. var scheduler = new TestScheduler();
  1350. var ex = new Exception();
  1351. var xs = scheduler.CreateHotObservable(
  1352. OnNext(150, 1),
  1353. OnNext(210, 2),
  1354. OnCompleted<int>(250)
  1355. );
  1356. var res = scheduler.Start(() =>
  1357. xs.Do(x => { throw ex; }, _ => { }, () => { })
  1358. );
  1359. res.Messages.AssertEqual(
  1360. OnError<int>(210, ex)
  1361. );
  1362. xs.Subscriptions.AssertEqual(
  1363. Subscribe(200, 210)
  1364. );
  1365. }
  1366. [Fact]
  1367. public void Do1422_NextErrorCompleted_ErrorThrows()
  1368. {
  1369. var scheduler = new TestScheduler();
  1370. var ex1 = new Exception();
  1371. var ex2 = new Exception();
  1372. var xs = scheduler.CreateHotObservable(
  1373. OnNext(150, 1),
  1374. OnError<int>(210, ex1)
  1375. );
  1376. var res = scheduler.Start(() =>
  1377. xs.Do(x => { }, _ => { throw ex2; }, () => { })
  1378. );
  1379. res.Messages.AssertEqual(
  1380. OnError<int>(210, ex2)
  1381. );
  1382. xs.Subscriptions.AssertEqual(
  1383. Subscribe(200, 210)
  1384. );
  1385. }
  1386. [Fact]
  1387. public void Do1422_NextErrorCompleted_CompletedThrows()
  1388. {
  1389. var scheduler = new TestScheduler();
  1390. var ex = new Exception();
  1391. var xs = scheduler.CreateHotObservable(
  1392. OnNext(150, 1),
  1393. OnNext(210, 2),
  1394. OnCompleted<int>(250)
  1395. );
  1396. var res = scheduler.Start(() =>
  1397. xs.Do(x => { }, _ => { }, () => { throw ex; })
  1398. );
  1399. res.Messages.AssertEqual(
  1400. OnNext(210, 2),
  1401. OnError<int>(250, ex)
  1402. );
  1403. xs.Subscriptions.AssertEqual(
  1404. Subscribe(200, 250)
  1405. );
  1406. }
  1407. [Fact]
  1408. public void Do1422_Observer_NextThrows()
  1409. {
  1410. var scheduler = new TestScheduler();
  1411. var ex = new Exception();
  1412. var xs = scheduler.CreateHotObservable(
  1413. OnNext(150, 1),
  1414. OnNext(210, 2),
  1415. OnCompleted<int>(250)
  1416. );
  1417. var res = scheduler.Start(() =>
  1418. xs.Do(Observer.Create<int>(x => { throw ex; }, _ => { }, () => { }))
  1419. );
  1420. res.Messages.AssertEqual(
  1421. OnError<int>(210, ex)
  1422. );
  1423. xs.Subscriptions.AssertEqual(
  1424. Subscribe(200, 210)
  1425. );
  1426. }
  1427. [Fact]
  1428. public void Do1422_Observer_ErrorThrows()
  1429. {
  1430. var scheduler = new TestScheduler();
  1431. var ex1 = new Exception();
  1432. var ex2 = new Exception();
  1433. var xs = scheduler.CreateHotObservable(
  1434. OnNext(150, 1),
  1435. OnError<int>(210, ex1)
  1436. );
  1437. var res = scheduler.Start(() =>
  1438. xs.Do(Observer.Create<int>(x => { }, _ => { throw ex2; }, () => { }))
  1439. );
  1440. res.Messages.AssertEqual(
  1441. OnError<int>(210, ex2)
  1442. );
  1443. xs.Subscriptions.AssertEqual(
  1444. Subscribe(200, 210)
  1445. );
  1446. }
  1447. [Fact]
  1448. public void Do1422_Observer_CompletedThrows()
  1449. {
  1450. var scheduler = new TestScheduler();
  1451. var ex = new Exception();
  1452. var xs = scheduler.CreateHotObservable(
  1453. OnNext(150, 1),
  1454. OnNext(210, 2),
  1455. OnCompleted<int>(250)
  1456. );
  1457. var res = scheduler.Start(() =>
  1458. xs.Do(Observer.Create<int>(x => { }, _ => { }, () => { throw ex; }))
  1459. );
  1460. res.Messages.AssertEqual(
  1461. OnNext(210, 2),
  1462. OnError<int>(250, ex)
  1463. );
  1464. xs.Subscriptions.AssertEqual(
  1465. Subscribe(200, 250)
  1466. );
  1467. }
  1468. #endregion
  1469. #region + Finally +
  1470. [Fact]
  1471. public void Finally_ArgumentChecking()
  1472. {
  1473. var someObservable = Observable.Empty<int>();
  1474. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Finally<int>(null, () => { }));
  1475. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Finally<int>(someObservable, null));
  1476. }
  1477. [Fact]
  1478. public void Finally_Never()
  1479. {
  1480. var scheduler = new TestScheduler();
  1481. bool invoked = false;
  1482. var res = scheduler.Start(() =>
  1483. Observable.Never<int>().Finally(() => { invoked = true; })
  1484. );
  1485. res.Messages.AssertEqual(
  1486. );
  1487. Assert.True(invoked); // due to unsubscribe; see 1356
  1488. }
  1489. [Fact]
  1490. public void Finally_OnlyCalledOnce_Never()
  1491. {
  1492. int invokeCount = 0;
  1493. var someObservable = Observable.Never<int>().Finally(() => { invokeCount++; });
  1494. var d = someObservable.Subscribe();
  1495. d.Dispose();
  1496. d.Dispose();
  1497. Assert.Equal(1, invokeCount);
  1498. }
  1499. [Fact]
  1500. public void Finally_OnlyCalledOnce_Empty()
  1501. {
  1502. var invokeCount = 0;
  1503. var someObservable = Observable.Empty<int>().Finally(() => { invokeCount++; });
  1504. var d = someObservable.Subscribe();
  1505. d.Dispose();
  1506. d.Dispose();
  1507. Assert.Equal(1, invokeCount);
  1508. }
  1509. [Fact]
  1510. public void Finally_Empty()
  1511. {
  1512. var scheduler = new TestScheduler();
  1513. var xs = scheduler.CreateHotObservable(
  1514. OnNext(150, 1),
  1515. OnCompleted<int>(250)
  1516. );
  1517. var invoked = false;
  1518. var res = scheduler.Start(() =>
  1519. xs.Finally(() => { invoked = true; })
  1520. );
  1521. Assert.True(invoked);
  1522. res.Messages.AssertEqual(
  1523. OnCompleted<int>(250)
  1524. );
  1525. xs.Subscriptions.AssertEqual(
  1526. Subscribe(200, 250)
  1527. );
  1528. }
  1529. [Fact]
  1530. public void Finally_Return()
  1531. {
  1532. var scheduler = new TestScheduler();
  1533. var xs = scheduler.CreateHotObservable(
  1534. OnNext(150, 1),
  1535. OnNext(210, 2),
  1536. OnCompleted<int>(250)
  1537. );
  1538. var invoked = false;
  1539. var res = scheduler.Start(() =>
  1540. xs.Finally(() => { invoked = true; })
  1541. );
  1542. Assert.True(invoked);
  1543. res.Messages.AssertEqual(
  1544. OnNext(210, 2),
  1545. OnCompleted<int>(250)
  1546. );
  1547. xs.Subscriptions.AssertEqual(
  1548. Subscribe(200, 250)
  1549. );
  1550. }
  1551. [Fact]
  1552. public void Finally_Throw()
  1553. {
  1554. var scheduler = new TestScheduler();
  1555. var ex = new Exception();
  1556. var xs = scheduler.CreateHotObservable(
  1557. OnNext(150, 1),
  1558. OnError<int>(250, ex)
  1559. );
  1560. var invoked = false;
  1561. var res = scheduler.Start(() =>
  1562. xs.Finally(() => { invoked = true; })
  1563. );
  1564. Assert.True(invoked);
  1565. res.Messages.AssertEqual(
  1566. OnError<int>(250, ex)
  1567. );
  1568. xs.Subscriptions.AssertEqual(
  1569. Subscribe(200, 250)
  1570. );
  1571. }
  1572. #endregion
  1573. #region + IgnoreElements +
  1574. [Fact]
  1575. public void IgnoreElements_ArgumentChecking()
  1576. {
  1577. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.IgnoreElements<int>(null));
  1578. }
  1579. [Fact]
  1580. public void IgnoreElements_IgnoreElements()
  1581. {
  1582. var scheduler = new TestScheduler();
  1583. var xs = scheduler.CreateHotObservable(
  1584. OnNext(180, 1),
  1585. OnNext(210, 2),
  1586. OnNext(250, 3),
  1587. OnNext(270, 4),
  1588. OnCompleted<int>(300)
  1589. );
  1590. var res = scheduler.Start(() =>
  1591. xs.IgnoreElements().IgnoreElements()
  1592. );
  1593. res.Messages.AssertEqual(
  1594. OnCompleted<int>(300)
  1595. );
  1596. xs.Subscriptions.AssertEqual(
  1597. Subscribe(200, 300)
  1598. );
  1599. }
  1600. [Fact]
  1601. public void IgnoreElements_Basic()
  1602. {
  1603. var scheduler = new TestScheduler();
  1604. var xs = scheduler.CreateHotObservable(
  1605. OnNext(180, 1),
  1606. OnNext(210, 2),
  1607. OnNext(250, 3),
  1608. OnNext(270, 4),
  1609. OnNext(310, 5),
  1610. OnNext(360, 6),
  1611. OnNext(380, 7),
  1612. OnNext(410, 8),
  1613. OnNext(590, 9)
  1614. );
  1615. var res = scheduler.Start(() =>
  1616. xs.IgnoreElements()
  1617. );
  1618. res.Messages.AssertEqual(
  1619. );
  1620. xs.Subscriptions.AssertEqual(
  1621. Subscribe(200, 1000)
  1622. );
  1623. }
  1624. [Fact]
  1625. public void IgnoreElements_Completed()
  1626. {
  1627. var scheduler = new TestScheduler();
  1628. var xs = scheduler.CreateHotObservable(
  1629. OnNext(180, 1),
  1630. OnNext(210, 2),
  1631. OnNext(250, 3),
  1632. OnNext(270, 4),
  1633. OnNext(310, 5),
  1634. OnNext(360, 6),
  1635. OnNext(380, 7),
  1636. OnNext(410, 8),
  1637. OnNext(590, 9),
  1638. OnCompleted<int>(610)
  1639. );
  1640. var res = scheduler.Start(() =>
  1641. xs.IgnoreElements()
  1642. );
  1643. res.Messages.AssertEqual(
  1644. OnCompleted<int>(610)
  1645. );
  1646. xs.Subscriptions.AssertEqual(
  1647. Subscribe(200, 610)
  1648. );
  1649. }
  1650. [Fact]
  1651. public void IgnoreElements_Error()
  1652. {
  1653. var scheduler = new TestScheduler();
  1654. var ex = new Exception();
  1655. var xs = scheduler.CreateHotObservable(
  1656. OnNext(180, 1),
  1657. OnNext(210, 2),
  1658. OnNext(250, 3),
  1659. OnNext(270, 4),
  1660. OnNext(310, 5),
  1661. OnNext(360, 6),
  1662. OnNext(380, 7),
  1663. OnNext(410, 8),
  1664. OnNext(590, 9),
  1665. OnError<int>(610, ex)
  1666. );
  1667. var res = scheduler.Start(() =>
  1668. xs.IgnoreElements()
  1669. );
  1670. res.Messages.AssertEqual(
  1671. OnError<int>(610, ex)
  1672. );
  1673. xs.Subscriptions.AssertEqual(
  1674. Subscribe(200, 610)
  1675. );
  1676. }
  1677. #endregion
  1678. #region + Materialize +
  1679. [Fact]
  1680. public void Materialize_ArgumentChecking()
  1681. {
  1682. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Materialize<int>(null));
  1683. }
  1684. [Fact]
  1685. public void Materialize_Never()
  1686. {
  1687. var scheduler = new TestScheduler();
  1688. var res = scheduler.Start(() =>
  1689. Observable.Never<int>().Materialize()
  1690. );
  1691. res.Messages.AssertEqual(
  1692. );
  1693. }
  1694. [Fact]
  1695. public void Materialize_Empty()
  1696. {
  1697. var scheduler = new TestScheduler();
  1698. var xs = scheduler.CreateHotObservable(
  1699. OnNext(150, 1),
  1700. OnCompleted<int>(250)
  1701. );
  1702. var res = scheduler.Start(() =>
  1703. xs.Materialize()
  1704. );
  1705. res.Messages.AssertEqual(
  1706. OnNext(250, Notification.CreateOnCompleted<int>()),
  1707. OnCompleted<Notification<int>>(250)
  1708. );
  1709. xs.Subscriptions.AssertEqual(
  1710. Subscribe(200, 250)
  1711. );
  1712. }
  1713. [Fact]
  1714. public void Materialize_Return()
  1715. {
  1716. var scheduler = new TestScheduler();
  1717. var xs = scheduler.CreateHotObservable(
  1718. OnNext(150, 1),
  1719. OnNext(210, 2),
  1720. OnCompleted<int>(250)
  1721. );
  1722. var res = scheduler.Start(() =>
  1723. xs.Materialize()
  1724. );
  1725. res.Messages.AssertEqual(
  1726. OnNext(210, Notification.CreateOnNext(2)),
  1727. OnNext(250, Notification.CreateOnCompleted<int>()),
  1728. OnCompleted<Notification<int>>(250)
  1729. );
  1730. xs.Subscriptions.AssertEqual(
  1731. Subscribe(200, 250)
  1732. );
  1733. }
  1734. [Fact]
  1735. public void Materialize_Throw()
  1736. {
  1737. var scheduler = new TestScheduler();
  1738. var ex = new Exception();
  1739. var xs = scheduler.CreateHotObservable(
  1740. OnNext(150, 1),
  1741. OnError<int>(250, ex)
  1742. );
  1743. var res = scheduler.Start(() =>
  1744. xs.Materialize()
  1745. );
  1746. res.Messages.AssertEqual(
  1747. OnNext(250, Notification.CreateOnError<int>(ex)),
  1748. OnCompleted<Notification<int>>(250)
  1749. );
  1750. xs.Subscriptions.AssertEqual(
  1751. Subscribe(200, 250)
  1752. );
  1753. }
  1754. #endregion
  1755. #region - Repeat -
  1756. [Fact]
  1757. public void Repeat_Observable_ArgumentChecking()
  1758. {
  1759. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat<int>(null));
  1760. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Repeat().Subscribe(null));
  1761. }
  1762. [Fact]
  1763. public void Repeat_Observable_Basic()
  1764. {
  1765. var scheduler = new TestScheduler();
  1766. var xs = scheduler.CreateColdObservable(
  1767. OnNext(100, 1),
  1768. OnNext(150, 2),
  1769. OnNext(200, 3),
  1770. OnCompleted<int>(250)
  1771. );
  1772. var res = scheduler.Start(() =>
  1773. xs.Repeat()
  1774. );
  1775. res.Messages.AssertEqual(
  1776. OnNext(300, 1),
  1777. OnNext(350, 2),
  1778. OnNext(400, 3),
  1779. OnNext(550, 1),
  1780. OnNext(600, 2),
  1781. OnNext(650, 3),
  1782. OnNext(800, 1),
  1783. OnNext(850, 2),
  1784. OnNext(900, 3)
  1785. );
  1786. xs.Subscriptions.AssertEqual(
  1787. Subscribe(200, 450),
  1788. Subscribe(450, 700),
  1789. Subscribe(700, 950),
  1790. Subscribe(950, 1000)
  1791. );
  1792. }
  1793. [Fact]
  1794. public void Repeat_Observable_Infinite()
  1795. {
  1796. var scheduler = new TestScheduler();
  1797. var xs = scheduler.CreateColdObservable(
  1798. OnNext(100, 1),
  1799. OnNext(150, 2),
  1800. OnNext(200, 3)
  1801. );
  1802. var res = scheduler.Start(() =>
  1803. xs.Repeat()
  1804. );
  1805. res.Messages.AssertEqual(
  1806. OnNext(300, 1),
  1807. OnNext(350, 2),
  1808. OnNext(400, 3)
  1809. );
  1810. xs.Subscriptions.AssertEqual(
  1811. Subscribe(200, 1000)
  1812. );
  1813. }
  1814. [Fact]
  1815. public void Repeat_Observable_Error()
  1816. {
  1817. var scheduler = new TestScheduler();
  1818. var ex = new Exception();
  1819. var xs = scheduler.CreateColdObservable(
  1820. OnNext(100, 1),
  1821. OnNext(150, 2),
  1822. OnNext(200, 3),
  1823. OnError<int>(250, ex)
  1824. );
  1825. var res = scheduler.Start(() =>
  1826. xs.Repeat()
  1827. );
  1828. res.Messages.AssertEqual(
  1829. OnNext(300, 1),
  1830. OnNext(350, 2),
  1831. OnNext(400, 3),
  1832. OnError<int>(450, ex)
  1833. );
  1834. xs.Subscriptions.AssertEqual(
  1835. Subscribe(200, 450)
  1836. );
  1837. }
  1838. [Fact]
  1839. public void Repeat_Observable_Throws()
  1840. {
  1841. var scheduler1 = new TestScheduler();
  1842. var xs = Observable.Return(1, scheduler1).Repeat();
  1843. xs.Subscribe(x => { throw new InvalidOperationException(); });
  1844. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  1845. var scheduler2 = new TestScheduler();
  1846. var ys = Observable.Throw<int>(new Exception(), scheduler2).Repeat();
  1847. ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  1848. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
  1849. var scheduler3 = new TestScheduler();
  1850. var zs = Observable.Return(1, scheduler3).Repeat();
  1851. var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  1852. scheduler3.ScheduleAbsolute(210, () => d.Dispose());
  1853. scheduler3.Start();
  1854. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Repeat();
  1855. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  1856. }
  1857. [Fact]
  1858. public void Repeat_Observable_Default_ArgumentChecking()
  1859. {
  1860. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat<int>((IObservable<int>)null));
  1861. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Repeat().Subscribe(null));
  1862. }
  1863. [Fact]
  1864. public void Repeat_Observable_RepeatCount_ArgumentChecking()
  1865. {
  1866. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat<int>(null, 0));
  1867. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Repeat(-1));
  1868. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Repeat(0).Subscribe(null));
  1869. }
  1870. [Fact]
  1871. public void Repeat_Observable_RepeatCount_Basic()
  1872. {
  1873. var scheduler = new TestScheduler();
  1874. var xs = scheduler.CreateColdObservable(
  1875. OnNext(5, 1),
  1876. OnNext(10, 2),
  1877. OnNext(15, 3),
  1878. OnCompleted<int>(20)
  1879. );
  1880. var res = scheduler.Start(() =>
  1881. xs.Repeat(3)
  1882. );
  1883. res.Messages.AssertEqual(
  1884. OnNext(205, 1),
  1885. OnNext(210, 2),
  1886. OnNext(215, 3),
  1887. OnNext(225, 1),
  1888. OnNext(230, 2),
  1889. OnNext(235, 3),
  1890. OnNext(245, 1),
  1891. OnNext(250, 2),
  1892. OnNext(255, 3),
  1893. OnCompleted<int>(260)
  1894. );
  1895. xs.Subscriptions.AssertEqual(
  1896. Subscribe(200, 220),
  1897. Subscribe(220, 240),
  1898. Subscribe(240, 260)
  1899. );
  1900. }
  1901. [Fact]
  1902. public void Repeat_Observable_RepeatCount_Dispose()
  1903. {
  1904. var scheduler = new TestScheduler();
  1905. var xs = scheduler.CreateColdObservable(
  1906. OnNext(5, 1),
  1907. OnNext(10, 2),
  1908. OnNext(15, 3),
  1909. OnCompleted<int>(20)
  1910. );
  1911. var res = scheduler.Start(() =>
  1912. xs.Repeat(3), 231
  1913. );
  1914. res.Messages.AssertEqual(
  1915. OnNext(205, 1),
  1916. OnNext(210, 2),
  1917. OnNext(215, 3),
  1918. OnNext(225, 1),
  1919. OnNext(230, 2)
  1920. );
  1921. xs.Subscriptions.AssertEqual(
  1922. Subscribe(200, 220),
  1923. Subscribe(220, 231)
  1924. );
  1925. }
  1926. [Fact]
  1927. public void Repeat_Observable_RepeatCount_Infinite()
  1928. {
  1929. var scheduler = new TestScheduler();
  1930. var xs = scheduler.CreateColdObservable(
  1931. OnNext(100, 1),
  1932. OnNext(150, 2),
  1933. OnNext(200, 3)
  1934. );
  1935. var res = scheduler.Start(() =>
  1936. xs.Repeat(3)
  1937. );
  1938. res.Messages.AssertEqual(
  1939. OnNext(300, 1),
  1940. OnNext(350, 2),
  1941. OnNext(400, 3)
  1942. );
  1943. xs.Subscriptions.AssertEqual(
  1944. Subscribe(200, 1000)
  1945. );
  1946. }
  1947. [Fact]
  1948. public void Repeat_Observable_RepeatCount_Error()
  1949. {
  1950. var scheduler = new TestScheduler();
  1951. var ex = new Exception();
  1952. var xs = scheduler.CreateColdObservable(
  1953. OnNext(100, 1),
  1954. OnNext(150, 2),
  1955. OnNext(200, 3),
  1956. OnError<int>(250, ex)
  1957. );
  1958. var res = scheduler.Start(() =>
  1959. xs.Repeat(3)
  1960. );
  1961. res.Messages.AssertEqual(
  1962. OnNext(300, 1),
  1963. OnNext(350, 2),
  1964. OnNext(400, 3),
  1965. OnError<int>(450, ex)
  1966. );
  1967. xs.Subscriptions.AssertEqual(
  1968. Subscribe(200, 450)
  1969. );
  1970. }
  1971. [Fact]
  1972. public void Repeat_Observable_RepeatCount_Throws()
  1973. {
  1974. var scheduler1 = new TestScheduler();
  1975. var xs = Observable.Return(1, scheduler1).Repeat(3);
  1976. xs.Subscribe(x => { throw new InvalidOperationException(); });
  1977. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  1978. var scheduler2 = new TestScheduler();
  1979. var ys = Observable.Throw<int>(new Exception(), scheduler2).Repeat(3);
  1980. ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  1981. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
  1982. var scheduler3 = new TestScheduler();
  1983. var zs = Observable.Return(1, scheduler3).Repeat(100);
  1984. var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  1985. scheduler3.ScheduleAbsolute(10, () => d.Dispose());
  1986. scheduler3.Start();
  1987. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Repeat(3);
  1988. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  1989. }
  1990. [Fact]
  1991. public void Repeat_Observable_RepeatCount_Default_ArgumentChecking()
  1992. {
  1993. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat<int>(default(IObservable<int>), 0));
  1994. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Repeat(-1));
  1995. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Repeat(0).Subscribe(null));
  1996. }
  1997. #endregion
  1998. #region - Retry -
  1999. [Fact]
  2000. public void Retry_Observable_ArgumentChecking()
  2001. {
  2002. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Retry<int>(null));
  2003. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Retry().Subscribe(null));
  2004. }
  2005. [Fact]
  2006. public void Retry_Observable_Basic()
  2007. {
  2008. var scheduler = new TestScheduler();
  2009. var xs = scheduler.CreateColdObservable(
  2010. OnNext(100, 1),
  2011. OnNext(150, 2),
  2012. OnNext(200, 3),
  2013. OnCompleted<int>(250)
  2014. );
  2015. var res = scheduler.Start(() =>
  2016. xs.Retry()
  2017. );
  2018. res.Messages.AssertEqual(
  2019. OnNext(300, 1),
  2020. OnNext(350, 2),
  2021. OnNext(400, 3),
  2022. OnCompleted<int>(450)
  2023. );
  2024. xs.Subscriptions.AssertEqual(
  2025. Subscribe(200, 450)
  2026. );
  2027. }
  2028. [Fact]
  2029. public void Retry_Observable_Infinite()
  2030. {
  2031. var scheduler = new TestScheduler();
  2032. var xs = scheduler.CreateColdObservable(
  2033. OnNext(100, 1),
  2034. OnNext(150, 2),
  2035. OnNext(200, 3)
  2036. );
  2037. var res = scheduler.Start(() =>
  2038. xs.Retry()
  2039. );
  2040. res.Messages.AssertEqual(
  2041. OnNext(300, 1),
  2042. OnNext(350, 2),
  2043. OnNext(400, 3)
  2044. );
  2045. xs.Subscriptions.AssertEqual(
  2046. Subscribe(200, 1000)
  2047. );
  2048. }
  2049. [Fact]
  2050. public void Retry_Observable_Error()
  2051. {
  2052. var scheduler = new TestScheduler();
  2053. var ex = new Exception();
  2054. var xs = scheduler.CreateColdObservable(
  2055. OnNext(100, 1),
  2056. OnNext(150, 2),
  2057. OnNext(200, 3),
  2058. OnError<int>(250, ex)
  2059. );
  2060. var res = scheduler.Start(() =>
  2061. xs.Retry(), 1100
  2062. );
  2063. res.Messages.AssertEqual(
  2064. OnNext(300, 1),
  2065. OnNext(350, 2),
  2066. OnNext(400, 3),
  2067. OnNext(550, 1),
  2068. OnNext(600, 2),
  2069. OnNext(650, 3),
  2070. OnNext(800, 1),
  2071. OnNext(850, 2),
  2072. OnNext(900, 3),
  2073. OnNext(1050, 1)
  2074. );
  2075. xs.Subscriptions.AssertEqual(
  2076. Subscribe(200, 450),
  2077. Subscribe(450, 700),
  2078. Subscribe(700, 950),
  2079. Subscribe(950, 1100)
  2080. );
  2081. }
  2082. [Fact]
  2083. public void Retry_Observable_Throws1()
  2084. {
  2085. var scheduler1 = new TestScheduler();
  2086. var xs = Observable.Return(1, scheduler1).Retry();
  2087. xs.Subscribe(x => { throw new InvalidOperationException(); });
  2088. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  2089. }
  2090. [Fact]
  2091. public void Retry_Observable_Throws2()
  2092. {
  2093. var scheduler2 = new TestScheduler();
  2094. var ys = Observable.Throw<int>(new Exception(), scheduler2).Retry();
  2095. var d = ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  2096. scheduler2.ScheduleAbsolute(210, () => d.Dispose());
  2097. scheduler2.Start();
  2098. }
  2099. [Fact]
  2100. public void Retry_Observable_Throws3()
  2101. {
  2102. var scheduler3 = new TestScheduler();
  2103. var zs = Observable.Return(1, scheduler3).Retry();
  2104. zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  2105. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler3.Start());
  2106. }
  2107. /*
  2108. * BREAKING CHANGE v2.0 > v1.x - The code below will loop endlessly, trying to repeat the failing subscription,
  2109. * whose exception is propagated through OnError starting from v2.0.
  2110. *
  2111. [Fact]
  2112. public void Retry_Observable_Throws4()
  2113. {
  2114. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Retry();
  2115. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  2116. }
  2117. */
  2118. [Fact]
  2119. public void Retry_Observable_Default_ArgumentChecking()
  2120. {
  2121. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Retry<int>((IObservable<int>)null));
  2122. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Retry().Subscribe(null));
  2123. }
  2124. [Fact]
  2125. public void Retry_Observable_RetryCount_ArgumentChecking()
  2126. {
  2127. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Retry<int>(null, 0));
  2128. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Retry(-1));
  2129. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Retry(0).Subscribe(null));
  2130. }
  2131. [Fact]
  2132. public void Retry_Observable_RetryCount_Basic()
  2133. {
  2134. var scheduler = new TestScheduler();
  2135. var ex = new Exception();
  2136. var xs = scheduler.CreateColdObservable(
  2137. OnNext(5, 1),
  2138. OnNext(10, 2),
  2139. OnNext(15, 3),
  2140. OnError<int>(20, ex)
  2141. );
  2142. var res = scheduler.Start(() =>
  2143. xs.Retry(3)
  2144. );
  2145. res.Messages.AssertEqual(
  2146. OnNext(205, 1),
  2147. OnNext(210, 2),
  2148. OnNext(215, 3),
  2149. OnNext(225, 1),
  2150. OnNext(230, 2),
  2151. OnNext(235, 3),
  2152. OnNext(245, 1),
  2153. OnNext(250, 2),
  2154. OnNext(255, 3),
  2155. OnError<int>(260, ex)
  2156. );
  2157. xs.Subscriptions.AssertEqual(
  2158. Subscribe(200, 220),
  2159. Subscribe(220, 240),
  2160. Subscribe(240, 260)
  2161. );
  2162. }
  2163. [Fact]
  2164. public void Retry_Observable_RetryCount_Dispose()
  2165. {
  2166. var scheduler = new TestScheduler();
  2167. var xs = scheduler.CreateColdObservable(
  2168. OnNext(5, 1),
  2169. OnNext(10, 2),
  2170. OnNext(15, 3),
  2171. OnError<int>(20, new Exception())
  2172. );
  2173. var res = scheduler.Start(() =>
  2174. xs.Retry(3), 231
  2175. );
  2176. res.Messages.AssertEqual(
  2177. OnNext(205, 1),
  2178. OnNext(210, 2),
  2179. OnNext(215, 3),
  2180. OnNext(225, 1),
  2181. OnNext(230, 2)
  2182. );
  2183. xs.Subscriptions.AssertEqual(
  2184. Subscribe(200, 220),
  2185. Subscribe(220, 231)
  2186. );
  2187. }
  2188. [Fact]
  2189. public void Retry_Observable_RetryCount_Infinite()
  2190. {
  2191. var scheduler = new TestScheduler();
  2192. var xs = scheduler.CreateColdObservable(
  2193. OnNext(100, 1),
  2194. OnNext(150, 2),
  2195. OnNext(200, 3)
  2196. );
  2197. var res = scheduler.Start(() =>
  2198. xs.Retry(3)
  2199. );
  2200. res.Messages.AssertEqual(
  2201. OnNext(300, 1),
  2202. OnNext(350, 2),
  2203. OnNext(400, 3)
  2204. );
  2205. xs.Subscriptions.AssertEqual(
  2206. Subscribe(200, 1000)
  2207. );
  2208. }
  2209. [Fact]
  2210. public void Retry_Observable_RetryCount_Completed()
  2211. {
  2212. var scheduler = new TestScheduler();
  2213. var xs = scheduler.CreateColdObservable(
  2214. OnNext(100, 1),
  2215. OnNext(150, 2),
  2216. OnNext(200, 3),
  2217. OnCompleted<int>(250)
  2218. );
  2219. var res = scheduler.Start(() =>
  2220. xs.Retry(3)
  2221. );
  2222. res.Messages.AssertEqual(
  2223. OnNext(300, 1),
  2224. OnNext(350, 2),
  2225. OnNext(400, 3),
  2226. OnCompleted<int>(450)
  2227. );
  2228. xs.Subscriptions.AssertEqual(
  2229. Subscribe(200, 450)
  2230. );
  2231. }
  2232. [Fact]
  2233. public void Retry_Observable_RetryCount_Throws()
  2234. {
  2235. var scheduler1 = new TestScheduler();
  2236. var xs = Observable.Return(1, scheduler1).Retry(3);
  2237. xs.Subscribe(x => { throw new InvalidOperationException(); });
  2238. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  2239. var scheduler2 = new TestScheduler();
  2240. var ys = Observable.Throw<int>(new Exception(), scheduler2).Retry(100);
  2241. var d = ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
  2242. scheduler2.ScheduleAbsolute(10, () => d.Dispose());
  2243. scheduler2.Start();
  2244. var scheduler3 = new TestScheduler();
  2245. var zs = Observable.Return(1, scheduler3).Retry(100);
  2246. zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  2247. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler3.Start());
  2248. var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Retry(3);
  2249. ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
  2250. }
  2251. [Fact]
  2252. public void Retry_Observable_RetryCount_Default_ArgumentChecking()
  2253. {
  2254. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Retry<int>(default(IObservable<int>), 0));
  2255. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => DummyObservable<int>.Instance.Retry(-1));
  2256. ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.Retry(0).Subscribe(null));
  2257. }
  2258. [Fact]
  2259. public void Retry_Observable_RetryCount_Default()
  2260. {
  2261. Observable.Range(1, 3).Retry(3).AssertEqual(Observable.Range(1, 3).Retry(3));
  2262. }
  2263. #endregion
  2264. #region + Scan +
  2265. [Fact]
  2266. public void Scan_ArgumentChecking()
  2267. {
  2268. var someObservable = Observable.Empty<int>();
  2269. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Scan<int>(null, (_, __) => 0));
  2270. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Scan<int>(someObservable, null));
  2271. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Scan<int, int>(null, 0, (_, __) => 0));
  2272. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Scan<int, int>(someObservable, 0, null));
  2273. }
  2274. [Fact]
  2275. public void Scan_Seed_Never()
  2276. {
  2277. var scheduler = new TestScheduler();
  2278. var xs = scheduler.CreateHotObservable<int>();
  2279. var seed = 42;
  2280. var res = scheduler.Start(() =>
  2281. xs.Scan(seed, (acc, x) => acc + x)
  2282. );
  2283. res.Messages.AssertEqual(
  2284. );
  2285. xs.Subscriptions.AssertEqual(
  2286. Subscribe(200, 1000)
  2287. );
  2288. }
  2289. [Fact]
  2290. public void Scan_Seed_Empty()
  2291. {
  2292. var scheduler = new TestScheduler();
  2293. var xs = scheduler.CreateHotObservable(
  2294. OnNext(150, 1),
  2295. OnCompleted<int>(250)
  2296. );
  2297. var seed = 42;
  2298. var res = scheduler.Start(() =>
  2299. xs.Scan(seed, (acc, x) => acc + x)
  2300. );
  2301. res.Messages.AssertEqual(
  2302. OnCompleted<int>(250)
  2303. );
  2304. xs.Subscriptions.AssertEqual(
  2305. Subscribe(200, 250)
  2306. );
  2307. }
  2308. [Fact]
  2309. public void Scan_Seed_Return()
  2310. {
  2311. var scheduler = new TestScheduler();
  2312. var xs = scheduler.CreateHotObservable(
  2313. OnNext(150, 1),
  2314. OnNext(220, 2),
  2315. OnCompleted<int>(250)
  2316. );
  2317. var seed = 42;
  2318. var res = scheduler.Start(() =>
  2319. xs.Scan(seed, (acc, x) => acc + x)
  2320. );
  2321. res.Messages.AssertEqual(
  2322. OnNext(220, seed + 2),
  2323. OnCompleted<int>(250)
  2324. );
  2325. xs.Subscriptions.AssertEqual(
  2326. Subscribe(200, 250)
  2327. );
  2328. }
  2329. [Fact]
  2330. public void Scan_Seed_Throw()
  2331. {
  2332. var scheduler = new TestScheduler();
  2333. var ex = new Exception();
  2334. var xs = scheduler.CreateHotObservable(
  2335. OnNext(150, 1),
  2336. OnError<int>(250, ex)
  2337. );
  2338. var seed = 42;
  2339. var res = scheduler.Start(() =>
  2340. xs.Scan(seed, (acc, x) => acc + x)
  2341. );
  2342. res.Messages.AssertEqual(
  2343. OnError<int>(250, ex)
  2344. );
  2345. xs.Subscriptions.AssertEqual(
  2346. Subscribe(200, 250)
  2347. );
  2348. }
  2349. [Fact]
  2350. public void Scan_Seed_SomeData()
  2351. {
  2352. var scheduler = new TestScheduler();
  2353. var xs = scheduler.CreateHotObservable(
  2354. OnNext(150, 1),
  2355. OnNext(210, 2),
  2356. OnNext(220, 3),
  2357. OnNext(230, 4),
  2358. OnNext(240, 5),
  2359. OnCompleted<int>(250)
  2360. );
  2361. var seed = 1;
  2362. var res = scheduler.Start(() =>
  2363. xs.Scan(seed, (acc, x) => acc + x)
  2364. );
  2365. res.Messages.AssertEqual(
  2366. OnNext(210, seed + 2),
  2367. OnNext(220, seed + 2 + 3),
  2368. OnNext(230, seed + 2 + 3 + 4),
  2369. OnNext(240, seed + 2 + 3 + 4 + 5),
  2370. OnCompleted<int>(250)
  2371. );
  2372. xs.Subscriptions.AssertEqual(
  2373. Subscribe(200, 250)
  2374. );
  2375. }
  2376. [Fact]
  2377. public void Scan_Seed_AccumulatorThrows()
  2378. {
  2379. var scheduler = new TestScheduler();
  2380. var xs = scheduler.CreateHotObservable(
  2381. OnNext(150, 1),
  2382. OnNext(210, 2),
  2383. OnNext(220, 3),
  2384. OnNext(230, 4),
  2385. OnNext(240, 5),
  2386. OnCompleted<int>(250)
  2387. );
  2388. var ex = new Exception();
  2389. var seed = 1;
  2390. var res = scheduler.Start(() =>
  2391. xs.Scan(seed, (acc, x) => { if (x == 4) throw ex; return acc + x; })
  2392. );
  2393. res.Messages.AssertEqual(
  2394. OnNext(210, seed + 2),
  2395. OnNext(220, seed + 2 + 3),
  2396. OnError<int>(230, ex)
  2397. );
  2398. xs.Subscriptions.AssertEqual(
  2399. Subscribe(200, 230)
  2400. );
  2401. }
  2402. [Fact]
  2403. public void Scan_NoSeed_Never()
  2404. {
  2405. var scheduler = new TestScheduler();
  2406. var xs = scheduler.CreateHotObservable<int>();
  2407. var res = scheduler.Start(() =>
  2408. xs.Scan((acc, x) => acc + x)
  2409. );
  2410. res.Messages.AssertEqual(
  2411. );
  2412. xs.Subscriptions.AssertEqual(
  2413. Subscribe(200, 1000)
  2414. );
  2415. }
  2416. [Fact]
  2417. public void Scan_NoSeed_Empty()
  2418. {
  2419. var scheduler = new TestScheduler();
  2420. var xs = scheduler.CreateHotObservable(
  2421. OnNext(150, 1),
  2422. OnCompleted<int>(250)
  2423. );
  2424. var res = scheduler.Start(() =>
  2425. xs.Scan((acc, x) => acc + x)
  2426. );
  2427. res.Messages.AssertEqual(
  2428. OnCompleted<int>(250)
  2429. );
  2430. xs.Subscriptions.AssertEqual(
  2431. Subscribe(200, 250)
  2432. );
  2433. }
  2434. [Fact]
  2435. public void Scan_NoSeed_Return()
  2436. {
  2437. var scheduler = new TestScheduler();
  2438. var xs = scheduler.CreateHotObservable(
  2439. OnNext(150, 1),
  2440. OnNext(220, 2),
  2441. OnCompleted<int>(250)
  2442. );
  2443. var res = scheduler.Start(() =>
  2444. xs.Scan((acc, x) => acc + x)
  2445. );
  2446. res.Messages.AssertEqual(
  2447. OnNext(220, 2),
  2448. OnCompleted<int>(250)
  2449. );
  2450. xs.Subscriptions.AssertEqual(
  2451. Subscribe(200, 250)
  2452. );
  2453. }
  2454. [Fact]
  2455. public void Scan_NoSeed_Throw()
  2456. {
  2457. var scheduler = new TestScheduler();
  2458. var ex = new Exception();
  2459. var xs = scheduler.CreateHotObservable(
  2460. OnNext(150, 1),
  2461. OnError<int>(250, ex)
  2462. );
  2463. var res = scheduler.Start(() =>
  2464. xs.Scan((acc, x) => acc + x)
  2465. );
  2466. res.Messages.AssertEqual(
  2467. OnError<int>(250, ex)
  2468. );
  2469. xs.Subscriptions.AssertEqual(
  2470. Subscribe(200, 250)
  2471. );
  2472. }
  2473. [Fact]
  2474. public void Scan_NoSeed_SomeData()
  2475. {
  2476. var scheduler = new TestScheduler();
  2477. var xs = scheduler.CreateHotObservable(
  2478. OnNext(150, 1),
  2479. OnNext(210, 2),
  2480. OnNext(220, 3),
  2481. OnNext(230, 4),
  2482. OnNext(240, 5),
  2483. OnCompleted<int>(250)
  2484. );
  2485. var res = scheduler.Start(() =>
  2486. xs.Scan((acc, x) => acc + x)
  2487. );
  2488. res.Messages.AssertEqual(
  2489. OnNext(210, 2),
  2490. OnNext(220, 2 + 3),
  2491. OnNext(230, 2 + 3 + 4),
  2492. OnNext(240, 2 + 3 + 4 + 5),
  2493. OnCompleted<int>(250)
  2494. );
  2495. xs.Subscriptions.AssertEqual(
  2496. Subscribe(200, 250)
  2497. );
  2498. }
  2499. [Fact]
  2500. public void Scan_NoSeed_AccumulatorThrows()
  2501. {
  2502. var scheduler = new TestScheduler();
  2503. var xs = scheduler.CreateHotObservable(
  2504. OnNext(150, 1),
  2505. OnNext(210, 2),
  2506. OnNext(220, 3),
  2507. OnNext(230, 4),
  2508. OnNext(240, 5),
  2509. OnCompleted<int>(250)
  2510. );
  2511. var ex = new Exception();
  2512. var res = scheduler.Start(() =>
  2513. xs.Scan((acc, x) => { if (x == 4) throw ex; return acc + x; })
  2514. );
  2515. res.Messages.AssertEqual(
  2516. OnNext(210, 2),
  2517. OnNext(220, 2 + 3),
  2518. OnError<int>(230, ex)
  2519. );
  2520. xs.Subscriptions.AssertEqual(
  2521. Subscribe(200, 230)
  2522. );
  2523. }
  2524. #endregion
  2525. #region + SkipLast +
  2526. [Fact]
  2527. public void SkipLast_ArgumentChecking()
  2528. {
  2529. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipLast<int>(null, 0));
  2530. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.SkipLast(DummyObservable<int>.Instance, -1));
  2531. }
  2532. [Fact]
  2533. public void SkipLast_Zero_Completed()
  2534. {
  2535. var scheduler = new TestScheduler();
  2536. var xs = scheduler.CreateHotObservable(
  2537. OnNext(180, 1),
  2538. OnNext(210, 2),
  2539. OnNext(250, 3),
  2540. OnNext(270, 4),
  2541. OnNext(310, 5),
  2542. OnNext(360, 6),
  2543. OnNext(380, 7),
  2544. OnNext(410, 8),
  2545. OnNext(590, 9),
  2546. OnCompleted<int>(650)
  2547. );
  2548. var res = scheduler.Start(() =>
  2549. xs.SkipLast(0)
  2550. );
  2551. res.Messages.AssertEqual(
  2552. OnNext(210, 2),
  2553. OnNext(250, 3),
  2554. OnNext(270, 4),
  2555. OnNext(310, 5),
  2556. OnNext(360, 6),
  2557. OnNext(380, 7),
  2558. OnNext(410, 8),
  2559. OnNext(590, 9),
  2560. OnCompleted<int>(650)
  2561. );
  2562. xs.Subscriptions.AssertEqual(
  2563. Subscribe(200, 650)
  2564. );
  2565. }
  2566. [Fact]
  2567. public void SkipLast_Zero_Error()
  2568. {
  2569. var scheduler = new TestScheduler();
  2570. var ex = new Exception();
  2571. var xs = scheduler.CreateHotObservable(
  2572. OnNext(180, 1),
  2573. OnNext(210, 2),
  2574. OnNext(250, 3),
  2575. OnNext(270, 4),
  2576. OnNext(310, 5),
  2577. OnNext(360, 6),
  2578. OnNext(380, 7),
  2579. OnNext(410, 8),
  2580. OnNext(590, 9),
  2581. OnError<int>(650, ex)
  2582. );
  2583. var res = scheduler.Start(() =>
  2584. xs.SkipLast(0)
  2585. );
  2586. res.Messages.AssertEqual(
  2587. OnNext(210, 2),
  2588. OnNext(250, 3),
  2589. OnNext(270, 4),
  2590. OnNext(310, 5),
  2591. OnNext(360, 6),
  2592. OnNext(380, 7),
  2593. OnNext(410, 8),
  2594. OnNext(590, 9),
  2595. OnError<int>(650, ex)
  2596. );
  2597. xs.Subscriptions.AssertEqual(
  2598. Subscribe(200, 650)
  2599. );
  2600. }
  2601. [Fact]
  2602. public void SkipLast_Zero_Disposed()
  2603. {
  2604. var scheduler = new TestScheduler();
  2605. var xs = scheduler.CreateHotObservable(
  2606. OnNext(180, 1),
  2607. OnNext(210, 2),
  2608. OnNext(250, 3),
  2609. OnNext(270, 4),
  2610. OnNext(310, 5),
  2611. OnNext(360, 6),
  2612. OnNext(380, 7),
  2613. OnNext(410, 8),
  2614. OnNext(590, 9)
  2615. );
  2616. var res = scheduler.Start(() =>
  2617. xs.SkipLast(0)
  2618. );
  2619. res.Messages.AssertEqual(
  2620. OnNext(210, 2),
  2621. OnNext(250, 3),
  2622. OnNext(270, 4),
  2623. OnNext(310, 5),
  2624. OnNext(360, 6),
  2625. OnNext(380, 7),
  2626. OnNext(410, 8),
  2627. OnNext(590, 9)
  2628. );
  2629. xs.Subscriptions.AssertEqual(
  2630. Subscribe(200, 1000)
  2631. );
  2632. }
  2633. [Fact]
  2634. public void SkipLast_One_Completed()
  2635. {
  2636. var scheduler = new TestScheduler();
  2637. var xs = scheduler.CreateHotObservable(
  2638. OnNext(180, 1),
  2639. OnNext(210, 2),
  2640. OnNext(250, 3),
  2641. OnNext(270, 4),
  2642. OnNext(310, 5),
  2643. OnNext(360, 6),
  2644. OnNext(380, 7),
  2645. OnNext(410, 8),
  2646. OnNext(590, 9),
  2647. OnCompleted<int>(650)
  2648. );
  2649. var res = scheduler.Start(() =>
  2650. xs.SkipLast(1)
  2651. );
  2652. res.Messages.AssertEqual(
  2653. OnNext(250, 2),
  2654. OnNext(270, 3),
  2655. OnNext(310, 4),
  2656. OnNext(360, 5),
  2657. OnNext(380, 6),
  2658. OnNext(410, 7),
  2659. OnNext(590, 8),
  2660. OnCompleted<int>(650)
  2661. );
  2662. xs.Subscriptions.AssertEqual(
  2663. Subscribe(200, 650)
  2664. );
  2665. }
  2666. [Fact]
  2667. public void SkipLast_One_Error()
  2668. {
  2669. var scheduler = new TestScheduler();
  2670. var ex = new Exception();
  2671. var xs = scheduler.CreateHotObservable(
  2672. OnNext(180, 1),
  2673. OnNext(210, 2),
  2674. OnNext(250, 3),
  2675. OnNext(270, 4),
  2676. OnNext(310, 5),
  2677. OnNext(360, 6),
  2678. OnNext(380, 7),
  2679. OnNext(410, 8),
  2680. OnNext(590, 9),
  2681. OnError<int>(650, ex)
  2682. );
  2683. var res = scheduler.Start(() =>
  2684. xs.SkipLast(1)
  2685. );
  2686. res.Messages.AssertEqual(
  2687. OnNext(250, 2),
  2688. OnNext(270, 3),
  2689. OnNext(310, 4),
  2690. OnNext(360, 5),
  2691. OnNext(380, 6),
  2692. OnNext(410, 7),
  2693. OnNext(590, 8),
  2694. OnError<int>(650, ex)
  2695. );
  2696. xs.Subscriptions.AssertEqual(
  2697. Subscribe(200, 650)
  2698. );
  2699. }
  2700. [Fact]
  2701. public void SkipLast_One_Disposed()
  2702. {
  2703. var scheduler = new TestScheduler();
  2704. var xs = scheduler.CreateHotObservable(
  2705. OnNext(180, 1),
  2706. OnNext(210, 2),
  2707. OnNext(250, 3),
  2708. OnNext(270, 4),
  2709. OnNext(310, 5),
  2710. OnNext(360, 6),
  2711. OnNext(380, 7),
  2712. OnNext(410, 8),
  2713. OnNext(590, 9)
  2714. );
  2715. var res = scheduler.Start(() =>
  2716. xs.SkipLast(1)
  2717. );
  2718. res.Messages.AssertEqual(
  2719. OnNext(250, 2),
  2720. OnNext(270, 3),
  2721. OnNext(310, 4),
  2722. OnNext(360, 5),
  2723. OnNext(380, 6),
  2724. OnNext(410, 7),
  2725. OnNext(590, 8)
  2726. );
  2727. xs.Subscriptions.AssertEqual(
  2728. Subscribe(200, 1000)
  2729. );
  2730. }
  2731. [Fact]
  2732. public void SkipLast_Three_Completed()
  2733. {
  2734. var scheduler = new TestScheduler();
  2735. var xs = scheduler.CreateHotObservable(
  2736. OnNext(180, 1),
  2737. OnNext(210, 2),
  2738. OnNext(250, 3),
  2739. OnNext(270, 4),
  2740. OnNext(310, 5),
  2741. OnNext(360, 6),
  2742. OnNext(380, 7),
  2743. OnNext(410, 8),
  2744. OnNext(590, 9),
  2745. OnCompleted<int>(650)
  2746. );
  2747. var res = scheduler.Start(() =>
  2748. xs.SkipLast(3)
  2749. );
  2750. res.Messages.AssertEqual(
  2751. OnNext(310, 2),
  2752. OnNext(360, 3),
  2753. OnNext(380, 4),
  2754. OnNext(410, 5),
  2755. OnNext(590, 6),
  2756. OnCompleted<int>(650)
  2757. );
  2758. xs.Subscriptions.AssertEqual(
  2759. Subscribe(200, 650)
  2760. );
  2761. }
  2762. [Fact]
  2763. public void SkipLast_Three_Error()
  2764. {
  2765. var scheduler = new TestScheduler();
  2766. var ex = new Exception();
  2767. var xs = scheduler.CreateHotObservable(
  2768. OnNext(180, 1),
  2769. OnNext(210, 2),
  2770. OnNext(250, 3),
  2771. OnNext(270, 4),
  2772. OnNext(310, 5),
  2773. OnNext(360, 6),
  2774. OnNext(380, 7),
  2775. OnNext(410, 8),
  2776. OnNext(590, 9),
  2777. OnError<int>(650, ex)
  2778. );
  2779. var res = scheduler.Start(() =>
  2780. xs.SkipLast(3)
  2781. );
  2782. res.Messages.AssertEqual(
  2783. OnNext(310, 2),
  2784. OnNext(360, 3),
  2785. OnNext(380, 4),
  2786. OnNext(410, 5),
  2787. OnNext(590, 6),
  2788. OnError<int>(650, ex)
  2789. );
  2790. xs.Subscriptions.AssertEqual(
  2791. Subscribe(200, 650)
  2792. );
  2793. }
  2794. [Fact]
  2795. public void SkipLast_Three_Disposed()
  2796. {
  2797. var scheduler = new TestScheduler();
  2798. var xs = scheduler.CreateHotObservable(
  2799. OnNext(180, 1),
  2800. OnNext(210, 2),
  2801. OnNext(250, 3),
  2802. OnNext(270, 4),
  2803. OnNext(310, 5),
  2804. OnNext(360, 6),
  2805. OnNext(380, 7),
  2806. OnNext(410, 8),
  2807. OnNext(590, 9)
  2808. );
  2809. var res = scheduler.Start(() =>
  2810. xs.SkipLast(3)
  2811. );
  2812. res.Messages.AssertEqual(
  2813. OnNext(310, 2),
  2814. OnNext(360, 3),
  2815. OnNext(380, 4),
  2816. OnNext(410, 5),
  2817. OnNext(590, 6)
  2818. );
  2819. xs.Subscriptions.AssertEqual(
  2820. Subscribe(200, 1000)
  2821. );
  2822. }
  2823. #endregion
  2824. #region StartWith
  2825. [Fact]
  2826. public void StartWith_ArgumentChecking()
  2827. {
  2828. var values = (IEnumerable<int>)new[] { 1, 2, 3 };
  2829. var scheduler = new TestScheduler();
  2830. var someObservable = Observable.Empty<int>();
  2831. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(default(IObservable<int>), 1));
  2832. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, default(int[])));
  2833. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(default(IObservable<int>), values));
  2834. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, default(IEnumerable<int>)));
  2835. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(default(IObservable<int>), scheduler, 1));
  2836. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, default(IScheduler), 1));
  2837. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, scheduler, default(int[])));
  2838. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(default(IObservable<int>), scheduler, values));
  2839. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, default(IScheduler), values));
  2840. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartWith(someObservable, scheduler, default(IEnumerable<int>)));
  2841. }
  2842. [Fact]
  2843. public void StartWith()
  2844. {
  2845. var scheduler = new TestScheduler();
  2846. var xs = scheduler.CreateHotObservable(
  2847. OnNext(150, 1),
  2848. OnNext(220, 2),
  2849. OnCompleted<int>(250)
  2850. );
  2851. var res = scheduler.Start(() =>
  2852. xs.StartWith(1)
  2853. );
  2854. res.Messages.AssertEqual(
  2855. OnNext(200, 1),
  2856. OnNext(220, 2),
  2857. OnCompleted<int>(250)
  2858. );
  2859. }
  2860. [Fact]
  2861. public void StartWith_Scheduler()
  2862. {
  2863. var scheduler = new TestScheduler();
  2864. var xs = scheduler.CreateHotObservable(
  2865. OnNext(150, 1),
  2866. OnNext(220, 4),
  2867. OnCompleted<int>(250)
  2868. );
  2869. var res = scheduler.Start(() =>
  2870. xs.StartWith(scheduler, 1, 2, 3)
  2871. );
  2872. res.Messages.AssertEqual(
  2873. OnNext(201, 1),
  2874. OnNext(202, 2),
  2875. OnNext(203, 3),
  2876. OnNext(220, 4),
  2877. OnCompleted<int>(250)
  2878. );
  2879. }
  2880. [Fact]
  2881. public void StartWith_Enumerable()
  2882. {
  2883. var scheduler = new TestScheduler();
  2884. var xs = scheduler.CreateHotObservable(
  2885. OnNext(150, 1),
  2886. OnNext(220, 4),
  2887. OnCompleted<int>(250)
  2888. );
  2889. List<int> data = new List<int>(new[] { 1, 2, 3 });
  2890. var res = scheduler.Start(() =>
  2891. xs.StartWith(data)
  2892. );
  2893. res.Messages.AssertEqual(
  2894. OnNext(200, 1),
  2895. OnNext(200, 2),
  2896. OnNext(200, 3),
  2897. OnNext(220, 4),
  2898. OnCompleted<int>(250)
  2899. );
  2900. }
  2901. [Fact]
  2902. public void StartWith_Enumerable_Scheduler()
  2903. {
  2904. var scheduler = new TestScheduler();
  2905. var xs = scheduler.CreateHotObservable(
  2906. OnNext(150, 1),
  2907. OnNext(220, 4),
  2908. OnCompleted<int>(250)
  2909. );
  2910. List<int> data = new List<int>(new[] { 1, 2, 3 });
  2911. var res = scheduler.Start(() =>
  2912. xs.StartWith(scheduler, data)
  2913. );
  2914. res.Messages.AssertEqual(
  2915. OnNext(201, 1),
  2916. OnNext(202, 2),
  2917. OnNext(203, 3),
  2918. OnNext(220, 4),
  2919. OnCompleted<int>(250)
  2920. );
  2921. }
  2922. #endregion
  2923. #region + TakeLast +
  2924. [Fact]
  2925. public void TakeLast_ArgumentChecking()
  2926. {
  2927. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast<int>(null, 0));
  2928. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(DummyObservable<int>.Instance, -1));
  2929. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast<int>(null, 0, Scheduler.Default));
  2930. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(DummyObservable<int>.Instance, -1, Scheduler.Default));
  2931. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(DummyObservable<int>.Instance, 0, default(IScheduler)));
  2932. }
  2933. [Fact]
  2934. public void TakeLast_Zero_Completed()
  2935. {
  2936. var scheduler = new TestScheduler();
  2937. var xs = scheduler.CreateHotObservable(
  2938. OnNext(180, 1),
  2939. OnNext(210, 2),
  2940. OnNext(250, 3),
  2941. OnNext(270, 4),
  2942. OnNext(310, 5),
  2943. OnNext(360, 6),
  2944. OnNext(380, 7),
  2945. OnNext(410, 8),
  2946. OnNext(590, 9),
  2947. OnCompleted<int>(650)
  2948. );
  2949. var res = scheduler.Start(() =>
  2950. xs.TakeLast(0)
  2951. );
  2952. res.Messages.AssertEqual(
  2953. OnCompleted<int>(650)
  2954. );
  2955. xs.Subscriptions.AssertEqual(
  2956. Subscribe(200, 650)
  2957. );
  2958. }
  2959. [Fact]
  2960. public void TakeLast_Zero_Error()
  2961. {
  2962. var scheduler = new TestScheduler();
  2963. var ex = new Exception();
  2964. var xs = scheduler.CreateHotObservable(
  2965. OnNext(180, 1),
  2966. OnNext(210, 2),
  2967. OnNext(250, 3),
  2968. OnNext(270, 4),
  2969. OnNext(310, 5),
  2970. OnNext(360, 6),
  2971. OnNext(380, 7),
  2972. OnNext(410, 8),
  2973. OnNext(590, 9),
  2974. OnError<int>(650, ex)
  2975. );
  2976. var res = scheduler.Start(() =>
  2977. xs.TakeLast(0)
  2978. );
  2979. res.Messages.AssertEqual(
  2980. OnError<int>(650, ex)
  2981. );
  2982. xs.Subscriptions.AssertEqual(
  2983. Subscribe(200, 650)
  2984. );
  2985. }
  2986. [Fact]
  2987. public void TakeLast_Zero_Disposed()
  2988. {
  2989. var scheduler = new TestScheduler();
  2990. var xs = scheduler.CreateHotObservable(
  2991. OnNext(180, 1),
  2992. OnNext(210, 2),
  2993. OnNext(250, 3),
  2994. OnNext(270, 4),
  2995. OnNext(310, 5),
  2996. OnNext(360, 6),
  2997. OnNext(380, 7),
  2998. OnNext(410, 8),
  2999. OnNext(590, 9)
  3000. );
  3001. var res = scheduler.Start(() =>
  3002. xs.TakeLast(0)
  3003. );
  3004. res.Messages.AssertEqual(
  3005. );
  3006. xs.Subscriptions.AssertEqual(
  3007. Subscribe(200, 1000)
  3008. );
  3009. }
  3010. [Fact]
  3011. public void TakeLast_One_Completed()
  3012. {
  3013. var scheduler = new TestScheduler();
  3014. var xs = scheduler.CreateHotObservable(
  3015. OnNext(180, 1),
  3016. OnNext(210, 2),
  3017. OnNext(250, 3),
  3018. OnNext(270, 4),
  3019. OnNext(310, 5),
  3020. OnNext(360, 6),
  3021. OnNext(380, 7),
  3022. OnNext(410, 8),
  3023. OnNext(590, 9),
  3024. OnCompleted<int>(650)
  3025. );
  3026. var res = scheduler.Start(() =>
  3027. xs.TakeLast(1)
  3028. );
  3029. res.Messages.AssertEqual(
  3030. OnNext(650, 9),
  3031. OnCompleted<int>(650)
  3032. );
  3033. xs.Subscriptions.AssertEqual(
  3034. Subscribe(200, 650)
  3035. );
  3036. }
  3037. [Fact]
  3038. public void TakeLast_One_Error()
  3039. {
  3040. var scheduler = new TestScheduler();
  3041. var ex = new Exception();
  3042. var xs = scheduler.CreateHotObservable(
  3043. OnNext(180, 1),
  3044. OnNext(210, 2),
  3045. OnNext(250, 3),
  3046. OnNext(270, 4),
  3047. OnNext(310, 5),
  3048. OnNext(360, 6),
  3049. OnNext(380, 7),
  3050. OnNext(410, 8),
  3051. OnNext(590, 9),
  3052. OnError<int>(650, ex)
  3053. );
  3054. var res = scheduler.Start(() =>
  3055. xs.TakeLast(1)
  3056. );
  3057. res.Messages.AssertEqual(
  3058. OnError<int>(650, ex)
  3059. );
  3060. xs.Subscriptions.AssertEqual(
  3061. Subscribe(200, 650)
  3062. );
  3063. }
  3064. [Fact]
  3065. public void TakeLast_One_Disposed()
  3066. {
  3067. var scheduler = new TestScheduler();
  3068. var xs = scheduler.CreateHotObservable(
  3069. OnNext(180, 1),
  3070. OnNext(210, 2),
  3071. OnNext(250, 3),
  3072. OnNext(270, 4),
  3073. OnNext(310, 5),
  3074. OnNext(360, 6),
  3075. OnNext(380, 7),
  3076. OnNext(410, 8),
  3077. OnNext(590, 9)
  3078. );
  3079. var res = scheduler.Start(() =>
  3080. xs.TakeLast(1)
  3081. );
  3082. res.Messages.AssertEqual(
  3083. );
  3084. xs.Subscriptions.AssertEqual(
  3085. Subscribe(200, 1000)
  3086. );
  3087. }
  3088. [Fact]
  3089. public void TakeLast_Three_Completed()
  3090. {
  3091. var scheduler = new TestScheduler();
  3092. var xs = scheduler.CreateHotObservable(
  3093. OnNext(180, 1),
  3094. OnNext(210, 2),
  3095. OnNext(250, 3),
  3096. OnNext(270, 4),
  3097. OnNext(310, 5),
  3098. OnNext(360, 6),
  3099. OnNext(380, 7),
  3100. OnNext(410, 8),
  3101. OnNext(590, 9),
  3102. OnCompleted<int>(650)
  3103. );
  3104. var res = scheduler.Start(() =>
  3105. xs.TakeLast(3)
  3106. );
  3107. res.Messages.AssertEqual(
  3108. OnNext(650, 7),
  3109. OnNext(650, 8),
  3110. OnNext(650, 9),
  3111. OnCompleted<int>(650)
  3112. );
  3113. xs.Subscriptions.AssertEqual(
  3114. Subscribe(200, 650)
  3115. );
  3116. }
  3117. [Fact]
  3118. public void TakeLast_Three_Error()
  3119. {
  3120. var scheduler = new TestScheduler();
  3121. var ex = new Exception();
  3122. var xs = scheduler.CreateHotObservable(
  3123. OnNext(180, 1),
  3124. OnNext(210, 2),
  3125. OnNext(250, 3),
  3126. OnNext(270, 4),
  3127. OnNext(310, 5),
  3128. OnNext(360, 6),
  3129. OnNext(380, 7),
  3130. OnNext(410, 8),
  3131. OnNext(590, 9),
  3132. OnError<int>(650, ex)
  3133. );
  3134. var res = scheduler.Start(() =>
  3135. xs.TakeLast(3)
  3136. );
  3137. res.Messages.AssertEqual(
  3138. OnError<int>(650, ex)
  3139. );
  3140. xs.Subscriptions.AssertEqual(
  3141. Subscribe(200, 650)
  3142. );
  3143. }
  3144. [Fact]
  3145. public void TakeLast_Three_Disposed()
  3146. {
  3147. var scheduler = new TestScheduler();
  3148. var xs = scheduler.CreateHotObservable(
  3149. OnNext(180, 1),
  3150. OnNext(210, 2),
  3151. OnNext(250, 3),
  3152. OnNext(270, 4),
  3153. OnNext(310, 5),
  3154. OnNext(360, 6),
  3155. OnNext(380, 7),
  3156. OnNext(410, 8),
  3157. OnNext(590, 9)
  3158. );
  3159. var res = scheduler.Start(() =>
  3160. xs.TakeLast(3)
  3161. );
  3162. res.Messages.AssertEqual(
  3163. );
  3164. xs.Subscriptions.AssertEqual(
  3165. Subscribe(200, 1000)
  3166. );
  3167. }
  3168. [Fact]
  3169. public void TakeLast_LongRunning_Regular()
  3170. {
  3171. var res = Observable.Range(0, 100, Scheduler.Default).TakeLast(10, NewThreadScheduler.Default);
  3172. var lst = new List<int>();
  3173. res.ForEach(lst.Add);
  3174. Assert.True(Enumerable.Range(90, 10).SequenceEqual(lst));
  3175. }
  3176. #endregion
  3177. #region + TakeLastBuffer +
  3178. [Fact]
  3179. public void TakeLastBuffer_ArgumentChecking()
  3180. {
  3181. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer<int>(null, 0));
  3182. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(DummyObservable<int>.Instance, -1));
  3183. }
  3184. [Fact]
  3185. public void TakeLastBuffer_Zero_Completed()
  3186. {
  3187. var scheduler = new TestScheduler();
  3188. var xs = scheduler.CreateHotObservable(
  3189. OnNext(180, 1),
  3190. OnNext(210, 2),
  3191. OnNext(250, 3),
  3192. OnNext(270, 4),
  3193. OnNext(310, 5),
  3194. OnNext(360, 6),
  3195. OnNext(380, 7),
  3196. OnNext(410, 8),
  3197. OnNext(590, 9),
  3198. OnCompleted<int>(650)
  3199. );
  3200. var res = scheduler.Start(() =>
  3201. xs.TakeLastBuffer(0)
  3202. );
  3203. res.Messages.AssertEqual(
  3204. OnNext<IList<int>>(650, lst => lst.Count == 0),
  3205. OnCompleted<IList<int>>(650)
  3206. );
  3207. xs.Subscriptions.AssertEqual(
  3208. Subscribe(200, 650)
  3209. );
  3210. }
  3211. [Fact]
  3212. public void TakeLastBuffer_Zero_Error()
  3213. {
  3214. var scheduler = new TestScheduler();
  3215. var ex = new Exception();
  3216. var xs = scheduler.CreateHotObservable(
  3217. OnNext(180, 1),
  3218. OnNext(210, 2),
  3219. OnNext(250, 3),
  3220. OnNext(270, 4),
  3221. OnNext(310, 5),
  3222. OnNext(360, 6),
  3223. OnNext(380, 7),
  3224. OnNext(410, 8),
  3225. OnNext(590, 9),
  3226. OnError<int>(650, ex)
  3227. );
  3228. var res = scheduler.Start(() =>
  3229. xs.TakeLastBuffer(0)
  3230. );
  3231. res.Messages.AssertEqual(
  3232. OnError<IList<int>>(650, ex)
  3233. );
  3234. xs.Subscriptions.AssertEqual(
  3235. Subscribe(200, 650)
  3236. );
  3237. }
  3238. [Fact]
  3239. public void TakeLastBuffer_Zero_Disposed()
  3240. {
  3241. var scheduler = new TestScheduler();
  3242. var xs = scheduler.CreateHotObservable(
  3243. OnNext(180, 1),
  3244. OnNext(210, 2),
  3245. OnNext(250, 3),
  3246. OnNext(270, 4),
  3247. OnNext(310, 5),
  3248. OnNext(360, 6),
  3249. OnNext(380, 7),
  3250. OnNext(410, 8),
  3251. OnNext(590, 9)
  3252. );
  3253. var res = scheduler.Start(() =>
  3254. xs.TakeLastBuffer(0)
  3255. );
  3256. res.Messages.AssertEqual(
  3257. );
  3258. xs.Subscriptions.AssertEqual(
  3259. Subscribe(200, 1000)
  3260. );
  3261. }
  3262. [Fact]
  3263. public void TakeLastBuffer_One_Completed()
  3264. {
  3265. var scheduler = new TestScheduler();
  3266. var xs = scheduler.CreateHotObservable(
  3267. OnNext(180, 1),
  3268. OnNext(210, 2),
  3269. OnNext(250, 3),
  3270. OnNext(270, 4),
  3271. OnNext(310, 5),
  3272. OnNext(360, 6),
  3273. OnNext(380, 7),
  3274. OnNext(410, 8),
  3275. OnNext(590, 9),
  3276. OnCompleted<int>(650)
  3277. );
  3278. var res = scheduler.Start(() =>
  3279. xs.TakeLastBuffer(1)
  3280. );
  3281. res.Messages.AssertEqual(
  3282. OnNext<IList<int>>(650, lst => lst.SequenceEqual(new[] { 9 })),
  3283. OnCompleted<IList<int>>(650)
  3284. );
  3285. xs.Subscriptions.AssertEqual(
  3286. Subscribe(200, 650)
  3287. );
  3288. }
  3289. [Fact]
  3290. public void TakeLastBuffer_One_Error()
  3291. {
  3292. var scheduler = new TestScheduler();
  3293. var ex = new Exception();
  3294. var xs = scheduler.CreateHotObservable(
  3295. OnNext(180, 1),
  3296. OnNext(210, 2),
  3297. OnNext(250, 3),
  3298. OnNext(270, 4),
  3299. OnNext(310, 5),
  3300. OnNext(360, 6),
  3301. OnNext(380, 7),
  3302. OnNext(410, 8),
  3303. OnNext(590, 9),
  3304. OnError<int>(650, ex)
  3305. );
  3306. var res = scheduler.Start(() =>
  3307. xs.TakeLastBuffer(1)
  3308. );
  3309. res.Messages.AssertEqual(
  3310. OnError<IList<int>>(650, ex)
  3311. );
  3312. xs.Subscriptions.AssertEqual(
  3313. Subscribe(200, 650)
  3314. );
  3315. }
  3316. [Fact]
  3317. public void TakeLastBuffer_One_Disposed()
  3318. {
  3319. var scheduler = new TestScheduler();
  3320. var xs = scheduler.CreateHotObservable(
  3321. OnNext(180, 1),
  3322. OnNext(210, 2),
  3323. OnNext(250, 3),
  3324. OnNext(270, 4),
  3325. OnNext(310, 5),
  3326. OnNext(360, 6),
  3327. OnNext(380, 7),
  3328. OnNext(410, 8),
  3329. OnNext(590, 9)
  3330. );
  3331. var res = scheduler.Start(() =>
  3332. xs.TakeLastBuffer(1)
  3333. );
  3334. res.Messages.AssertEqual(
  3335. );
  3336. xs.Subscriptions.AssertEqual(
  3337. Subscribe(200, 1000)
  3338. );
  3339. }
  3340. [Fact]
  3341. public void TakeLastBuffer_Three_Completed()
  3342. {
  3343. var scheduler = new TestScheduler();
  3344. var xs = scheduler.CreateHotObservable(
  3345. OnNext(180, 1),
  3346. OnNext(210, 2),
  3347. OnNext(250, 3),
  3348. OnNext(270, 4),
  3349. OnNext(310, 5),
  3350. OnNext(360, 6),
  3351. OnNext(380, 7),
  3352. OnNext(410, 8),
  3353. OnNext(590, 9),
  3354. OnCompleted<int>(650)
  3355. );
  3356. var res = scheduler.Start(() =>
  3357. xs.TakeLastBuffer(3)
  3358. );
  3359. res.Messages.AssertEqual(
  3360. OnNext<IList<int>>(650, lst => lst.SequenceEqual(new[] { 7, 8, 9 })),
  3361. OnCompleted<IList<int>>(650)
  3362. );
  3363. xs.Subscriptions.AssertEqual(
  3364. Subscribe(200, 650)
  3365. );
  3366. }
  3367. [Fact]
  3368. public void TakeLastBuffer_Three_Error()
  3369. {
  3370. var scheduler = new TestScheduler();
  3371. var ex = new Exception();
  3372. var xs = scheduler.CreateHotObservable(
  3373. OnNext(180, 1),
  3374. OnNext(210, 2),
  3375. OnNext(250, 3),
  3376. OnNext(270, 4),
  3377. OnNext(310, 5),
  3378. OnNext(360, 6),
  3379. OnNext(380, 7),
  3380. OnNext(410, 8),
  3381. OnNext(590, 9),
  3382. OnError<int>(650, ex)
  3383. );
  3384. var res = scheduler.Start(() =>
  3385. xs.TakeLastBuffer(3)
  3386. );
  3387. res.Messages.AssertEqual(
  3388. OnError<IList<int>>(650, ex)
  3389. );
  3390. xs.Subscriptions.AssertEqual(
  3391. Subscribe(200, 650)
  3392. );
  3393. }
  3394. [Fact]
  3395. public void TakeLastBuffer_Three_Disposed()
  3396. {
  3397. var scheduler = new TestScheduler();
  3398. var xs = scheduler.CreateHotObservable(
  3399. OnNext(180, 1),
  3400. OnNext(210, 2),
  3401. OnNext(250, 3),
  3402. OnNext(270, 4),
  3403. OnNext(310, 5),
  3404. OnNext(360, 6),
  3405. OnNext(380, 7),
  3406. OnNext(410, 8),
  3407. OnNext(590, 9)
  3408. );
  3409. var res = scheduler.Start(() =>
  3410. xs.TakeLastBuffer(3)
  3411. );
  3412. res.Messages.AssertEqual(
  3413. );
  3414. xs.Subscriptions.AssertEqual(
  3415. Subscribe(200, 1000)
  3416. );
  3417. }
  3418. #endregion
  3419. #region + Window +
  3420. [Fact]
  3421. public void WindowWithCount_ArgumentChecking()
  3422. {
  3423. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), 1, 1));
  3424. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, 0, 1));
  3425. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, 1, 0));
  3426. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), 1));
  3427. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, 0));
  3428. }
  3429. [Fact]
  3430. public void WindowWithCount_Basic()
  3431. {
  3432. var scheduler = new TestScheduler();
  3433. var xs = scheduler.CreateHotObservable(
  3434. OnNext(100, 1),
  3435. OnNext(210, 2),
  3436. OnNext(240, 3),
  3437. OnNext(280, 4),
  3438. OnNext(320, 5),
  3439. OnNext(350, 6),
  3440. OnNext(380, 7),
  3441. OnNext(420, 8),
  3442. OnNext(470, 9),
  3443. OnCompleted<int>(600)
  3444. );
  3445. var res = scheduler.Start(() =>
  3446. xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  3447. );
  3448. res.Messages.AssertEqual(
  3449. OnNext(210, "0 2"),
  3450. OnNext(240, "0 3"),
  3451. OnNext(280, "0 4"),
  3452. OnNext(280, "1 4"),
  3453. OnNext(320, "1 5"),
  3454. OnNext(350, "1 6"),
  3455. OnNext(350, "2 6"),
  3456. OnNext(380, "2 7"),
  3457. OnNext(420, "2 8"),
  3458. OnNext(420, "3 8"),
  3459. OnNext(470, "3 9"),
  3460. OnCompleted<string>(600)
  3461. );
  3462. xs.Subscriptions.AssertEqual(
  3463. Subscribe(200, 600)
  3464. );
  3465. }
  3466. [Fact]
  3467. public void WindowWithCount_InnerTimings()
  3468. {
  3469. var scheduler = new TestScheduler();
  3470. var xs = scheduler.CreateHotObservable(
  3471. OnNext(100, 1),
  3472. OnNext(210, 2),
  3473. OnNext(240, 3),
  3474. OnNext(280, 4),
  3475. OnNext(320, 5),
  3476. OnNext(350, 6),
  3477. OnNext(380, 7),
  3478. OnNext(420, 8),
  3479. OnNext(470, 9),
  3480. OnCompleted<int>(600)
  3481. );
  3482. var res = default(IObservable<IObservable<int>>);
  3483. var outerSubscription = default(IDisposable);
  3484. var innerSubscriptions = new List<IDisposable>();
  3485. var windows = new List<IObservable<int>>();
  3486. var observers = new List<ITestableObserver<int>>();
  3487. scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2));
  3488. scheduler.ScheduleAbsolute(Subscribed, () =>
  3489. {
  3490. outerSubscription = res.Subscribe(
  3491. window =>
  3492. {
  3493. var result = scheduler.CreateObserver<int>();
  3494. windows.Add(window);
  3495. observers.Add(result);
  3496. scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result)));
  3497. }
  3498. );
  3499. });
  3500. scheduler.Start();
  3501. Assert.Equal(5, observers.Count);
  3502. observers[0].Messages.AssertEqual(
  3503. OnNext(210, 2),
  3504. OnNext(240, 3),
  3505. OnNext(280, 4),
  3506. OnCompleted<int>(280)
  3507. );
  3508. observers[1].Messages.AssertEqual(
  3509. OnNext(280, 4),
  3510. OnNext(320, 5),
  3511. OnNext(350, 6),
  3512. OnCompleted<int>(350)
  3513. );
  3514. observers[2].Messages.AssertEqual(
  3515. OnNext(350, 6),
  3516. OnNext(380, 7),
  3517. OnNext(420, 8),
  3518. OnCompleted<int>(420)
  3519. );
  3520. observers[3].Messages.AssertEqual(
  3521. OnNext(420, 8),
  3522. OnNext(470, 9),
  3523. OnCompleted<int>(600)
  3524. );
  3525. observers[4].Messages.AssertEqual(
  3526. OnCompleted<int>(600)
  3527. );
  3528. xs.Subscriptions.AssertEqual(
  3529. Subscribe(200, 600)
  3530. );
  3531. }
  3532. [Fact]
  3533. public void WindowWithCount_InnerTimings_DisposeOuter()
  3534. {
  3535. var scheduler = new TestScheduler();
  3536. var xs = scheduler.CreateHotObservable(
  3537. OnNext(100, 1),
  3538. OnNext(210, 2),
  3539. OnNext(240, 3),
  3540. OnNext(280, 4),
  3541. OnNext(320, 5),
  3542. OnNext(350, 6),
  3543. OnNext(380, 7),
  3544. OnNext(420, 8),
  3545. OnNext(470, 9),
  3546. OnCompleted<int>(600)
  3547. );
  3548. var res = default(IObservable<IObservable<int>>);
  3549. var outerSubscription = default(IDisposable);
  3550. var innerSubscriptions = new List<IDisposable>();
  3551. var windows = new List<IObservable<int>>();
  3552. var observers = new List<ITestableObserver<int>>();
  3553. var windowCreationTimes = new List<long>();
  3554. scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2));
  3555. scheduler.ScheduleAbsolute(Subscribed, () =>
  3556. {
  3557. outerSubscription = res.Subscribe(
  3558. window =>
  3559. {
  3560. windowCreationTimes.Add(scheduler.Clock);
  3561. var result = scheduler.CreateObserver<int>();
  3562. windows.Add(window);
  3563. observers.Add(result);
  3564. scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result)));
  3565. }
  3566. );
  3567. });
  3568. scheduler.ScheduleAbsolute(400, () =>
  3569. {
  3570. outerSubscription.Dispose();
  3571. });
  3572. scheduler.Start();
  3573. Assert.True(windowCreationTimes.Last() < 400);
  3574. Assert.Equal(4, observers.Count);
  3575. observers[0].Messages.AssertEqual(
  3576. OnNext(210, 2),
  3577. OnNext(240, 3),
  3578. OnNext(280, 4),
  3579. OnCompleted<int>(280)
  3580. );
  3581. observers[1].Messages.AssertEqual(
  3582. OnNext(280, 4),
  3583. OnNext(320, 5),
  3584. OnNext(350, 6),
  3585. OnCompleted<int>(350)
  3586. );
  3587. observers[2].Messages.AssertEqual(
  3588. OnNext(350, 6),
  3589. OnNext(380, 7),
  3590. OnNext(420, 8),
  3591. OnCompleted<int>(420)
  3592. );
  3593. observers[3].Messages.AssertEqual(
  3594. OnNext(420, 8),
  3595. OnNext(470, 9),
  3596. OnCompleted<int>(600)
  3597. );
  3598. xs.Subscriptions.AssertEqual(
  3599. Subscribe(200, 600)
  3600. );
  3601. }
  3602. [Fact]
  3603. public void WindowWithCount_InnerTimings_DisposeOuterAndInners()
  3604. {
  3605. var scheduler = new TestScheduler();
  3606. var xs = scheduler.CreateHotObservable(
  3607. OnNext(100, 1),
  3608. OnNext(210, 2),
  3609. OnNext(240, 3),
  3610. OnNext(280, 4),
  3611. OnNext(320, 5),
  3612. OnNext(350, 6),
  3613. OnNext(380, 7),
  3614. OnNext(420, 8),
  3615. OnNext(470, 9),
  3616. OnCompleted<int>(600)
  3617. );
  3618. var res = default(IObservable<IObservable<int>>);
  3619. var outerSubscription = default(IDisposable);
  3620. var innerSubscriptions = new List<IDisposable>();
  3621. var windows = new List<IObservable<int>>();
  3622. var observers = new List<ITestableObserver<int>>();
  3623. var windowCreationTimes = new List<long>();
  3624. scheduler.ScheduleAbsolute(Created, () => res = xs.Window(3, 2));
  3625. scheduler.ScheduleAbsolute(Subscribed, () =>
  3626. {
  3627. outerSubscription = res.Subscribe(
  3628. window =>
  3629. {
  3630. windowCreationTimes.Add(scheduler.Clock);
  3631. var result = scheduler.CreateObserver<int>();
  3632. windows.Add(window);
  3633. observers.Add(result);
  3634. scheduler.ScheduleAbsolute(0, () => innerSubscriptions.Add(window.Subscribe(result)));
  3635. }
  3636. );
  3637. });
  3638. scheduler.ScheduleAbsolute(400, () =>
  3639. {
  3640. outerSubscription.Dispose();
  3641. foreach (var d in innerSubscriptions)
  3642. d.Dispose();
  3643. });
  3644. scheduler.Start();
  3645. Assert.True(windowCreationTimes.Last() < 400);
  3646. Assert.Equal(4, observers.Count);
  3647. observers[0].Messages.AssertEqual(
  3648. OnNext(210, 2),
  3649. OnNext(240, 3),
  3650. OnNext(280, 4),
  3651. OnCompleted<int>(280)
  3652. );
  3653. observers[1].Messages.AssertEqual(
  3654. OnNext(280, 4),
  3655. OnNext(320, 5),
  3656. OnNext(350, 6),
  3657. OnCompleted<int>(350)
  3658. );
  3659. observers[2].Messages.AssertEqual(
  3660. OnNext(350, 6),
  3661. OnNext(380, 7)
  3662. );
  3663. observers[3].Messages.AssertEqual(
  3664. );
  3665. xs.Subscriptions.AssertEqual(
  3666. Subscribe(200, 400)
  3667. );
  3668. }
  3669. [Fact]
  3670. public void WindowWithCount_Disposed()
  3671. {
  3672. var scheduler = new TestScheduler();
  3673. var xs = scheduler.CreateHotObservable(
  3674. OnNext(100, 1),
  3675. OnNext(210, 2),
  3676. OnNext(240, 3),
  3677. OnNext(280, 4),
  3678. OnNext(320, 5),
  3679. OnNext(350, 6),
  3680. OnNext(380, 7),
  3681. OnNext(420, 8),
  3682. OnNext(470, 9),
  3683. OnCompleted<int>(600)
  3684. );
  3685. var res = scheduler.Start(() =>
  3686. xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(), 370
  3687. );
  3688. res.Messages.AssertEqual(
  3689. OnNext(210, "0 2"),
  3690. OnNext(240, "0 3"),
  3691. OnNext(280, "0 4"),
  3692. OnNext(280, "1 4"),
  3693. OnNext(320, "1 5"),
  3694. OnNext(350, "1 6"),
  3695. OnNext(350, "2 6")
  3696. );
  3697. xs.Subscriptions.AssertEqual(
  3698. Subscribe(200, 370)
  3699. );
  3700. }
  3701. [Fact]
  3702. public void WindowWithCount_Error()
  3703. {
  3704. var scheduler = new TestScheduler();
  3705. var ex = new Exception();
  3706. var xs = scheduler.CreateHotObservable(
  3707. OnNext(100, 1),
  3708. OnNext(210, 2),
  3709. OnNext(240, 3),
  3710. OnNext(280, 4),
  3711. OnNext(320, 5),
  3712. OnNext(350, 6),
  3713. OnNext(380, 7),
  3714. OnNext(420, 8),
  3715. OnNext(470, 9),
  3716. OnError<int>(600, ex)
  3717. );
  3718. var res = scheduler.Start(() =>
  3719. xs.Window(3, 2).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
  3720. );
  3721. res.Messages.AssertEqual(
  3722. OnNext(210, "0 2"),
  3723. OnNext(240, "0 3"),
  3724. OnNext(280, "0 4"),
  3725. OnNext(280, "1 4"),
  3726. OnNext(320, "1 5"),
  3727. OnNext(350, "1 6"),
  3728. OnNext(350, "2 6"),
  3729. OnNext(380, "2 7"),
  3730. OnNext(420, "2 8"),
  3731. OnNext(420, "3 8"),
  3732. OnNext(470, "3 9"),
  3733. OnError<string>(600, ex)
  3734. );
  3735. xs.Subscriptions.AssertEqual(
  3736. Subscribe(200, 600)
  3737. );
  3738. }
  3739. [Fact]
  3740. public void WindowWithCount_Default()
  3741. {
  3742. Observable.Range(1, 10).Window(3).Skip(1).First().SequenceEqual(new[] { 4, 5, 6 }.ToObservable());
  3743. Observable.Range(1, 10).Window(3).Skip(1).First().SequenceEqual(new[] { 4, 5, 6 }.ToObservable());
  3744. Observable.Range(1, 10).Window(3, 2).Skip(1).First().SequenceEqual(new[] { 3, 4, 5 }.ToObservable());
  3745. }
  3746. #endregion
  3747. }
  3748. }