ObservableTimeTest.cs 265 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Disposables;
  10. using System.Reactive.Linq;
  11. using System.Reactive.Subjects;
  12. using System.Text;
  13. using System.Threading;
  14. using Microsoft.Reactive.Testing;
  15. using Xunit;
  16. using ReactiveTests.Dummies;
  17. using System.Threading.Tasks;
  18. namespace ReactiveTests.Tests
  19. {
  20. public class ObservableTimeTest : ReactiveTest
  21. {
  22. #region + Buffer +
  // Null source or null scheduler must be rejected by every time-based Buffer overload.
  [Fact]
  public void Buffer_Time_ArgumentChecking()
  {
      var testScheduler = new TestScheduler();
      var validSource = Observable.Empty<int>();
      IObservable<int> nullSource = null;
      IScheduler nullScheduler = null;
      var zero = TimeSpan.Zero;

      // Buffer(source, timeSpan [, scheduler])
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(validSource, zero, nullScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, zero, testScheduler));

      // Buffer(source, timeSpan, timeShift [, scheduler])
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, zero, zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(validSource, zero, zero, nullScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, zero, zero, testScheduler));
  }
  // Null arguments and negative durations must be rejected by the time-based Buffer overloads.
  [Fact]
  public void BufferWithTime_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      IScheduler nullScheduler = null;
      var oneTick = TimeSpan.FromTicks(1);
      var negativeTick = TimeSpan.FromTicks(-1);

      // (source, timeSpan, timeShift, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick, oneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, negativeTick, oneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, negativeTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, oneTick, nullScheduler));

      // (source, timeSpan, timeShift)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, negativeTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, negativeTick));

      // (source, timeSpan, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, negativeTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, nullScheduler));

      // (source, timeSpan)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, negativeTick));
  }
  // Buffer with timeSpan = 100 ticks and timeShift = 70 ticks: consecutive buffers overlap,
  // and the buffers still open at completion (600) are flushed before OnCompleted.
  [Fact]
  public void BufferWithTime_Basic1()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observer = clock.Start(() =>
      {
          var buffers = source.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), clock);

          // Render each buffer as a comma-separated string so it can be compared by value.
          return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
      });

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(370, "4,5,6"),
          OnNext(440, "6,7,8"),
          OnNext(510, "8,9"),
          OnNext(580, ""),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Buffer with timeSpan = 70 ticks and timeShift = 100 ticks: there are gaps between
  // buffers, so elements arriving in a gap (4 and 7 here) are absent from the output.
  [Fact]
  public void BufferWithTime_Basic2()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observer = clock.Start(() =>
      {
          var buffers = source.Buffer(TimeSpan.FromTicks(70), TimeSpan.FromTicks(100), clock);
          return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
      });

      observer.Messages.AssertEqual(
          OnNext(270, "2,3"),
          OnNext(370, "5,6"),
          OnNext(470, "8,9"),
          OnNext(570, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // When the source fails at 600 the error is forwarded as-is; the buffers still open
  // at that point are not emitted.
  [Fact]
  public void BufferWithTime_Error()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, failure)
      );

      var observer = clock.Start(() =>
      {
          var buffers = source.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), clock);
          return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
      });

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(370, "4,5,6"),
          OnNext(440, "6,7,8"),
          OnNext(510, "8,9"),
          OnNext(580, ""),
          OnError<string>(600, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Disposing the subscription at 370 stops the output: only the buffer closed at 300 is observed.
  [Fact]
  public void BufferWithTime_Disposed()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observer = clock.Start(
          () =>
          {
              var buffers = source.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), clock);
              return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
          },
          370
      );

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 370)
      );
  }
  // Single-TimeSpan overload (100 ticks): back-to-back, non-overlapping buffers.
  [Fact]
  public void BufferWithTime_Basic_Same()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observer = clock.Start(() =>
      {
          var buffers = source.Buffer(TimeSpan.FromTicks(100), clock);
          return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
      });

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnNext(500, "8,9"),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Same scenario as the single-TimeSpan test, run on a PeriodicTestScheduler so the
  // recorded timer ticks can be checked as well (outside WINDOWS builds).
  [Fact]
  public void BufferWithTime_Basic_Same_Periodic()
  {
      var clock = new PeriodicTestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observer = clock.Start(() =>
      {
          var buffers = source.Buffer(TimeSpan.FromTicks(100), clock);
          return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
      });

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnNext(500, "8,9"),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );

#if !WINDOWS
      // One timer run from subscription (200) to completion (600), ticking every 100.
      clock.Timers.AssertEqual(
          new TimerRun(200, 600) { 300, 400, 500 }
      );
#endif
  }
  // Periodic variant where the source fails at 480: the partially filled buffer is dropped
  // and the timer run ends at the error time.
  [Fact]
  public void BufferWithTime_Basic_Same_Periodic_Error()
  {
      var failure = new Exception();
      var clock = new PeriodicTestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(480, failure)
      );

      var observer = clock.Start(() =>
      {
          var buffers = source.Buffer(TimeSpan.FromTicks(100), clock);
          return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
      });

      observer.Messages.AssertEqual(
          OnNext(300, "2,3,4"),
          OnNext(400, "5,6,7"),
          OnError<string>(480, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

#if !WINDOWS
      clock.Timers.AssertEqual(
          new TimerRun(200, 480) { 300, 400 }
      );
#endif
  }
  // Overloads without a scheduler argument: a one-day buffer over a ten-element range
  // yields all ten elements in the first buffer.
  [Fact]
  public void BufferWithTime_Default()
  {
      var oneDay = TimeSpan.FromDays(1);

      var withShift = Observable.Range(0, 10).Buffer(oneDay, oneDay).First();
      withShift.AssertEqual(Enumerable.Range(0, 10));

      var withoutShift = Observable.Range(0, 10).Buffer(oneDay).First();
      withoutShift.AssertEqual(Enumerable.Range(0, 10));
  }
  // Buffer(timeSpan, count): null source/scheduler, negative time and a zero count are rejected.
  [Fact]
  public void BufferWithTimeOrCount_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      IScheduler nullScheduler = null;
      var oneTick = TimeSpan.FromTicks(1);
      var negativeTick = TimeSpan.FromTicks(-1);

      // With explicit scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick, 1, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, negativeTick, 1, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, 0, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, 1, nullScheduler));

      // Without scheduler argument.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(nullSource, oneTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, negativeTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, oneTick, 0));
  }
  // Buffer(70 ticks, count 3): a buffer is emitted when it holds three items or when
  // 70 ticks have elapsed, whichever happens first.
  [Fact]
  public void BufferWithTimeOrCount_Basic()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observer = clock.Start(() =>
      {
          var buffers = source.Buffer(TimeSpan.FromTicks(70), 3, clock);
          return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
      });

      observer.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7"),
          OnNext(440, "8"),
          OnNext(510, "9"),
          OnNext(580, ""),
          OnNext(600, ""),
          OnCompleted<string>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Time-or-count buffering with a source error at 600: the error is forwarded and the
  // buffer open at that moment is not emitted.
  [Fact]
  public void BufferWithTimeOrCount_Error()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, failure)
      );

      var observer = clock.Start(() =>
      {
          var buffers = source.Buffer(TimeSpan.FromTicks(70), 3, clock);
          return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
      });

      observer.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7"),
          OnNext(440, "8"),
          OnNext(510, "9"),
          OnNext(580, ""),
          OnError<string>(600, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Time-or-count buffering disposed at 370: the count-triggered buffer at 370 is still observed.
  [Fact]
  public void BufferWithTimeOrCount_Disposed()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      var observer = clock.Start(
          () =>
          {
              var buffers = source.Buffer(TimeSpan.FromTicks(70), 3, clock);
              return buffers.Select(items => string.Join(",", items.Select(item => item.ToString())));
          },
          370
      );

      observer.Messages.AssertEqual(
          OnNext(240, "1,2,3"),
          OnNext(310, "4"),
          OnNext(370, "5,6,7")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 370)
      );
  }
  // Overload without a scheduler argument: with count 3 the second buffer holds 4, 5, 6.
  [Fact]
  public void BufferWithTimeOrCount_Default()
  {
      var buffers = Observable.Range(1, 10, DefaultScheduler.Instance).Buffer(TimeSpan.FromDays(1), 3);
      var second = buffers.Skip(1).First();
      second.AssertEqual(4, 5, 6);
  }
  388. #endregion
  389. #region + Delay +
  // Null source, null scheduler and negative relative delays must be rejected by Delay.
  [Fact]
  public void Delay_ArgumentChecking()
  {
      var testScheduler = new TestScheduler();
      var validSource = Observable.Empty<int>();
      IObservable<int> nullSource = null;
      IScheduler nullScheduler = null;

      // Overloads without a scheduler argument.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(nullSource, DateTimeOffset.Now));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(nullSource, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Delay(validSource, TimeSpan.FromSeconds(-1)));

      // Overloads with an explicit scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(nullSource, DateTimeOffset.Now, testScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(nullSource, TimeSpan.Zero, testScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(validSource, DateTimeOffset.Now, nullScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(validSource, TimeSpan.Zero, nullScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Delay(validSource, TimeSpan.FromSeconds(-1), testScheduler));
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_TimeSpan_Simple1()
  {
      Delay_TimeSpan_Simple1_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_TimeSpan_Simple1_Stopwatch()
  {
      Delay_TimeSpan_Simple1_Impl(useStopwatch: true);
  }
  // Relative delay of 100 ticks: every notification after subscription (200) is
  // expected exactly 100 ticks later, including completion.
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_TimeSpan_Simple1_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var observer = clock.Start(() =>
      {
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(TimeSpan.FromTicks(100), delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(350, 2),
          OnNext(450, 3),
          OnNext(550, 4),
          OnCompleted<int>(650)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_DateTimeOffset_Simple1()
  {
      Delay_DateTimeOffset_Simple1_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_DateTimeOffset_Simple1_Stopwatch()
  {
      Delay_DateTimeOffset_Simple1_Impl(useStopwatch: true);
  }
  // Absolute due time at tick 300: the expected output matches the 100-tick relative
  // delay scenario (subscription happens at 200).
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_DateTimeOffset_Simple1_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var observer = clock.Start(() =>
      {
          var dueTime = new DateTimeOffset(300, TimeSpan.Zero);
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(dueTime, delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(350, 2),
          OnNext(450, 3),
          OnNext(550, 4),
          OnCompleted<int>(650)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_TimeSpan_Simple2()
  {
      Delay_TimeSpan_Simple2_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_TimeSpan_Simple2_Stopwatch()
  {
      Delay_TimeSpan_Simple2_Impl(useStopwatch: true);
  }
  // Relative delay of 50 ticks (shorter than the gap between source notifications).
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_TimeSpan_Simple2_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var observer = clock.Start(() =>
      {
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(TimeSpan.FromTicks(50), delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(300, 2),
          OnNext(400, 3),
          OnNext(500, 4),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_DateTimeOffset_Simple2()
  {
      Delay_DateTimeOffset_Simple2_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_DateTimeOffset_Simple2_Stopwatch()
  {
      Delay_DateTimeOffset_Simple2_Impl(useStopwatch: true);
  }
  // Absolute due time at tick 250: the expected output matches the 50-tick relative delay scenario.
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_DateTimeOffset_Simple2_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var observer = clock.Start(() =>
      {
          var dueTime = new DateTimeOffset(250, TimeSpan.Zero);
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(dueTime, delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(300, 2),
          OnNext(400, 3),
          OnNext(500, 4),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_TimeSpan_Simple3()
  {
      Delay_TimeSpan_Simple3_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_TimeSpan_Simple3_Stopwatch()
  {
      Delay_TimeSpan_Simple3_Impl(useStopwatch: true);
  }
  // Relative delay of 150 ticks (longer than the gap between source notifications).
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_TimeSpan_Simple3_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var observer = clock.Start(() =>
      {
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(TimeSpan.FromTicks(150), delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(400, 2),
          OnNext(500, 3),
          OnNext(600, 4),
          OnCompleted<int>(700)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_DateTimeOffset_Simple3()
  {
      Delay_DateTimeOffset_Simple3_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_DateTimeOffset_Simple3_Stopwatch()
  {
      Delay_DateTimeOffset_Simple3_Impl(useStopwatch: true);
  }
  // Absolute due time at tick 350: the expected output matches the 150-tick relative delay scenario.
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_DateTimeOffset_Simple3_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var observer = clock.Start(() =>
      {
          var dueTime = new DateTimeOffset(350, TimeSpan.Zero);
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(dueTime, delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(400, 2),
          OnNext(500, 3),
          OnNext(600, 4),
          OnCompleted<int>(700)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_TimeSpan_Error1()
  {
      Delay_TimeSpan_Error1_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_TimeSpan_Error1_Stopwatch()
  {
      Delay_TimeSpan_Error1_Impl(useStopwatch: true);
  }
  // 50-tick delay with a source error at 550: all delayed values are already delivered,
  // and the error is surfaced at 550 without being delayed.
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_TimeSpan_Error1_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnError<int>(550, failure)
      );

      var observer = clock.Start(() =>
      {
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(TimeSpan.FromTicks(50), delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(300, 2),
          OnNext(400, 3),
          OnNext(500, 4),
          OnError<int>(550, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_DateTimeOffset_Error1()
  {
      Delay_DateTimeOffset_Error1_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_DateTimeOffset_Error1_Stopwatch()
  {
      Delay_DateTimeOffset_Error1_Impl(useStopwatch: true);
  }
  // Absolute due time at tick 250 with a source error at 550: the expected output matches
  // the 50-tick relative delay error scenario.
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_DateTimeOffset_Error1_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnError<int>(550, failure)
      );

      var observer = clock.Start(() =>
      {
          var dueTime = new DateTimeOffset(250, TimeSpan.Zero);
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(dueTime, delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(300, 2),
          OnNext(400, 3),
          OnNext(500, 4),
          OnError<int>(550, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_TimeSpan_Error2()
  {
      Delay_TimeSpan_Error2_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_TimeSpan_Error2_Stopwatch()
  {
      Delay_TimeSpan_Error2_Impl(useStopwatch: true);
  }
  // 150-tick delay with a source error at 550: the error overtakes the value still
  // pending (4, due at 600), so that value is never delivered.
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_TimeSpan_Error2_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnError<int>(550, failure)
      );

      var observer = clock.Start(() =>
      {
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(TimeSpan.FromTicks(150), delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(400, 2),
          OnNext(500, 3),
          OnError<int>(550, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Runs the shared scenario with scheduler optimizations disabled.
  [Fact]
  public void Delay_DateTimeOffset_Error2()
  {
      Delay_DateTimeOffset_Error2_Impl(useStopwatch: false);
  }
  // Runs the shared scenario on the unmodified TestScheduler.
  [Fact]
  public void Delay_DateTimeOffset_Error2_Stopwatch()
  {
      Delay_DateTimeOffset_Error2_Impl(useStopwatch: true);
  }
  // Absolute due time at tick 350 with a source error at 550: the expected output matches
  // the 150-tick relative delay error scenario (value 4 is never delivered).
  // useStopwatch: true passes the TestScheduler as-is; false wraps it with DisableOptimizations().
  private void Delay_DateTimeOffset_Error2_Impl(bool useStopwatch)
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnError<int>(550, failure)
      );

      var observer = clock.Start(() =>
      {
          var dueTime = new DateTimeOffset(350, TimeSpan.Zero);
          var delayScheduler = useStopwatch ? clock : clock.DisableOptimizations();
          return source.Delay(dueTime, delayScheduler);
      });

      observer.Messages.AssertEqual(
          OnNext(400, 2),
          OnNext(500, 3),
          OnError<int>(550, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  736. #if !NO_THREAD
  // Real-time variant on the thread pool scheduler with optimizations disabled.
  [Fact]
  public void Delay_TimeSpan_Real_Simple1()
  {
      IScheduler pool = ThreadPoolScheduler.Instance.DisableOptimizations();
      Delay_TimeSpan_Real_Simple1_Impl(pool);
  }
  // Real-time variant on the unmodified thread pool scheduler.
  [Fact]
  public void Delay_TimeSpan_Real_Simple1_Stopwatch()
  {
      IScheduler pool = ThreadPoolScheduler.Instance;
      Delay_TimeSpan_Real_Simple1_Impl(pool);
  }
  747. #endif
  // Pushes 1, 2, 3 and completion through a 10 ms Delay on the given (real-time) scheduler
  // and checks that all values arrive, in order, before completion is signalled.
  private void Delay_TimeSpan_Real_Simple1_Impl(IScheduler scheduler)
  {
      var s = new Subject<int>();
      var res = s.Delay(TimeSpan.FromMilliseconds(10), scheduler);

      var lst = new List<int>();
      var e = new ManualResetEvent(false);
      res.Subscribe(lst.Add, () => e.Set());

      // Produce from another thread so the test thread is free to wait for completion.
      Task.Run(() =>
      {
          s.OnNext(1);
          s.OnNext(2);
          s.OnNext(3);
          s.OnCompleted();
      });

      // Bounded wait: if completion is never delivered, fail instead of hanging the test run.
      Assert.True(e.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the delayed sequence to complete.");

      // Assert.Equal reports the differing elements on failure, unlike Assert.True(SequenceEqual).
      Assert.Equal(new[] { 1, 2, 3 }, lst);
  }
  765. #if !NO_THREAD
  // Real-time variant on the thread pool scheduler with optimizations disabled.
  [Fact]
  public void Delay_TimeSpan_Real_Error1()
  {
      IScheduler pool = ThreadPoolScheduler.Instance.DisableOptimizations();
      Delay_TimeSpan_Real_Error1_Impl(pool);
  }
  // Real-time variant on the unmodified thread pool scheduler.
  [Fact]
  public void Delay_TimeSpan_Real_Error1_Stopwatch()
  {
      IScheduler pool = ThreadPoolScheduler.Instance;
      Delay_TimeSpan_Real_Error1_Impl(pool);
  }
  776. #endif
  // Pushes three values followed by an error through a 10 ms Delay on the given
  // (real-time) scheduler and checks that the very same exception reaches the observer.
  private void Delay_TimeSpan_Real_Error1_Impl(IScheduler scheduler)
  {
      var ex = new Exception();
      var s = new Subject<int>();
      var res = s.Delay(TimeSpan.FromMilliseconds(10), scheduler);

      var e = new ManualResetEvent(false);
      var err = default(Exception);
      res.Subscribe(_ => { }, ex_ => { err = ex_; e.Set(); });

      Task.Run(() =>
      {
          s.OnNext(1);
          s.OnNext(2);
          s.OnNext(3);
          s.OnError(ex);
      });

      // Bounded wait: if the error is never delivered, fail instead of hanging the test run.
      Assert.True(e.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the error to be propagated.");
      Assert.Same(ex, err);
  }
  795. #if !NO_THREAD
  // Real-time variant on the thread pool scheduler with optimizations disabled.
  [Fact]
  public void Delay_TimeSpan_Real_Error2()
  {
      IScheduler pool = ThreadPoolScheduler.Instance.DisableOptimizations();
      Delay_TimeSpan_Real_Error2_Impl(pool);
  }
  // Real-time variant on the unmodified thread pool scheduler.
  [Fact]
  public void Delay_TimeSpan_Real_Error2_Stopwatch()
  {
      IScheduler pool = ThreadPoolScheduler.Instance;
      Delay_TimeSpan_Real_Error2_Impl(pool);
  }
  806. #endif
  // Sends one value, waits until the observer has received it, then sends an error;
  // checks that the very same exception reaches the observer.
  private void Delay_TimeSpan_Real_Error2_Impl(IScheduler scheduler)
  {
      var ex = new Exception();
      var s = new Subject<int>();
      var res = s.Delay(TimeSpan.FromMilliseconds(10), scheduler);

      var next = new ManualResetEvent(false);
      var e = new ManualResetEvent(false);
      var err = default(Exception);
      res.Subscribe(_ => { next.Set(); }, ex_ => { err = ex_; e.Set(); });

      Task.Run(() =>
      {
          s.OnNext(1);
          // Only raise the error once the delayed value has reached the observer.
          next.WaitOne();
          s.OnError(ex);
      });

      // Bounded wait: if the error is never delivered, fail instead of hanging the test run.
      Assert.True(e.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the error to be propagated.");
      Assert.Same(ex, err);
  }
  825. #if !NO_THREAD
  // Real-time variant on the thread pool scheduler with optimizations disabled.
  [Fact]
  public void Delay_TimeSpan_Real_Error3()
  {
      IScheduler pool = ThreadPoolScheduler.Instance.DisableOptimizations();
      Delay_TimeSpan_Real_Error3_Impl(pool);
  }
  // Real-time variant on the unmodified thread pool scheduler.
  [Fact]
  public void Delay_TimeSpan_Real_Error3_Stopwatch()
  {
      IScheduler pool = ThreadPoolScheduler.Instance;
      Delay_TimeSpan_Real_Error3_Impl(pool);
  }
  836. #endif
  // Raises the source error while the observer's OnNext handler is still running:
  // the handler signals `next` and then blocks on `ack`, which the producer only
  // sets after calling OnError. Checks that the same exception reaches the observer.
  private void Delay_TimeSpan_Real_Error3_Impl(IScheduler scheduler)
  {
      var ex = new Exception();
      var s = new Subject<int>();
      var res = s.Delay(TimeSpan.FromMilliseconds(10), scheduler);

      var next = new ManualResetEvent(false);
      var ack = new ManualResetEvent(false);
      var e = new ManualResetEvent(false);
      var err = default(Exception);
      res.Subscribe(_ => { next.Set(); ack.WaitOne(); }, ex_ => { err = ex_; e.Set(); });

      Task.Run(() =>
      {
          s.OnNext(1);
          next.WaitOne();
          s.OnError(ex);
          // Release the OnNext handler only after the error has been pushed.
          ack.Set();
      });

      // Bounded wait: if the error is never delivered, fail instead of hanging the test run.
      Assert.True(e.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the error to be propagated.");
      Assert.Same(ex, err);
  }
  // Every notification recorded after the subscription time must be observed exactly
  // `delay` ticks later; the expectation is derived from the input messages.
  [Fact]
  public void Delay_TimeSpan_Positive()
  {
      var scheduler = new TestScheduler();

      var msgs = new[] {
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      };

      var xs = scheduler.CreateHotObservable(msgs);

      // Virtual time is a long tick count. Keeping the delay as a long avoids the narrowing
      // (ushort) cast, which would silently wrap for any time above 65535.
      const long delay = 42;

      var res = scheduler.Start(() =>
          xs.Delay(TimeSpan.FromTicks(delay), scheduler)
      );

      var expected = from n in msgs
                     where n.Time > ObservableTest.Subscribed
                     select new Recorded<Notification<int>>(n.Time + delay, n.Value);

      res.Messages.AssertEqual(expected);
  }
  // A source that only completes (after subscription) has its completion delayed by 10 ticks.
  [Fact]
  public void Delay_Empty()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(550)
      );

      var delay = TimeSpan.FromTicks(10);
      var observer = clock.Start(() => source.Delay(delay, clock));

      observer.Messages.AssertEqual(
          OnCompleted<int>(560)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // A source error is surfaced at its original time (550), not shifted by the delay.
  [Fact]
  public void Delay_Error()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(550, failure)
      );

      var delay = TimeSpan.FromTicks(10);
      var observer = clock.Start(() => source.Delay(delay, clock));

      observer.Messages.AssertEqual(
          OnError<int>(550, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // A source with no notifications after subscription produces no output; the
  // subscription stays active until the scheduler disposes it at 1000.
  [Fact]
  public void Delay_Never()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(150, 1)
      );

      var delay = TimeSpan.FromTicks(10);
      var observer = clock.Start(() => source.Delay(delay, clock));

      observer.Messages.AssertEqual(
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // Relative-delay overload without a scheduler argument still delivers the value.
  [Fact]
  public void Delay_TimeSpan_DefaultScheduler()
  {
      var delayed = Observable.Return(1).Delay(TimeSpan.FromMilliseconds(1));
      var actual = delayed.ToEnumerable();

      Assert.True(actual.SequenceEqual(new[] { 1 }));
  }
  // Absolute-time overload without a scheduler argument still delivers the value.
  [Fact]
  public void Delay_DateTimeOffset_DefaultScheduler()
  {
      var delayed = Observable.Return(1).Delay(DateTimeOffset.UtcNow + TimeSpan.FromMilliseconds(1));
      var actual = delayed.ToEnumerable();

      Assert.True(actual.SequenceEqual(new[] { 1 }));
  }
  // The observer feeds the next value back into the source from inside its own OnNext
  // handler (0 through 9, then completion); all ten values must arrive in order.
  [Fact]
  public void Delay_CrossingMessages()
  {
      var lst = new List<int>();
      var evt = new ManualResetEvent(false);
      var s = new Subject<int>();

      s.Delay(TimeSpan.FromSeconds(0.01)).Subscribe(x =>
      {
          lst.Add(x);

          if (x < 9)
          {
              s.OnNext(x + 1);
          }
          else
          {
              s.OnCompleted();
          }
      }, () =>
      {
          evt.Set();
      });

      s.OnNext(0);

      // Bounded wait: if completion is never delivered, fail instead of hanging the test run.
      Assert.True(evt.WaitOne(TimeSpan.FromSeconds(30)), "Timed out waiting for the delayed sequence to complete.");

      // Assert.Equal reports the differing elements on failure, unlike Assert.True(SequenceEqual).
      Assert.Equal(Enumerable.Range(0, 10), lst);
  }
  // Selector-based Delay overloads must reject a null source, a null subscription
  // delay and a null delay-duration selector.
  [Fact]
  public void Delay_Duration_ArgumentChecking()
  {
      var validObservable = DummyObservable<int>.Instance;
      IObservable<int> nullObservable = null;
      Func<int, IObservable<int>> nullSelector = null;

      // Delay(source, delayDurationSelector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(nullObservable, x => validObservable));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(validObservable, nullSelector));

      // Delay(source, subscriptionDelay, delayDurationSelector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(nullObservable, validObservable, x => validObservable));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(validObservable, nullObservable, x => validObservable));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Delay(validObservable, validObservable, nullSelector));
  }
  // Each value x gets its own cold duration sequence firing after x ticks, so the output is
  // ordered by (arrival time + x) rather than by arrival time. Completion is expected at 280.
  [Fact]
  public void Delay_Duration_Simple1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 10),
          OnNext(220, 30),
          OnNext(230, 50),
          OnNext(240, 35),
          OnNext(250, 20),
          OnCompleted<int>(260)
      );

      var observer = testScheduler.Start(() =>
          source.Delay(value => testScheduler.CreateColdObservable(new[] { OnNext(value, "!") }))
      );

      observer.Messages.AssertEqual(
          OnNext(210 + 10, 10),
          OnNext(220 + 30, 30),
          OnNext(250 + 20, 20),
          OnNext(240 + 35, 35),
          OnNext(230 + 50, 50),
          OnCompleted<int>(280)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 260)
      );
  }
  // All values share one cold duration sequence that fires after 10 ticks, so each value is
  // shifted by 10 and every inner subscription lasts exactly 10 ticks.
  [Fact]
  public void Delay_Duration_Simple2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnNext(250, 6),
          OnCompleted<int>(300)
      );

      var duration = testScheduler.CreateColdObservable(
          OnNext(10, "!")
      );

      var observer = testScheduler.Start(() => source.Delay(_ => duration));

      observer.Messages.AssertEqual(
          OnNext(210 + 10, 2),
          OnNext(220 + 10, 3),
          OnNext(230 + 10, 4),
          OnNext(240 + 10, 5),
          OnNext(250 + 10, 6),
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      duration.Subscriptions.AssertEqual(
          Subscribe(210, 220),
          Subscribe(220, 230),
          Subscribe(230, 240),
          Subscribe(240, 250),
          Subscribe(250, 260)
      );
  }
  // Shared 100-tick duration sequence: the source completes at 300 while delays are still
  // pending, so the last value and the completion are both expected at 350.
  [Fact]
  public void Delay_Duration_Simple3()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnNext(250, 6),
          OnCompleted<int>(300)
      );

      var duration = testScheduler.CreateColdObservable(
          OnNext(100, "!")
      );

      var observer = testScheduler.Start(() => source.Delay(_ => duration));

      observer.Messages.AssertEqual(
          OnNext(210 + 100, 2),
          OnNext(220 + 100, 3),
          OnNext(230 + 100, 4),
          OnNext(240 + 100, 5),
          OnNext(250 + 100, 6),
          OnCompleted<int>(350)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      duration.Subscriptions.AssertEqual(
          Subscribe(210, 310),
          Subscribe(220, 320),
          Subscribe(230, 330),
          Subscribe(240, 340),
          Subscribe(250, 350)
      );
  }
  // The duration sequence emits no value and just completes after 100 ticks; the expected
  // output shows each source value released at (arrival + 100), i.e. on that completion.
  [Fact]
  public void Delay_Duration_Simple4_InnerEmpty()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnNext(250, 6),
          OnCompleted<int>(300)
      );

      var duration = testScheduler.CreateColdObservable(
          OnCompleted<int>(100)
      );

      var observer = testScheduler.Start(() => source.Delay(_ => duration));

      observer.Messages.AssertEqual(
          OnNext(210 + 100, 2),
          OnNext(220 + 100, 3),
          OnNext(230 + 100, 4),
          OnNext(240 + 100, 5),
          OnNext(250 + 100, 6),
          OnCompleted<int>(350)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      duration.Subscriptions.AssertEqual(
          Subscribe(210, 310),
          Subscribe(220, 320),
          Subscribe(230, 330),
          Subscribe(240, 340),
          Subscribe(250, 350)
      );
  }
  // The observer is disposed at 425 while three 200-tick delays are still pending: only the
  // first two values get out, and the remaining inner subscriptions end at 425.
  [Fact]
  public void Delay_Duration_Dispose1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnNext(250, 6),
          OnCompleted<int>(300)
      );

      var duration = testScheduler.CreateColdObservable(
          OnNext(200, "!")
      );

      var observer = testScheduler.Start(
          () => source.Delay(_ => duration),
          425
      );

      observer.Messages.AssertEqual(
          OnNext(210 + 200, 2),
          OnNext(220 + 200, 3)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      duration.Subscriptions.AssertEqual(
          Subscribe(210, 410),
          Subscribe(220, 420),
          Subscribe(230, 425),
          Subscribe(240, 425),
          Subscribe(250, 425)
      );
  }
  // The observer is disposed at 300, before the source value at 400 arrives: only the value
  // from 210 is delivered (at 260) and only one inner subscription is ever made.
  [Fact]
  public void Delay_Duration_Dispose2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(400, 3),
          OnCompleted<int>(500)
      );

      var duration = testScheduler.CreateColdObservable(
          OnNext(50, "!")
      );

      var observer = testScheduler.Start(
          () => source.Delay(_ => duration),
          300
      );

      observer.Messages.AssertEqual(
          OnNext(210 + 50, 2)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      duration.Subscriptions.AssertEqual(
          Subscribe(210, 260)
      );
  }
  // The source fails at 300 while every 100-tick delay is still pending: the error is the
  // only message, and all inner subscriptions end at 300.
  [Fact]
  public void Delay_Duration_OuterError1()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnNext(250, 6),
          OnError<int>(300, error)
      );

      var duration = testScheduler.CreateColdObservable(
          OnNext(100, "!")
      );

      var observer = testScheduler.Start(() => source.Delay(_ => duration));

      observer.Messages.AssertEqual(
          OnError<int>(300, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      duration.Subscriptions.AssertEqual(
          Subscribe(210, 300),
          Subscribe(220, 300),
          Subscribe(230, 300),
          Subscribe(240, 300),
          Subscribe(250, 300)
      );
  }
  // The source fails at 300 after every 10-tick delay has already elapsed: all values are
  // delivered first, followed by the error at 300.
  [Fact]
  public void Delay_Duration_OuterError2()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnNext(250, 6),
          OnError<int>(300, error)
      );

      var duration = testScheduler.CreateColdObservable(
          OnNext(10, "!")
      );

      var observer = testScheduler.Start(() => source.Delay(_ => duration));

      observer.Messages.AssertEqual(
          OnNext(210 + 10, 2),
          OnNext(220 + 10, 3),
          OnNext(230 + 10, 4),
          OnNext(240 + 10, 5),
          OnNext(250 + 10, 6),
          OnError<int>(300, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      duration.Subscriptions.AssertEqual(
          Subscribe(210, 220),
          Subscribe(220, 230),
          Subscribe(230, 240),
          Subscribe(240, 250),
          Subscribe(250, 260)
      );
  }
  // Value 5 is given a duration sequence that fails after 25 ticks; every other value gets a
  // 30-tick delay. The failure at 240 + 25 ends the whole sequence and the source subscription.
  [Fact]
  public void Delay_Duration_InnerError1()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnNext(250, 6),
          OnCompleted<int>(300)
      );

      var normalDuration = testScheduler.CreateColdObservable(
          OnNext(30, "!")
      );

      var failingDuration = testScheduler.CreateColdObservable(
          OnError<string>(25, error)
      );

      var observer = testScheduler.Start(() =>
          source.Delay(value => value == 5 ? failingDuration : normalDuration)
      );

      observer.Messages.AssertEqual(
          OnNext(210 + 30, 2),
          OnNext(220 + 30, 3),
          OnNext(230 + 30, 4),
          OnError<int>(240 + 25, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 265)
      );

      normalDuration.Subscriptions.AssertEqual(
          Subscribe(210, 240),
          Subscribe(220, 250),
          Subscribe(230, 260),
          Subscribe(250, 265)
      );
  }
  // Every value shares a duration sequence that fails after 100 ticks: the first failure
  // (210 + 100) is the only message, and all inner subscriptions end at 310.
  [Fact]
  public void Delay_Duration_InnerError2()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(220, 3),
          OnNext(230, 4),
          OnNext(240, 5),
          OnNext(250, 6),
          OnCompleted<int>(300)
      );

      var duration = testScheduler.CreateColdObservable(
          OnError<string>(100, error)
      );

      var observer = testScheduler.Start(() => source.Delay(_ => duration));

      observer.Messages.AssertEqual(
          OnError<int>(210 + 100, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      duration.Subscriptions.AssertEqual(
          Subscribe(210, 310),
          Subscribe(220, 310),
          Subscribe(230, 310),
          Subscribe(240, 310),
          Subscribe(250, 310)
      );
  }
  // The duration selector throws for value 4 (arriving at 350): the value from 250 is
  // delivered at 330, then the error surfaces at 350 while the delay for 3 is still pending.
  [Fact]
  public void Delay_Duration_SelectorThrows1()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(300, 3),
          OnNext(350, 4),
          OnNext(400, 5),
          OnNext(450, 6),
          OnCompleted<int>(500)
      );

      var duration = testScheduler.CreateColdObservable(
          OnNext<string>(80, "")
      );

      var observer = testScheduler.Start(() =>
          source.Delay(value =>
          {
              if (value != 4)
              {
                  return duration;
              }

              throw error;
          })
      );

      observer.Messages.AssertEqual(
          OnNext(330, 2),
          OnError<int>(350, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 350)
      );

      duration.Subscriptions.AssertEqual(
          Subscribe(250, 330),
          Subscribe(300, 350)
      );
  }
  // Each value x gets a duration sequence with two notifications (at x * 10 and x * 10 + 5);
  // the expected output shows the value released on the first one only.
  [Fact]
  public void Delay_Duration_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var observer = testScheduler.Start(() =>
          source.Delay(value =>
              testScheduler.CreateColdObservable(
                  OnNext(value * 10, "Ignore"),
                  OnNext(value * 10 + 5, "Aargh!")
              )
          )
      );

      observer.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnNext<int>(450 + 4 * 10, 4),
          OnCompleted<int>(550)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // The source completes at 451 while the delay for value 4 is still pending: completion is
  // expected only once that value is released, at 450 + 4 * 10.
  [Fact]
  public void Delay_Duration_DeferOnCompleted()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(451)
      );

      var observer = testScheduler.Start(() =>
          source.Delay(value =>
              testScheduler.CreateColdObservable(
                  OnNext(value * 10, "Ignore"),
                  OnNext(value * 10 + 5, "Aargh!")
              )
          )
      );

      observer.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnNext<int>(450 + 4 * 10, 4),
          OnCompleted<int>(450 + 4 * 10)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 451)
      );
  }
  // Values below 4 get a normal duration sequence; value 4 gets one that fails after
  // 4 * 10 ticks. That failure is expected as the final message, at 450 + 4 * 10.
  [Fact]
  public void Delay_Duration_InnerError()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(451)
      );

      var observer = testScheduler.Start(() =>
          source.Delay(value =>
          {
              if (value < 4)
              {
                  return testScheduler.CreateColdObservable(
                      OnNext(value * 10, "Ignore"),
                      OnNext(value * 10 + 5, "Aargh!")
                  );
              }

              return testScheduler.CreateColdObservable(
                  OnError<string>(value * 10, error)
              );
          })
      );

      observer.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnError<int>(450 + 4 * 10, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 451)
      );
  }
  // The source fails at 460 while the delay for value 4 is still pending: the error is
  // expected at 460 and value 4 is never delivered.
  [Fact]
  public void Delay_Duration_OuterError()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnError<int>(460, error)
      );

      var observer = testScheduler.Start(() =>
          source.Delay(value =>
              testScheduler.CreateColdObservable(
                  OnNext(value * 10, "Ignore"),
                  OnNext(value * 10 + 5, "Aargh!")
              )
          )
      );

      observer.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnError<int>(460, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 460)
      );
  }
  // The duration selector throws for value 4, which arrives at 450: the error is expected
  // at 450 and the source subscription ends there.
  [Fact]
  public void Delay_Duration_SelectorThrows2()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var observer = testScheduler.Start(() =>
          source.Delay(value =>
          {
              if (value >= 4)
              {
                  throw error;
              }

              return testScheduler.CreateColdObservable(
                  OnNext(value * 10, "Ignore"),
                  OnNext(value * 10 + 5, "Aargh!")
              );
          })
      );

      observer.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnError<int>(450, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450)
      );
  }
  // Each value x gets a duration sequence that only completes (after x * 10 ticks) without
  // emitting; the expected output shows the value released at that completion time.
  [Fact]
  public void Delay_Duration_InnerDone()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      // No exception instance is created here: this scenario has no error path, and the
      // previously declared one was never read.
      var res = scheduler.Start(() =>
          xs.Delay(x =>
              scheduler.CreateColdObservable(
                  OnCompleted<string>(x * 10)
              )
          )
      );

      res.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnNext<int>(450 + 4 * 10, 4),
          OnCompleted<int>(550)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  // Source values 0, 1, 2 index into an array of duration sequences. Each duration sequence
  // must be subscribed when its value arrives and unsubscribed at its first notification
  // (20, 10 and 30 ticks later respectively).
  [Fact]
  public void Delay_Duration_InnerSubscriptionTimes()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, -1),
          OnNext(250, 0),
          OnNext(350, 1),
          OnNext(450, 2),
          OnCompleted<int>(550)
      );

      var firstDuration = testScheduler.CreateColdObservable(
          OnNext(20, 42),
          OnNext(25, 99)
      );
      var secondDuration = testScheduler.CreateColdObservable(
          OnNext(10, 43),
          OnNext(15, 99)
      );
      var thirdDuration = testScheduler.CreateColdObservable(
          OnNext(30, 44),
          OnNext(35, 99)
      );
      var durations = new[] { firstDuration, secondDuration, thirdDuration };

      var observer = testScheduler.Start(() => source.Delay(index => durations[index]));

      observer.Messages.AssertEqual(
          OnNext<int>(250 + 20, 0),
          OnNext<int>(350 + 10, 1),
          OnNext<int>(450 + 30, 2),
          OnCompleted<int>(550)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );

      durations[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
      durations[1].Subscriptions.AssertEqual(Subscribe(350, 350 + 10));
      durations[2].Subscriptions.AssertEqual(Subscribe(450, 450 + 30));
  }
  // A subscription-delay sequence fires after 10 ticks, so the source is only subscribed at
  // 210 and the value arriving at 210 is absent from the expected output. The remaining
  // values x are each delayed by x ticks.
  [Fact]
  public void Delay_DurationAndSubscription_Simple1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 10),
          OnNext(220, 30),
          OnNext(230, 50),
          OnNext(240, 35),
          OnNext(250, 20),
          OnCompleted<int>(260)
      );

      var subscriptionDelay = testScheduler.CreateColdObservable(
          OnNext(10, "!")
      );

      var observer = testScheduler.Start(() =>
          source.Delay(
              subscriptionDelay,
              value => testScheduler.CreateColdObservable(new[] { OnNext(value, "!") })
          )
      );

      observer.Messages.AssertEqual(
          OnNext(220 + 30, 30),
          OnNext(250 + 20, 20),
          OnNext(240 + 35, 35),
          OnNext(230 + 50, 50),
          OnCompleted<int>(280)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(210, 260)
      );

      subscriptionDelay.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // Same scenario as a 10-tick subscription delay, except the subscription-delay sequence
  // completes without emitting; the expectations show the source still subscribed at 210.
  [Fact]
  public void Delay_DurationAndSubscription_Simple2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 10),
          OnNext(220, 30),
          OnNext(230, 50),
          OnNext(240, 35),
          OnNext(250, 20),
          OnCompleted<int>(260)
      );

      var subscriptionDelay = testScheduler.CreateColdObservable(
          OnCompleted<string>(10)
      );

      var observer = testScheduler.Start(() =>
          source.Delay(
              subscriptionDelay,
              value => testScheduler.CreateColdObservable(new[] { OnNext(value, "!") })
          )
      );

      observer.Messages.AssertEqual(
          OnNext(220 + 30, 30),
          OnNext(250 + 20, 20),
          OnNext(240 + 35, 35),
          OnNext(230 + 50, 50),
          OnCompleted<int>(280)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(210, 260)
      );

      subscriptionDelay.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // The observer is disposed at 255, after the delayed subscription (210) but before most
  // per-value delays elapse: only the value 30 (due at 220 + 30) is delivered.
  [Fact]
  public void Delay_DurationAndSubscription_Dispose1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 10),
          OnNext(220, 30),
          OnNext(230, 50),
          OnNext(240, 35),
          OnNext(250, 20),
          OnCompleted<int>(260)
      );

      var subscriptionDelay = testScheduler.CreateColdObservable(
          OnNext<string>(10, "!")
      );

      var observer = testScheduler.Start(
          () => source.Delay(
              subscriptionDelay,
              value => testScheduler.CreateColdObservable(new[] { OnNext(value, "!") })
          ),
          255
      );

      observer.Messages.AssertEqual(
          OnNext(220 + 30, 30)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(210, 255)
      );

      subscriptionDelay.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // The observer is disposed at 255, before the 100-tick subscription delay fires: the source
  // is never subscribed and no messages are produced.
  [Fact]
  public void Delay_DurationAndSubscription_Dispose2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 10),
          OnNext(220, 30),
          OnNext(230, 50),
          OnNext(240, 35),
          OnNext(250, 20),
          OnCompleted<int>(260)
      );

      var subscriptionDelay = testScheduler.CreateColdObservable(
          OnNext<string>(100, "!")
      );

      var observer = testScheduler.Start(
          () => source.Delay(
              subscriptionDelay,
              value => testScheduler.CreateColdObservable(new[] { OnNext(value, "!") })
          ),
          255
      );

      // No messages and no source subscriptions are expected.
      observer.Messages.AssertEqual();
      source.Subscriptions.AssertEqual();

      subscriptionDelay.Subscriptions.AssertEqual(
          Subscribe(200, 255)
      );
  }
  // The subscription-delay sequence fails after 10 ticks: that error is the only message
  // (at 200 + 10) and the source is never subscribed.
  [Fact]
  public void Delay_DurationAndSubscription_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 10),
          OnNext(220, 30),
          OnNext(230, 50),
          OnNext(240, 35),
          OnNext(250, 20),
          OnCompleted<int>(260)
      );

      var subscriptionDelay = testScheduler.CreateColdObservable(
          OnError<string>(10, error)
      );

      var observer = testScheduler.Start(() =>
          source.Delay(
              subscriptionDelay,
              value => testScheduler.CreateColdObservable(new[] { OnNext(value, "!") })
          )
      );

      observer.Messages.AssertEqual(
          OnError<int>(200 + 10, error)
      );

      source.Subscriptions.AssertEqual();

      subscriptionDelay.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  [Fact]
  public void Delay_ErrorHandling1()
  {
      //
      // Race between OnNext and OnError: the OnError message may be sent out before the
      // delayed queue has had a chance to run the earlier OnNext. When that happens, the
      // OnNext message must not be delivered afterwards.
      //
      // See DrainQueue's first _hasFailed check.
      //
      var source = Observable.Create<int>(observer =>
      {
          observer.OnNext(42);
          observer.OnError(new Exception());
          return () => { };
      });

      var impulse = new ImpulseScheduler();
      var onNextCalled = false;
      var errorSeen = new ManualResetEvent(false);

      source.Delay(TimeSpan.FromDays(1), impulse).Subscribe(
          _ => { onNextCalled = true; },
          ex => { errorSeen.Set(); }
      );

      // Wait for the error to be observed, then release the scheduled work and wait for it
      // to finish before checking that OnNext was never invoked.
      errorSeen.WaitOne();
      impulse.Event.Set();
      impulse.Done.WaitOne();

      Assert.False(onNextCalled);
  }
  // Scheduler used by Delay_ErrorHandling1. Only the relative-time Schedule overload is
  // implemented: it parks the work on Scheduler.Default until Event is set, runs it, and then
  // sets Done. The immediate and absolute-time overloads throw NotImplementedException.
  private class ImpulseScheduler : IScheduler
  {
      // Both handles are created once and never reassigned, hence readonly.
      private readonly ManualResetEvent _event = new ManualResetEvent(false);
      private readonly ManualResetEvent _done = new ManualResetEvent(false);

      public DateTimeOffset Now
      {
          get { return DateTimeOffset.UtcNow; }
      }

      // Set by the test to let the parked work run.
      public ManualResetEvent Event { get { return _event; } }

      // Set by the scheduler once the parked work has finished.
      public ManualResetEvent Done { get { return _done; } }

      public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
      {
          throw new NotImplementedException();
      }

      // dueTime is ignored; the work runs as soon as Event is set. The returned disposable
      // is Disposable.Empty, so the scheduled work cannot be cancelled through it.
      public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          Scheduler.Default.Schedule(() =>
          {
              _event.WaitOne();
              action(this, state);
              _done.Set();
          });

          return Disposable.Empty;
      }

      public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          throw new NotImplementedException();
      }
  }
  // Subscribes through a long-running scheduler to a source whose only value is scheduled an
  // hour out, waits for the long-running work to start, disposes the subscription, and then
  // waits for the long-running work to return.
  [Fact]
  public void Delay_LongRunning_CancelEarly()
  {
      var source = Observable.Create<int>(observer =>
          Scheduler.Default.Schedule(TimeSpan.FromHours(1), () => observer.OnNext(42))
      );

      var started = new ManualResetEvent(false);
      var stopped = new ManualResetEvent(false);

      var delayed = source.Delay(TimeSpan.Zero, new MyLongRunning1(started, stopped));
      var subscription = delayed.Subscribe(_ => { });

      started.WaitOne();
      subscription.Dispose();
      stopped.WaitOne();
  }
  // Long-running scheduler used by Delay_LongRunning_CancelEarly. ScheduleLongRunning runs
  // the action on a Task, signalling the start handle before the action runs and the stop
  // handle after it returns. The relative-time Schedule overload is not implemented.
  private class MyLongRunning1 : LocalScheduler, ISchedulerLongRunning
  {
      // Assigned only in the constructor, hence readonly.
      private readonly ManualResetEvent _start;
      private readonly ManualResetEvent _stop;

      public MyLongRunning1(ManualResetEvent start, ManualResetEvent stop)
      {
          _start = start;
          _stop = stop;
      }

      // The returned BooleanDisposable is the same instance handed to the action as its
      // ICancelable argument.
      public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
      {
          var b = new BooleanDisposable();

          Task.Run(() =>
          {
              _start.Set();
              action(state, b);
              _stop.Set();
          });

          return b;
      }

      public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          throw new NotImplementedException();
      }
  }
  // Delays Return(42) by an hour on a long-running scheduler, waits until the scheduler's
  // relative-time Schedule has been called, disposes the subscription, and then waits for the
  // long-running work to return.
  [Fact]
  public void Delay_LongRunning_CancelLate()
  {
      var source = Observable.Return(42);

      var started = new ManualResetEvent(false);
      var stopped = new ManualResetEvent(false);

      var delayed = source.Delay(TimeSpan.FromHours(1), new MyLongRunning2(started, stopped));
      var subscription = delayed.Subscribe(_ => { });

      started.WaitOne();
      subscription.Dispose();
      stopped.WaitOne();
  }
  // Long-running scheduler used by Delay_LongRunning_CancelLate. ScheduleLongRunning runs the
  // action on a Task and signals the stop handle when it returns. The relative-time Schedule
  // overload never runs the action; it only signals the start handle.
  private class MyLongRunning2 : LocalScheduler, ISchedulerLongRunning
  {
      // Assigned only in the constructor, hence readonly.
      private readonly ManualResetEvent _start;
      private readonly ManualResetEvent _stop;

      public MyLongRunning2(ManualResetEvent start, ManualResetEvent stop)
      {
          _start = start;
          _stop = stop;
      }

      // The returned BooleanDisposable is the same instance handed to the action as its
      // ICancelable argument.
      public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
      {
          var b = new BooleanDisposable();

          Task.Run(() =>
          {
              action(state, b);
              _stop.Set();
          });

          return b;
      }

      // The scheduled action is deliberately dropped; the call is only used as a signal.
      public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          _start.Set();
          return Disposable.Empty;
      }
  }
  1870. #endregion
  1871. #region + DelaySubscription +
  // Null sources and schedulers must raise ArgumentNullException; a negative TimeSpan must
  // raise ArgumentOutOfRangeException.
  [Fact]
  public void DelaySubscription_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      IScheduler nullScheduler = null;
      var negative = TimeSpan.FromSeconds(-1);

      // Absolute-time overloads.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DelaySubscription(nullSource, DateTimeOffset.Now));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DelaySubscription(nullSource, DateTimeOffset.Now, Scheduler.Immediate));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DelaySubscription(DummyObservable<int>.Instance, DateTimeOffset.Now, nullScheduler));

      // Relative-time overloads.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DelaySubscription(nullSource, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DelaySubscription(nullSource, TimeSpan.Zero, Scheduler.Immediate));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DelaySubscription(DummyObservable<int>.Instance, TimeSpan.Zero, nullScheduler));

      // Negative due times.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.DelaySubscription(DummyObservable<int>.Instance, negative));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.DelaySubscription(DummyObservable<int>.Instance, negative, Scheduler.Immediate));
  }
  // DelaySubscription by 1 ms without passing a scheduler: Range(0, 10) is still fully
  // received, in order.
  [Fact]
  public void DelaySubscription_TimeSpan_Default()
  {
      var received = new List<int>();
      var delayed = Observable.Range(0, 10).DelaySubscription(TimeSpan.FromMilliseconds(1));

      delayed.ForEach(received.Add);

      var expected = Enumerable.Range(0, 10);
      Assert.True(expected.SequenceEqual(received));
  }
  // A 30-tick DelaySubscription moves the cold source's subscription to 230, so its
  // notifications at relative 50/60/70 are observed at 280/290/300.
  [Fact]
  public void DelaySubscription_TimeSpan_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 42),
          OnNext(60, 43),
          OnCompleted<int>(70)
      );

      var observer = testScheduler.Start(() => source.DelaySubscription(TimeSpan.FromTicks(30), testScheduler));

      observer.Messages.AssertEqual(
          OnNext(280, 42),
          OnNext(290, 43),
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(230, 300)
      );
  }
  // A 30-tick DelaySubscription over a cold source that fails at relative 70: values are
  // observed at 280/290 and the error at 300.
  [Fact]
  public void DelaySubscription_TimeSpan_Error()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 42),
          OnNext(60, 43),
          OnError<int>(70, error)
      );

      var observer = testScheduler.Start(() => source.DelaySubscription(TimeSpan.FromTicks(30), testScheduler));

      observer.Messages.AssertEqual(
          OnNext(280, 42),
          OnNext(290, 43),
          OnError<int>(300, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(230, 300)
      );
  }
  // DelaySubscription until 1 ms after UtcNow without passing a scheduler: Range(0, 10) is
  // still fully received, in order.
  [Fact]
  public void DelaySubscription_DateTimeOffset_Default()
  {
      var received = new List<int>();
      var source = Observable.Range(0, 10);
      var dueTime = DateTimeOffset.UtcNow.AddMilliseconds(1);

      source.DelaySubscription(dueTime).ForEach(received.Add);

      var expected = Enumerable.Range(0, 10);
      Assert.True(expected.SequenceEqual(received));
  }
  // DelaySubscription until absolute tick 230: the cold source is subscribed at 230, so its
  // notifications at relative 50/60/70 are observed at 280/290/300.
  [Fact]
  public void DelaySubscription_DateTimeOffset_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 42),
          OnNext(60, 43),
          OnCompleted<int>(70)
      );

      var observer = testScheduler.Start(() =>
          source.DelaySubscription(new DateTimeOffset(230, TimeSpan.Zero), testScheduler)
      );

      observer.Messages.AssertEqual(
          OnNext(280, 42),
          OnNext(290, 43),
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(230, 300)
      );
  }
  // DelaySubscription until absolute tick 230 over a cold source that fails at relative 70:
  // values are observed at 280/290 and the error at 300.
  [Fact]
  public void DelaySubscription_DateTimeOffset_Error()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 42),
          OnNext(60, 43),
          OnError<int>(70, error)
      );

      var observer = testScheduler.Start(() =>
          source.DelaySubscription(new DateTimeOffset(230, TimeSpan.Zero), testScheduler)
      );

      observer.Messages.AssertEqual(
          OnNext(280, 42),
          OnNext(290, 43),
          OnError<int>(300, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(230, 300)
      );
  }
  1984. #endregion
  1985. #region + Generate +
  // Each null argument to the TimeSpan-based Generate overload (scheduler, condition,
  // resultSelector, iterate, timeSelector) and a null observer passed to Subscribe must
  // raise ArgumentNullException.
  [Fact]
  public void Generate_TimeSpan_ArgumentChecking()
  {
      var condition = DummyFunc<int, bool>.Instance;
      var step = DummyFunc<int, int>.Instance;
      var timeSelector = DummyFunc<int, TimeSpan>.Instance;

      // Null scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, condition, step, step, timeSelector, (IScheduler)null));

      // Null condition.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, (Func<int, bool>)null, step, step, timeSelector, DummyScheduler.Instance));

      // Null result selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, condition, step, (Func<int, int>)null, timeSelector, DummyScheduler.Instance));

      // Null iterate function.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, condition, (Func<int, int>)null, step, timeSelector, DummyScheduler.Instance));

      // Null time selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, condition, step, step, (Func<int, TimeSpan>)null, DummyScheduler.Instance));

      // Null observer on Subscribe.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, condition, step, step, timeSelector, DummyScheduler.Instance).Subscribe(null));
  }
  // Generates states 0..3, each emitted (state + 1) ticks after the previous step; the
  // expected times are 202, 204, 207 and 211, with completion also at 211.
  [Fact]
  public void Generate_TimeSpan_Finite()
  {
      var testScheduler = new TestScheduler();

      Func<int, bool> condition = state => state <= 3;
      Func<int, int> iterate = state => state + 1;
      Func<int, int> resultSelector = state => state;
      Func<int, TimeSpan> timeSelector = state => TimeSpan.FromTicks(state + 1);

      var observer = testScheduler.Start(() =>
          Observable.Generate(0, condition, iterate, resultSelector, timeSelector, testScheduler)
      );

      observer.Messages.AssertEqual(
          OnNext(202, 0),
          OnNext(204, 1),
          OnNext(207, 2),
          OnNext(211, 3),
          OnCompleted<int>(211)
      );
  }
  // A throwing condition must surface as OnError carrying the same exception.
  [Fact]
  public void Generate_TimeSpan_Throw_Condition()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      Func<int, bool> condition = state => { throw failure; };

      var observer = testScheduler.Start(() =>
          Observable.Generate(
              0,
              condition,
              state => state + 1,
              state => state,
              state => TimeSpan.FromTicks(state + 1),
              testScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(201, failure)
      );
  }
  // A throwing result selector must surface as OnError carrying the same exception.
  [Fact]
  public void Generate_TimeSpan_Throw_ResultSelector()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      Func<int, int> resultSelector = state => { throw failure; };

      var observer = testScheduler.Start(() =>
          Observable.Generate(
              0,
              state => true,
              state => state + 1,
              resultSelector,
              state => TimeSpan.FromTicks(state + 1),
              testScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(201, failure)
      );
  }
  // A throwing iterate function: the first value is still expected, followed by
  // OnError at the same virtual time.
  [Fact]
  public void Generate_TimeSpan_Throw_Iterate()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      Func<int, int> iterate = state => { throw failure; };

      var observer = testScheduler.Start(() =>
          Observable.Generate(
              0,
              state => true,
              iterate,
              state => state,
              state => TimeSpan.FromTicks(state + 1),
              testScheduler));

      observer.Messages.AssertEqual(
          OnNext(202, 0),
          OnError<int>(202, failure)
      );
  }
  // A throwing time selector must surface as OnError carrying the same exception.
  [Fact]
  public void Generate_TimeSpan_Throw_TimeSelector()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      Func<int, TimeSpan> timeSelector = state => { throw failure; };

      var observer = testScheduler.Start(() =>
          Observable.Generate(
              0,
              state => true,
              state => state + 1,
              state => state,
              timeSelector,
              testScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(201, failure)
      );
  }
  // An unbounded Generate whose subscription is disposed at virtual time 210:
  // only the values produced before that point are expected.
  [Fact]
  public void Generate_TimeSpan_Dispose()
  {
      var testScheduler = new TestScheduler();

      var observer = testScheduler.Start(
          () => Observable.Generate(
              0,
              state => true,
              state => state + 1,
              state => state,
              state => TimeSpan.FromTicks(state + 1),
              testScheduler),
          210);

      observer.Messages.AssertEqual(
          OnNext(202, 0),
          OnNext(204, 1),
          OnNext(207, 2)
      );
  }
  // Checks that the TimeSpan-based Generate overload without a scheduler throws
  // ArgumentNullException for each null delegate and for a null observer.
  [Fact]
  public void Generate_TimeSpan_DefaultScheduler_ArgumentChecking()
  {
      var condition = DummyFunc<int, bool>.Instance;
      var iterate = DummyFunc<int, int>.Instance;
      var resultSelector = DummyFunc<int, int>.Instance;
      var timeSelector = DummyFunc<int, TimeSpan>.Instance;

      // Null condition.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, default(Func<int, bool>), iterate, resultSelector, timeSelector));

      // Null result selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, default(Func<int, int>), timeSelector));

      // Null iterate function.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, default(Func<int, int>), resultSelector, timeSelector));

      // Null time selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, resultSelector, default(Func<int, TimeSpan>)));

      // Null observer passed to Subscribe.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, resultSelector, timeSelector).Subscribe(null));
  }
  // The overload without a scheduler must yield the same sequence as the one
  // given DefaultScheduler.Instance explicitly.
  [Fact]
  public void Generate_TimeSpan_DefaultScheduler()
  {
      var withoutScheduler = Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => TimeSpan.FromMilliseconds(x));
      var withDefaultScheduler = Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => TimeSpan.FromMilliseconds(x), DefaultScheduler.Instance);

      withoutScheduler.AssertEqual(withDefaultScheduler);
  }
  // Checks that the DateTimeOffset-based Generate overload taking a scheduler
  // throws ArgumentNullException for each null argument and for a null observer.
  [Fact]
  public void Generate_DateTimeOffset_ArgumentChecking()
  {
      var condition = DummyFunc<int, bool>.Instance;
      var iterate = DummyFunc<int, int>.Instance;
      var resultSelector = DummyFunc<int, int>.Instance;
      var timeSelector = DummyFunc<int, DateTimeOffset>.Instance;
      var dummyScheduler = DummyScheduler.Instance;

      // Null scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, resultSelector, timeSelector, default(IScheduler)));

      // Null condition.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, default(Func<int, bool>), iterate, resultSelector, timeSelector, dummyScheduler));

      // Null result selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, default(Func<int, int>), timeSelector, dummyScheduler));

      // Null iterate function.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, default(Func<int, int>), resultSelector, timeSelector, dummyScheduler));

      // Null time selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, resultSelector, default(Func<int, DateTimeOffset>), dummyScheduler));

      // Null observer passed to Subscribe.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, resultSelector, timeSelector, dummyScheduler).Subscribe(null));
  }
  // Generates the states 0..3, each due at "scheduler now + (state + 1) ticks",
  // and expects completion at the same virtual time as the last value.
  [Fact]
  public void Generate_DateTimeOffset_Finite()
  {
      var testScheduler = new TestScheduler();

      var observer = testScheduler.Start(() =>
          Observable.Generate(
              0,
              state => state <= 3,
              state => state + 1,
              state => state,
              state => testScheduler.Now.AddTicks(state + 1),
              testScheduler));

      observer.Messages.AssertEqual(
          OnNext(202, 0),
          OnNext(204, 1),
          OnNext(207, 2),
          OnNext(211, 3),
          OnCompleted<int>(211)
      );
  }
  // A throwing condition must surface as OnError carrying the same exception.
  [Fact]
  public void Generate_DateTimeOffset_Throw_Condition()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      Func<int, bool> condition = state => { throw failure; };

      var observer = testScheduler.Start(() =>
          Observable.Generate(
              0,
              condition,
              state => state + 1,
              state => state,
              state => testScheduler.Now.AddTicks(state + 1),
              testScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(201, failure)
      );
  }
  // A throwing result selector must surface as OnError carrying the same exception.
  [Fact]
  public void Generate_DateTimeOffset_Throw_ResultSelector()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      Func<int, int> resultSelector = state => { throw failure; };

      var observer = testScheduler.Start(() =>
          Observable.Generate(
              0,
              state => true,
              state => state + 1,
              resultSelector,
              state => testScheduler.Now.AddTicks(state + 1),
              testScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(201, failure)
      );
  }
  // A throwing iterate function: the first value is still expected, followed by
  // OnError at the same virtual time.
  [Fact]
  public void Generate_DateTimeOffset_Throw_Iterate()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      Func<int, int> iterate = state => { throw failure; };

      var observer = testScheduler.Start(() =>
          Observable.Generate(
              0,
              state => true,
              iterate,
              state => state,
              state => testScheduler.Now.AddTicks(state + 1),
              testScheduler));

      observer.Messages.AssertEqual(
          OnNext(202, 0),
          OnError<int>(202, failure)
      );
  }
  // A throwing time selector must surface as OnError carrying the same exception.
  [Fact]
  public void Generate_DateTimeOffset_Throw_TimeSelector()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      Func<int, DateTimeOffset> timeSelector = state => { throw failure; };

      var observer = testScheduler.Start(() =>
          Observable.Generate(
              0,
              state => true,
              state => state + 1,
              state => state,
              timeSelector,
              testScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(201, failure)
      );
  }
  // An unbounded Generate whose subscription is disposed at virtual time 210:
  // only the values produced before that point are expected.
  [Fact]
  public void Generate_DateTimeOffset_Dispose()
  {
      var testScheduler = new TestScheduler();

      var observer = testScheduler.Start(
          () => Observable.Generate(
              0,
              state => true,
              state => state + 1,
              state => state,
              state => testScheduler.Now.AddTicks(state + 1),
              testScheduler),
          210);

      observer.Messages.AssertEqual(
          OnNext(202, 0),
          OnNext(204, 1),
          OnNext(207, 2)
      );
  }
  // Checks that the DateTimeOffset-based Generate overload without a scheduler
  // throws ArgumentNullException for each null delegate and for a null observer.
  [Fact]
  public void Generate_DateTimeOffset_DefaultScheduler_ArgumentChecking()
  {
      var condition = DummyFunc<int, bool>.Instance;
      var iterate = DummyFunc<int, int>.Instance;
      var resultSelector = DummyFunc<int, int>.Instance;
      var timeSelector = DummyFunc<int, DateTimeOffset>.Instance;

      // Null condition.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, default(Func<int, bool>), iterate, resultSelector, timeSelector));

      // Null result selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, default(Func<int, int>), timeSelector));

      // Null iterate function.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, default(Func<int, int>), resultSelector, timeSelector));

      // Null time selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, resultSelector, default(Func<int, DateTimeOffset>)));

      // Null observer passed to Subscribe.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Generate(0, condition, iterate, resultSelector, timeSelector).Subscribe(null));
  }
  // The overload without a scheduler must yield the same sequence as the one
  // given DefaultScheduler.Instance explicitly.
  [Fact]
  public void Generate_DateTimeOffset_DefaultScheduler()
  {
      var withoutScheduler = Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x));
      var withDefaultScheduler = Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x), DefaultScheduler.Instance);

      withoutScheduler.AssertEqual(withDefaultScheduler);
  }
  2190. #endregion
  2191. #region + Interval +
  // Interval must reject a null scheduler, a null observer and a negative period.
  [Fact]
  public void Interval_TimeSpan_ArgumentChecking()
  {
      var negativePeriod = TimeSpan.FromSeconds(-1);

      // Null scheduler and null observer.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Interval(TimeSpan.Zero, default(IScheduler)));
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.Interval(TimeSpan.Zero, DummyScheduler.Instance).Subscribe(null));

      // Negative period, with and without an explicit scheduler.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() =>
          Observable.Interval(negativePeriod));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() =>
          Observable.Interval(negativePeriod, DummyScheduler.Instance));
  }
  // A 100-tick interval is expected to emit 0, 1, 2, ... every 100 ticks,
  // with the first value at virtual time 300 and the last recorded one at 900.
  [Fact]
  public void Interval_TimeSpan_Basic()
  {
      var testScheduler = new TestScheduler();
      var period = TimeSpan.FromTicks(100);

      var observer = testScheduler.Start(() => Observable.Interval(period, testScheduler));

      observer.Messages.AssertEqual(
          OnNext(300, 0L),
          OnNext(400, 1L),
          OnNext(500, 2L),
          OnNext(600, 3L),
          OnNext(700, 4L),
          OnNext(800, 5L),
          OnNext(900, 6L)
      );
  }
  // A zero-period interval on the test scheduler, disposed at virtual time 210:
  // one value per tick from 201 through 209 is expected.
  [Fact]
  public void Interval_TimeSpan_Zero()
  {
      var testScheduler = new TestScheduler();
      var period = TimeSpan.FromTicks(0);

      var observer = testScheduler.Start(
          () => Observable.Interval(period, testScheduler),
          210);

      observer.Messages.AssertEqual(
          OnNext(201, 0L),
          OnNext(202, 1L),
          OnNext(203, 2L),
          OnNext(204, 3L),
          OnNext(205, 4L),
          OnNext(206, 5L),
          OnNext(207, 6L),
          OnNext(208, 7L),
          OnNext(209, 8L)
      );
  }
  // Runs a zero-period Interval on the default scheduler, takes values while
  // they are below 10, and expects exactly ten OnNext notifications to be
  // recorded by the time the sequence completes.
  [Fact]
  public void Interval_TimeSpan_Zero_DefaultScheduler()
  {
      // The test scheduler is only used as a factory for a recording observer.
      var scheduler = new TestScheduler();
      var observer = scheduler.CreateObserver<long>();

      // Dispose the wait handle and the subscription deterministically rather
      // than leaking them until finalization.
      using (var completed = new ManualResetEvent(false))
      using (Observable.Interval(TimeSpan.Zero).TakeWhile(i => i < 10).Subscribe(observer.OnNext, () => completed.Set()))
      {
          // Block until OnCompleted has signalled the event.
          completed.WaitOne();
      }

      Assert.Equal(10, observer.Messages.Count);
  }
  // With a 1000-tick period no value is expected to be recorded during the
  // scheduler run.
  [Fact]
  public void Interval_TimeSpan_Disposed()
  {
      var testScheduler = new TestScheduler();
      var period = TimeSpan.FromTicks(1000);

      var observer = testScheduler.Start(() => Observable.Interval(period, testScheduler));

      observer.Messages.AssertEqual(
      );
  }
  // An exception thrown from the OnNext handler is expected to propagate out
  // of the scheduler's Start call.
  [Fact]
  public void Interval_TimeSpan_ObserverThrows()
  {
      var testScheduler = new TestScheduler();
      var ticks = Observable.Interval(TimeSpan.FromTicks(1), testScheduler);

      ticks.Subscribe(value => { throw new InvalidOperationException(); });

      ReactiveAssert.Throws<InvalidOperationException>(() => testScheduler.Start());
  }
  // The first three values of a 1 ms interval on the default scheduler must be 0, 1, 2.
  [Fact]
  public void Interval_TimeSpan_DefaultScheduler()
  {
      var expected = new[] { 0L, 1L, 2L };
      var firstThree = Observable.Interval(TimeSpan.FromMilliseconds(1)).ToEnumerable().Take(3);

      Assert.True(firstThree.SequenceEqual(expected));
  }
  2270. #endregion
  2271. #region + Sample +
  // Sample must reject null sources, null schedulers, null samplers and
  // negative intervals.
  [Fact]
  public void Sample_ArgumentChecking()
  {
      var testScheduler = new TestScheduler();
      var source = Observable.Empty<int>();
      var nullSource = default(IObservable<int>);
      var negativeInterval = TimeSpan.FromSeconds(-1);

      // Time-based overloads: null source, then null scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Sample(nullSource, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Sample(nullSource, TimeSpan.Zero, testScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Sample(source, TimeSpan.Zero, default(IScheduler)));

      // Time-based overloads: negative interval.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Sample(source, negativeInterval));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Sample(source, negativeInterval, testScheduler));

      // Sampler-based overload: null source, then null sampler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Sample(nullSource, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Sample(source, nullSource));
  }
  // Samples a hot source every 50 ticks; the value still pending when the
  // source completes is expected at the next sampling tick, with completion.
  [Fact]
  public void Sample_Regular()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(230, 3),
          OnNext(260, 4),
          OnNext(300, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnCompleted<int>(390)
      );

      var observer = testScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), testScheduler));

      observer.Messages.AssertEqual(
          OnNext(250, 3),
          OnNext(300, 5), /* CHECK: boundary of sampling */
          OnNext(350, 6),
          OnNext(400, 7), /* Sample in last bucket */
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 390)
      );
  }
  // Same scenario as Sample_Regular, but run on a PeriodicTestScheduler; when
  // WINDOWS is not defined the recorded timer run is checked as well.
  [Fact]
  public void Sample_Periodic_Regular()
  {
      var periodicScheduler = new PeriodicTestScheduler();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(230, 3),
          OnNext(260, 4),
          OnNext(300, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnCompleted<int>(390)
      );

      var observer = periodicScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), periodicScheduler));

      observer.Messages.AssertEqual(
          OnNext(250, 3),
          OnNext(300, 5), /* CHECK: boundary of sampling */
          OnNext(350, 6),
          OnNext(400, 7), /* Sample in last bucket */
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 390)
      );

  #if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 400) { 250, 300, 350, 400 }
      );
  #endif
  }
  // The source fails at 330 with an unsampled value pending; the error is
  // expected immediately and the pending value (6) is not expected at all.
  [Fact]
  public void Sample_ErrorInFlight()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(230, 3),
          OnNext(260, 4),
          OnNext(300, 5),
          OnNext(310, 6),
          OnError<int>(330, failure)
      );

      var observer = testScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), testScheduler));

      observer.Messages.AssertEqual(
          OnNext(250, 3),
          OnNext(300, 5), /* CHECK: boundary of sampling */
          OnError<int>(330, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 330)
      );
  }
  // Same scenario as Sample_ErrorInFlight, but run on a PeriodicTestScheduler;
  // when WINDOWS is not defined the recorded timer run is checked as well.
  [Fact]
  public void Sample_Periodic_ErrorInFlight()
  {
      var periodicScheduler = new PeriodicTestScheduler();
      var failure = new Exception();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(230, 3),
          OnNext(260, 4),
          OnNext(300, 5),
          OnNext(310, 6),
          OnError<int>(330, failure)
      );

      var observer = periodicScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), periodicScheduler));

      observer.Messages.AssertEqual(
          OnNext(250, 3),
          OnNext(300, 5), /* CHECK: boundary of sampling */
          OnError<int>(330, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 330)
      );

  #if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 330) { 250, 300 }
      );
  #endif
  }
  // A source that produces nothing after subscription and completes at 300
  // is expected to yield only the completion at 300.
  [Fact]
  public void Sample_Empty()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(300)
      );

      var observer = testScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), testScheduler));

      observer.Messages.AssertEqual(
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
  }
  // Same scenario as Sample_Empty, but run on a PeriodicTestScheduler; when
  // WINDOWS is not defined the recorded timer run is checked as well.
  [Fact]
  public void Sample_Periodic_Empty()
  {
      var periodicScheduler = new PeriodicTestScheduler();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(300)
      );

      var observer = periodicScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), periodicScheduler));

      observer.Messages.AssertEqual(
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

  #if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 300) { 250, 300 }
      );
  #endif
  }
  // A source that fails at 300 without producing values after subscription
  // is expected to yield only that error at 300.
  [Fact]
  public void Sample_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(300, failure)
      );

      var observer = testScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), testScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(300, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
  }
  // Same scenario as Sample_Error, but run on a PeriodicTestScheduler; when
  // WINDOWS is not defined the recorded timer run is checked as well.
  [Fact]
  public void Sample_Periodic_Error()
  {
      var periodicScheduler = new PeriodicTestScheduler();
      var failure = new Exception();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(300, failure)
      );

      var observer = periodicScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), periodicScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(300, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

  #if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 300) { 250 }
      );
  #endif
  }
  // A source that never terminates and produces nothing after subscription:
  // no messages are expected, and the subscription spans 200 to 1000.
  [Fact]
  public void Sample_Never()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var observer = testScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), testScheduler));

      observer.Messages.AssertEqual(
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // Same scenario as Sample_Never, but run on a PeriodicTestScheduler; when
  // WINDOWS is not defined the recorded timer run is checked as well.
  [Fact]
  public void Sample_Periodic_Never()
  {
      var periodicScheduler = new PeriodicTestScheduler();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var observer = periodicScheduler.Start(() => source.Sample(TimeSpan.FromTicks(50), periodicScheduler));

      observer.Messages.AssertEqual(
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );

  #if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 1000) { 250, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950 }
      );
  #endif
  }
  // Sampling a single-value sequence without an explicit scheduler must
  // yield exactly that value.
  [Fact]
  public void Sample_DefaultScheduler_Periodic()
  {
      var sampled = Observable.Return(42).Sample(TimeSpan.FromMilliseconds(1));

      var only = sampled.ToEnumerable().Single();

      Assert.Equal(42, only);
  }
  // Same check as Sample_DefaultScheduler_Periodic, but on the default
  // scheduler wrapped with DisableOptimizations().
  [Fact]
  public void Sample_DefaultScheduler_PeriodicDisabled()
  {
      var unoptimized = Scheduler.Default.DisableOptimizations();
      var sampled = Observable.Return(42).Sample(TimeSpan.FromMilliseconds(1), unoptimized);

      var only = sampled.ToEnumerable().Single();

      Assert.Equal(42, only);
  }
  // Samples a hot source with a second observable as the sampler; completion
  // is expected only when the sampler completes at 500.
  [Fact]
  public void Sample_Sampler_Simple1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(290, 4),
          OnNext(300, 5),
          OnNext(310, 6),
          OnCompleted<int>(400)
      );

      var sampler = testScheduler.CreateHotObservable(
          OnNext(150, ""),
          OnNext(210, "bar"),
          OnNext(250, "foo"),
          OnNext(260, "qux"),
          OnNext(320, "baz"),
          OnCompleted<string>(500)
      );

      var observer = testScheduler.Start(() => source.Sample(sampler));

      observer.Messages.AssertEqual(
          OnNext(250, 3),
          OnNext(320, 6),
          OnCompleted<int>(500 /* on sampling boundaries only */)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      sampler.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  // Like Sample_Sampler_Simple1, but the source emits one more value (7) after
  // the sampler's last tick; it is expected when the sampler completes at 500.
  [Fact]
  public void Sample_Sampler_Simple2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(290, 4),
          OnNext(300, 5),
          OnNext(310, 6),
          OnNext(360, 7),
          OnCompleted<int>(400)
      );

      var sampler = testScheduler.CreateHotObservable(
          OnNext(150, ""),
          OnNext(210, "bar"),
          OnNext(250, "foo"),
          OnNext(260, "qux"),
          OnNext(320, "baz"),
          OnCompleted<string>(500)
      );

      var observer = testScheduler.Start(() => source.Sample(sampler));

      observer.Messages.AssertEqual(
          OnNext(250, 3),
          OnNext(320, 6),
          OnNext(500, 7),
          OnCompleted<int>(500 /* on sampling boundaries only */)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      sampler.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  // The source completes at 300 with a value pending; the next sampler tick
  // at 320 is expected to deliver that value and the completion, and the
  // sampler subscription is expected to end at 320.
  [Fact]
  public void Sample_Sampler_Simple3()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(290, 4),
          OnCompleted<int>(300)
      );

      var sampler = testScheduler.CreateHotObservable(
          OnNext(150, ""),
          OnNext(210, "bar"),
          OnNext(250, "foo"),
          OnNext(260, "qux"),
          OnNext(320, "baz"),
          OnCompleted<string>(500)
      );

      var observer = testScheduler.Start(() => source.Sample(sampler));

      observer.Messages.AssertEqual(
          OnNext(250, 3),
          OnNext(320, 4),
          OnCompleted<int>(320 /* on sampling boundaries only */)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      sampler.Subscriptions.AssertEqual(
          Subscribe(200, 320)
      );
  }
  // The source fails at 320: the error is expected immediately and both
  // subscriptions are expected to end at 320.
  [Fact]
  public void Sample_Sampler_SourceThrows()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(290, 4),
          OnNext(300, 5),
          OnNext(310, 6),
          OnError<int>(320, failure)
      );

      var sampler = testScheduler.CreateHotObservable(
          OnNext(150, ""),
          OnNext(210, "bar"),
          OnNext(250, "foo"),
          OnNext(260, "qux"),
          OnNext(330, "baz"),
          OnCompleted<string>(400)
      );

      var observer = testScheduler.Start(() => source.Sample(sampler));

      observer.Messages.AssertEqual(
          OnNext(250, 3),
          OnError<int>(320, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 320)
      );

      sampler.Subscriptions.AssertEqual(
          Subscribe(200, 320)
      );
  }
  2680. #if !NO_PERF // BREAKING CHANGE v2 > v1.x - behavior when sampler throws
  // The sampler fails at 320: the error is expected on the result and both
  // subscriptions are expected to end at 320.
  [Fact]
  public void Sample_Sampler_SamplerThrows()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 2),
          OnNext(240, 3),
          OnNext(290, 4),
          OnNext(300, 5),
          OnNext(310, 6),
          OnCompleted<int>(400)
      );

      var sampler = testScheduler.CreateHotObservable(
          OnNext(150, ""),
          OnNext(210, "bar"),
          OnNext(250, "foo"),
          OnNext(260, "qux"),
          OnError<string>(320, failure)
      );

      var observer = testScheduler.Start(() => source.Sample(sampler));

      observer.Messages.AssertEqual(
          OnNext(250, 3),
          OnError<int>(320, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 320)
      );

      sampler.Subscriptions.AssertEqual(
          Subscribe(200, 320)
      );
  }
  2716. #endif
  2717. #endregion
  2718. #region + Skip +
  // Time-based Skip must reject a null source, a null scheduler and a
  // negative duration.
  [Fact]
  public void Skip_ArgumentChecking()
  {
      var source = Observable.Return(42);
      var nullSource = default(IObservable<int>);
      var oneSecond = TimeSpan.FromSeconds(1);
      var negative = TimeSpan.FromSeconds(-1);

      // Overload without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Skip(nullSource, oneSecond));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Skip(source, negative));

      // Overload with a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Skip(nullSource, oneSecond, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Skip(source, oneSecond, default(IScheduler)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Skip(source, negative, Scheduler.Default));
  }
  // Skipping for TimeSpan.Zero is expected to let every notification through.
  [Fact]
  public void Skip_Zero()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var observer = testScheduler.Start(() => source.Skip(TimeSpan.Zero, testScheduler));

      observer.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // Skipping for 15 ticks is expected to drop the value at 210 and keep the
  // value at 220 and the completion.
  [Fact]
  public void Skip_Some()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var observer = testScheduler.Start(() => source.Skip(TimeSpan.FromTicks(15), testScheduler));

      observer.Messages.AssertEqual(
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // The skip duration (50 ticks) outlasts the source: only the completion
  // at 230 is expected.
  [Fact]
  public void Skip_Late()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var observer = testScheduler.Start(() => source.Skip(TimeSpan.FromTicks(50), testScheduler));

      observer.Messages.AssertEqual(
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // An error raised while values are still being skipped is expected to be
  // forwarded at the time it occurs.
  [Fact]
  public void Skip_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable<int>(
          OnError<int>(210, failure)
      );

      var observer = testScheduler.Start(() => source.Skip(TimeSpan.FromTicks(50), testScheduler));

      observer.Messages.AssertEqual(
          OnError<int>(210, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // A source that never produces anything: no messages are expected and the
  // subscription is expected to span 200 to 1000.
  [Fact]
  public void Skip_Never()
  {
      var scheduler = new TestScheduler();

      // The explicit type argument is required because no messages are
      // supplied to infer the element type from.
      var xs = scheduler.CreateHotObservable<int>(
      );

      var res = scheduler.Start(() =>
          xs.Skip(TimeSpan.FromTicks(50), scheduler)
      );

      res.Messages.AssertEqual(
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // Chains Skip(15 ticks) followed by Skip(30 ticks); only the values from
  // virtual time 240 onward and the completion are expected.
  [Fact]
  public void Skip_Twice1()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      var res = scheduler.Start(() =>
          xs.Skip(TimeSpan.FromTicks(15), scheduler).Skip(TimeSpan.FromTicks(30), scheduler)
      );

      res.Messages.AssertEqual(
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 270)
      );
  }
  // Chains Skip(30 ticks) followed by Skip(15 ticks), the reverse order of
  // Skip_Twice1; the same messages are expected: values from virtual time 240
  // onward and the completion.
  [Fact]
  public void Skip_Twice2()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      var res = scheduler.Start(() =>
          xs.Skip(TimeSpan.FromTicks(30), scheduler).Skip(TimeSpan.FromTicks(15), scheduler)
      );

      res.Messages.AssertEqual(
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 270)
      );
  }
  // Uses the Skip overload without a scheduler with a 60-second duration on a
  // ten-element range; the sequence is expected to complete without
  // delivering any value.
  [Fact]
  public void Skip_Default()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.Skip(TimeSpan.FromSeconds(60));
      var lst = new List<int>();

      // Dispose the wait handle and the subscription deterministically rather
      // than leaking them until finalization.
      using (var e = new ManualResetEvent(false))
      using (res.Subscribe(lst.Add, () => e.Set()))
      {
          // Block until OnCompleted has signalled the event.
          e.WaitOne();
      }

      // Assert.Empty reports the unexpected contents on failure, unlike a
      // bare boolean check on Count.
      Assert.Empty(lst);
  }
  2891. #endregion
  2892. #region + SkipLast +
  // Time-based SkipLast must reject a null source, a null scheduler and a
  // negative duration.
  [Fact]
  public void SkipLast_ArgumentChecking()
  {
      var source = Observable.Return(42);
      var nullSource = default(IObservable<int>);
      var oneSecond = TimeSpan.FromSeconds(1);
      var negative = TimeSpan.FromSeconds(-1);

      // Overload without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipLast(nullSource, oneSecond));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.SkipLast(source, negative));

      // Overload with a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipLast(nullSource, oneSecond, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipLast(source, oneSecond, default(IScheduler)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.SkipLast(source, negative, Scheduler.Default));
  }
  // SkipLast with TimeSpan.Zero is expected to pass every notification
  // through at its original time.
  [Fact]
  public void SkipLast_Zero1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var observer = testScheduler.Start(() => source.SkipLast(TimeSpan.Zero, testScheduler));

      observer.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // SkipLast with a zero duration where the last element shares tick 230 with the
  // completion: all three elements are still expected.
  [Fact]
  public void SkipLast_Zero2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.Zero;
      var results = testScheduler.Start(() => source.SkipLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // SkipLast with a 15-tick duration: only element 1 is expected, delivered at
  // tick 230 together with the completion.
  [Fact]
  public void SkipLast_Some1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.FromTicks(15);
      var results = testScheduler.Start(() => source.SkipLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext(230, 1),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // SkipLast with a 45-tick duration over nine elements: elements 1..5 are expected
  // at ticks 260..300, elements 6..9 are dropped.
  [Fact]
  public void SkipLast_Some2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      var duration = TimeSpan.FromTicks(45);
      var results = testScheduler.Start(() => source.SkipLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext(260, 1),
          OnNext(270, 2),
          OnNext(280, 3),
          OnNext(290, 4),
          OnNext(300, 5),
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
  }
  // SkipLast with a 50-tick duration: no element is expected, only the completion
  // at tick 230.
  [Fact]
  public void SkipLast_All()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.SkipLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // SkipLast over a source that fails at tick 210: the same exception is expected
  // at tick 210 and the subscription ends there.
  [Fact]
  public void SkipLast_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable<int>(
          OnError<int>(210, failure)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.SkipLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnError<int>(210, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // SkipLast over a hot source that never produces a notification: no messages are
  // expected and the subscription spans ticks 200..1000.
  [Fact]
  public void SkipLast_Never()
  {
      var scheduler = new TestScheduler();

      // The unused Exception local from the original was removed.
      var xs = scheduler.CreateHotObservable<int>(
      );

      var res = scheduler.Start(() =>
          xs.SkipLast(TimeSpan.FromTicks(50), scheduler)
      );

      res.Messages.AssertEqual(
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // SkipLast(60 s) without an explicit scheduler over Range(0, 10): the test expects
  // completion without any element.
  [Fact]
  public void SkipLast_Default1()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.SkipLast(TimeSpan.FromSeconds(60));

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Collection-specific assertion instead of Assert.True(lst.Count == 0).
      Assert.Empty(lst);
  }
  // SkipLast(60 s) on Scheduler.Default.DisableOptimizations() over Range(0, 10):
  // the test expects completion without any element.
  [Fact]
  public void SkipLast_Default2()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.SkipLast(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations());

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Collection-specific assertion instead of Assert.True(lst.Count == 0).
      Assert.Empty(lst);
  }
  // SkipLast(TimeSpan.Zero) without an explicit scheduler over Range(0, 10): the
  // test expects all ten elements in order.
  [Fact]
  public void SkipLast_Default3()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.SkipLast(TimeSpan.Zero);

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Sequence comparison via Assert.Equal instead of Assert.True(SequenceEqual(...)).
      Assert.Equal(Enumerable.Range(0, 10), lst);
  }
  // SkipLast(TimeSpan.Zero) on Scheduler.Default.DisableOptimizations() over
  // Range(0, 10): the test expects all ten elements in order.
  [Fact]
  public void SkipLast_Default4()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.SkipLast(TimeSpan.Zero, Scheduler.Default.DisableOptimizations());

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Sequence comparison via Assert.Equal instead of Assert.True(SequenceEqual(...)).
      Assert.Equal(Enumerable.Range(0, 10), lst);
  }
  3107. #endregion
  3108. #region + SkipUntil +
  // Argument validation of the SkipUntil(DateTimeOffset) overloads: a null source
  // or a null scheduler must raise ArgumentNullException.
  [Fact]
  public void SkipUntil_ArgumentChecking()
  {
      var source = Observable.Return(42);
      var nullSource = default(IObservable<int>);
      var nullScheduler = default(IScheduler);

      // Overload without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil(nullSource, DateTimeOffset.Now));

      // Overload with a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil(nullSource, DateTimeOffset.Now, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil(source, DateTimeOffset.Now, nullScheduler));
  }
  // SkipUntil with a default-constructed DateTimeOffset as start time: both elements
  // and the completion are expected unchanged.
  [Fact]
  public void SkipUntil_Zero()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var startTime = new DateTimeOffset();
      var results = testScheduler.Start(() => source.SkipUntil(startTime, testScheduler));

      results.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // SkipUntil with a start time of tick 215: element 1 (tick 210) is dropped,
  // element 2 and the completion are expected.
  [Fact]
  public void SkipUntil_Some()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var startTime = new DateTimeOffset(215, TimeSpan.Zero);
      var results = testScheduler.Start(() => source.SkipUntil(startTime, testScheduler));

      results.Messages.AssertEqual(
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // SkipUntil with a start time of tick 250, after the source completes at 230:
  // only the completion is expected.
  [Fact]
  public void SkipUntil_Late()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var startTime = new DateTimeOffset(250, TimeSpan.Zero);
      var results = testScheduler.Start(() => source.SkipUntil(startTime, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // SkipUntil over a source that fails at tick 210, before the start time of 250:
  // the same exception is expected at tick 210.
  [Fact]
  public void SkipUntil_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable<int>(
          OnError<int>(210, failure)
      );

      var startTime = new DateTimeOffset(250, TimeSpan.Zero);
      var results = testScheduler.Start(() => source.SkipUntil(startTime, testScheduler));

      results.Messages.AssertEqual(
          OnError<int>(210, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // SkipUntil over a hot source that never produces a notification: no messages are
  // expected and the subscription spans ticks 200..1000.
  [Fact]
  public void SkipUntil_Never()
  {
      var scheduler = new TestScheduler();

      // The unused Exception local from the original was removed.
      var xs = scheduler.CreateHotObservable<int>(
      );

      var res = scheduler.Start(() =>
          xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler)
      );

      res.Messages.AssertEqual(
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // Two chained SkipUntil calls (start times 215, then 230): elements 4..6 and the
  // completion are expected.
  [Fact]
  public void SkipUntil_Twice1()
  {
      var scheduler = new TestScheduler();

      // The unused Exception local from the original was removed.
      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      var res = scheduler.Start(() =>
          xs.SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler).SkipUntil(new DateTimeOffset(230, TimeSpan.Zero), scheduler)
      );

      res.Messages.AssertEqual(
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 270)
      );
  }
  // Two chained SkipUntil calls in the opposite order (start times 230, then 215):
  // the same elements 4..6 and the completion are expected.
  [Fact]
  public void SkipUntil_Twice2()
  {
      var scheduler = new TestScheduler();

      // The unused Exception local from the original was removed.
      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      var res = scheduler.Start(() =>
          xs.SkipUntil(new DateTimeOffset(230, TimeSpan.Zero), scheduler).SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler)
      );

      res.Messages.AssertEqual(
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 270)
      );
  }
  // SkipUntil(one minute from now) without an explicit scheduler over Range(0, 10):
  // the test expects completion without any element.
  [Fact]
  public void SkipUntil_Default()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.SkipUntil(DateTimeOffset.UtcNow.AddMinutes(1));

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Collection-specific assertion instead of Assert.True(lst.Count == 0).
      Assert.Empty(lst);
  }
  3279. #endregion
  3280. #region + Take +
  // Argument validation of the Take(TimeSpan) overloads: a null source or null
  // scheduler must raise ArgumentNullException, a negative duration must raise
  // ArgumentOutOfRangeException.
  [Fact]
  public void Take_ArgumentChecking()
  {
      var source = Observable.Return(42);
      var nullSource = default(IObservable<int>);
      var nullScheduler = default(IScheduler);
      var oneSecond = TimeSpan.FromSeconds(1);
      var minusOneSecond = TimeSpan.FromSeconds(-1);

      // Overload without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Take(nullSource, oneSecond));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Take(source, minusOneSecond));

      // Overload with a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Take(nullSource, oneSecond, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Take(source, oneSecond, nullScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Take(source, minusOneSecond, Scheduler.Default));
  }
  // Take with a zero duration: only a completion at tick 201 is expected and the
  // source subscription spans ticks 200..201.
  [Fact]
  public void Take_Zero()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.Zero;
      var results = testScheduler.Start(() => source.Take(duration, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(201)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 201)
      );
  }
  // Take with a 25-tick duration: elements 1 and 2 are expected, followed by a
  // completion at tick 225; element 3 (tick 230) is not expected.
  [Fact]
  public void Take_Some()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.Take(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(225)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 225)
      );
  }
  // Take with a 50-tick duration on a source that completes at tick 230: the
  // whole sequence is expected unchanged.
  [Fact]
  public void Take_Late()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.Take(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // Take over a source that fails at tick 210: the same exception is expected at
  // tick 210 and the subscription ends there.
  [Fact]
  public void Take_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable<int>(
          OnError<int>(210, failure)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.Take(duration, testScheduler));

      results.Messages.AssertEqual(
          OnError<int>(210, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // Take with a 50-tick duration over a hot source that never produces a
  // notification: a completion at tick 250 is expected and the subscription spans
  // ticks 200..250.
  [Fact]
  public void Take_Never()
  {
      var scheduler = new TestScheduler();

      // The unused Exception local from the original was removed.
      var xs = scheduler.CreateHotObservable<int>(
      );

      var res = scheduler.Start(() =>
          xs.Take(TimeSpan.FromTicks(50), scheduler)
      );

      res.Messages.AssertEqual(
          OnCompleted<int>(250)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  // Two chained Take calls (55 ticks, then 35 ticks): elements 1..3 are expected,
  // followed by a completion at tick 235.
  [Fact]
  public void Take_Twice1()
  {
      var scheduler = new TestScheduler();

      // The unused Exception local from the original was removed.
      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      var res = scheduler.Start(() =>
          xs.Take(TimeSpan.FromTicks(55), scheduler).Take(TimeSpan.FromTicks(35), scheduler)
      );

      res.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(235)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 235)
      );
  }
  // Two chained Take calls in the opposite order (35 ticks, then 55 ticks): the
  // same elements 1..3 and a completion at tick 235 are expected.
  [Fact]
  public void Take_Twice2()
  {
      var scheduler = new TestScheduler();

      // The unused Exception local from the original was removed.
      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      var res = scheduler.Start(() =>
          xs.Take(TimeSpan.FromTicks(35), scheduler).Take(TimeSpan.FromTicks(55), scheduler)
      );

      res.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(235)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 235)
      );
  }
  // Take(60 s) without an explicit scheduler over Range(0, 10): the test expects
  // all ten elements in order.
  [Fact]
  public void Take_Default()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.Take(TimeSpan.FromSeconds(60));

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Sequence comparison via Assert.Equal instead of Assert.True(SequenceEqual(...)).
      Assert.Equal(Enumerable.Range(0, 10), lst);
  }
  3456. #endregion
  3457. #region + TakeLast +
  // Argument validation of the three TakeLast(TimeSpan) overloads: a null source or
  // any null scheduler must raise ArgumentNullException, a negative duration must
  // raise ArgumentOutOfRangeException.
  [Fact]
  public void TakeLast_ArgumentChecking()
  {
      var source = Observable.Return(42);
      var nullSource = default(IObservable<int>);
      var nullScheduler = default(IScheduler);
      var oneSecond = TimeSpan.FromSeconds(1);
      var minusOneSecond = TimeSpan.FromSeconds(-1);

      // Overload without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(nullSource, oneSecond));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(source, minusOneSecond));

      // Overload with one scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(nullSource, oneSecond, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(source, oneSecond, nullScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(source, minusOneSecond, Scheduler.Default));

      // Overload with two schedulers.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(nullSource, oneSecond, Scheduler.Default, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(source, oneSecond, nullScheduler, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(source, oneSecond, Scheduler.Default, nullScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(source, minusOneSecond, Scheduler.Default, Scheduler.Default));
  }
  // TakeLast with a zero duration: no element is expected, only the completion at
  // tick 230.
  [Fact]
  public void TakeLast_Zero1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.Zero;
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // TakeLast with a zero duration and the test scheduler passed for both scheduler
  // arguments: only a completion is expected, at tick 231.
  [Fact]
  public void TakeLast_Zero1_WithLoopScheduler()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.Zero;
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(231)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // TakeLast with a zero duration where the last element shares tick 230 with the
  // completion: still no element is expected, only the completion at 230.
  [Fact]
  public void TakeLast_Zero2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.Zero;
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // Same source as TakeLast_Zero2 with the test scheduler passed for both scheduler
  // arguments: only a completion is expected, at tick 231.
  [Fact]
  public void TakeLast_Zero2_WithLoopScheduler()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.Zero;
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(231)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // TakeLast with a 25-tick duration on a source completing at tick 240: elements
  // 2 and 3 are expected, both delivered at tick 240 with the completion.
  [Fact]
  public void TakeLast_Some1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext(240, 2),
          OnNext(240, 3),
          OnCompleted<int>(240)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 240)
      );
  }
  // Same source as TakeLast_Some1 with the test scheduler passed for both scheduler
  // arguments: elements 2 and 3 are expected at ticks 241 and 242, the completion
  // at tick 243.
  [Fact]
  public void TakeLast_Some1_WithLoopScheduler()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler, testScheduler));

      results.Messages.AssertEqual(
          OnNext(241, 2),
          OnNext(242, 3),
          OnCompleted<int>(243)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 240)
      );
  }
  // TakeLast with a 25-tick duration on a source whose last element is at tick 230
  // and which completes at tick 300: only the completion is expected.
  [Fact]
  public void TakeLast_Some2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(300)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
  }
  // Same source as TakeLast_Some2 with the test scheduler passed for both scheduler
  // arguments: only a completion is expected, at tick 301.
  [Fact]
  public void TakeLast_Some2_WithLoopScheduler()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(300)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(301)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
  }
  // TakeLast with a 45-tick duration over nine elements: elements 6..9 are expected,
  // all delivered at tick 300 with the completion.
  [Fact]
  public void TakeLast_Some3()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      var duration = TimeSpan.FromTicks(45);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext(300, 6),
          OnNext(300, 7),
          OnNext(300, 8),
          OnNext(300, 9),
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
  }
  // Same source as TakeLast_Some3 with the test scheduler passed for both scheduler
  // arguments: elements 6..9 are expected at ticks 301..304, the completion at 305.
  [Fact]
  public void TakeLast_Some3_WithLoopScheduler()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      var duration = TimeSpan.FromTicks(45);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler, testScheduler));

      results.Messages.AssertEqual(
          OnNext(301, 6),
          OnNext(302, 7),
          OnNext(303, 8),
          OnNext(304, 9),
          OnCompleted<int>(305)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
  }
  // TakeLast with a 25-tick duration on an irregularly spaced source whose last
  // element is at tick 300 and which completes at tick 350: only the completion is
  // expected.
  [Fact]
  public void TakeLast_Some4()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(250, 3),
          OnNext(280, 4),
          OnNext(290, 5),
          OnNext(300, 6),
          OnCompleted<int>(350)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(350)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 350)
      );
  }
  // Same source as TakeLast_Some4 with the test scheduler passed for both scheduler
  // arguments: only a completion is expected, at tick 351.
  [Fact]
  public void TakeLast_Some4_WithLoopScheduler()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(250, 3),
          OnNext(280, 4),
          OnNext(290, 5),
          OnNext(300, 6),
          OnCompleted<int>(350)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(351)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 350)
      );
  }
  // TakeLast with a 50-tick duration: both elements are expected, delivered at
  // tick 230 with the completion.
  [Fact]
  public void TakeLast_All()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext(230, 1),
          OnNext(230, 2),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // Same source as TakeLast_All with the test scheduler passed for both scheduler
  // arguments: elements 1 and 2 are expected at ticks 231 and 232, the completion
  // at tick 233.
  [Fact]
  public void TakeLast_All_WithLoopScheduler()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler, testScheduler));

      results.Messages.AssertEqual(
          OnNext(231, 1),
          OnNext(232, 2),
          OnCompleted<int>(233)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // TakeLast over a source that fails at tick 210: the same exception is expected
  // at tick 210 and the subscription ends there.
  [Fact]
  public void TakeLast_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable<int>(
          OnError<int>(210, failure)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler));

      results.Messages.AssertEqual(
          OnError<int>(210, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // TakeLast with the test scheduler passed for both scheduler arguments over a
  // source that fails at tick 210: the same exception is expected at tick 210.
  [Fact]
  public void TakeLast_Error_WithLoopScheduler()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable<int>(
          OnError<int>(210, failure)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.TakeLast(duration, testScheduler, testScheduler));

      results.Messages.AssertEqual(
          OnError<int>(210, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // TakeLast over a hot source that never produces a notification: no messages are
  // expected and the subscription spans ticks 200..1000.
  [Fact]
  public void TakeLast_Never()
  {
      var scheduler = new TestScheduler();

      // The unused Exception local from the original was removed.
      var xs = scheduler.CreateHotObservable<int>(
      );

      var res = scheduler.Start(() =>
          xs.TakeLast(TimeSpan.FromTicks(50), scheduler)
      );

      res.Messages.AssertEqual(
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // TakeLast with the test scheduler passed for both scheduler arguments over a hot
  // source that never produces a notification: no messages are expected and the
  // subscription spans ticks 200..1000.
  [Fact]
  public void TakeLast_Never_WithLoopScheduler()
  {
      var scheduler = new TestScheduler();

      // The unused Exception local from the original was removed.
      var xs = scheduler.CreateHotObservable<int>(
      );

      var res = scheduler.Start(() =>
          xs.TakeLast(TimeSpan.FromTicks(50), scheduler, scheduler)
      );

      res.Messages.AssertEqual(
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // TakeLast(60 s) without an explicit scheduler over Range(0, 10): the test expects
  // all ten elements in order.
  [Fact]
  public void TakeLast_Default1()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.TakeLast(TimeSpan.FromSeconds(60));

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Sequence comparison via Assert.Equal instead of Assert.True(SequenceEqual(...)).
      Assert.Equal(Enumerable.Range(0, 10), lst);
  }
  // TakeLast(60 s) on Scheduler.Default.DisableOptimizations() over Range(0, 10):
  // the test expects all ten elements in order.
  [Fact]
  public void TakeLast_Default2()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.TakeLast(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations());

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Sequence comparison via Assert.Equal instead of Assert.True(SequenceEqual(...)).
      Assert.Equal(Enumerable.Range(0, 10), lst);
  }
  // TakeLast(TimeSpan.Zero) without an explicit scheduler over Range(0, 10): the
  // test expects completion without any element.
  [Fact]
  public void TakeLast_Default3()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.TakeLast(TimeSpan.Zero);

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Collection-specific assertion instead of Assert.True(lst.Count == 0).
      Assert.Empty(lst);
  }
  // TakeLast(TimeSpan.Zero) on Scheduler.Default.DisableOptimizations() over
  // Range(0, 10): the test expects completion without any element.
  [Fact]
  public void TakeLast_Default4()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.TakeLast(TimeSpan.Zero, Scheduler.Default.DisableOptimizations());

      var lst = new List<int>();

      // The wait handle is disposed when the test is done instead of being leaked.
      using (var e = new ManualResetEvent(false))
      {
          res.Subscribe(
              lst.Add,
              () => e.Set()
          );

          // Block until OnCompleted signals the event.
          e.WaitOne();
      }

      // Collection-specific assertion instead of Assert.True(lst.Count == 0).
      Assert.Empty(lst);
  }
  // TakeLast(60 s) with Scheduler.Default as the first and NewThreadScheduler.Default
  // as the second scheduler argument over Range(0, 10): the test expects all ten
  // elements in order.
  [Fact]
  public void TakeLast_LongRunning_Regular()
  {
      var res = Observable.Range(0, 10, Scheduler.Default).TakeLast(TimeSpan.FromSeconds(60), Scheduler.Default, NewThreadScheduler.Default);

      var lst = new List<int>();
      res.ForEach(lst.Add);

      // Sequence comparison via Assert.Equal instead of Assert.True(SequenceEqual(...)).
      Assert.Equal(Enumerable.Range(0, 10), lst);
  }
  3914. #endregion
  3915. #region + TakeLastBuffer +
  // Argument validation of the TakeLastBuffer(TimeSpan) overloads: a null source or
  // null scheduler must raise ArgumentNullException, a negative duration must raise
  // ArgumentOutOfRangeException.
  [Fact]
  public void TakeLastBuffer_ArgumentChecking()
  {
      var source = Observable.Return(42);
      var nullSource = default(IObservable<int>);
      var nullScheduler = default(IScheduler);
      var oneSecond = TimeSpan.FromSeconds(1);
      var minusOneSecond = TimeSpan.FromSeconds(-1);

      // Overload without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(nullSource, oneSecond));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(source, minusOneSecond));

      // Overload with a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(nullSource, oneSecond, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(source, oneSecond, nullScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(source, minusOneSecond, Scheduler.Default));
  }
  // TakeLastBuffer with a zero duration: a single empty list is expected at tick
  // 230, followed by the completion at the same tick.
  [Fact]
  public void TakeLastBuffer_Zero1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.Zero;
      var results = testScheduler.Start(() => source.TakeLastBuffer(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext<IList<int>>(230, buffer => buffer.Count == 0),
          OnCompleted<IList<int>>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  // TakeLastBuffer with a zero duration where the last element shares tick 230 with
  // the completion: a single empty list is still expected at tick 230.
  [Fact]
  public void TakeLastBuffer_Zero2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.Zero;
      var results = testScheduler.Start(() => source.TakeLastBuffer(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext<IList<int>>(230, buffer => buffer.Count == 0),
          OnCompleted<IList<int>>(230)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
  }
  /// <summary>
  /// With a 25-tick duration and completion at tick 240, only the values
  /// produced at ticks 220 and 230 are expected in the emitted list.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Some1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.TakeLastBuffer(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext<IList<int>>(240, buffer => buffer.SequenceEqual(new[] { 2, 3 })),
          OnCompleted<IList<int>>(240)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 240));
  }
  /// <summary>
  /// With a 25-tick duration and completion at tick 300, every value
  /// (last one at tick 230) is too old, so an empty list is expected.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Some2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(300)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.TakeLastBuffer(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.Count == 0),
          OnCompleted<IList<int>>(300)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// With a 45-tick duration and completion at tick 300, the values produced
  /// at ticks 260 through 290 (6, 7, 8, 9) are expected in the emitted list.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Some3()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      var duration = TimeSpan.FromTicks(45);
      var results = testScheduler.Start(() => source.TakeLastBuffer(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext<IList<int>>(300, buffer => buffer.SequenceEqual(new[] { 6, 7, 8, 9 })),
          OnCompleted<IList<int>>(300)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// With a 25-tick duration, irregularly spaced values and completion at
  /// tick 350 (last value at tick 300), an empty list is expected.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Some4()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(250, 3),
          OnNext(280, 4),
          OnNext(290, 5),
          OnNext(300, 6),
          OnCompleted<int>(350)
      );

      var duration = TimeSpan.FromTicks(25);
      var results = testScheduler.Start(() => source.TakeLastBuffer(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext<IList<int>>(350, buffer => buffer.Count == 0),
          OnCompleted<IList<int>>(350)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 350));
  }
  /// <summary>
  /// With a 50-tick duration that covers every value before the completion
  /// at tick 230, the emitted list is expected to contain all values (1, 2).
  /// </summary>
  [Fact]
  public void TakeLastBuffer_All()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.TakeLastBuffer(duration, testScheduler));

      results.Messages.AssertEqual(
          OnNext<IList<int>>(230, buffer => buffer.SequenceEqual(new[] { 1, 2 })),
          OnCompleted<IList<int>>(230)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 230));
  }
  /// <summary>
  /// A source error at tick 210 is expected to be forwarded unchanged, with
  /// no list emitted and the subscription ending at the error.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable<int>(
          OnError<int>(210, failure)
      );

      var duration = TimeSpan.FromTicks(50);
      var results = testScheduler.Start(() => source.TakeLastBuffer(duration, testScheduler));

      results.Messages.AssertEqual(
          OnError<IList<int>>(210, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 210));
  }
  /// <summary>
  /// A source that never produces any notification is expected to yield no
  /// messages, with the subscription recorded from tick 200 to tick 1000.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Never()
  {
      var scheduler = new TestScheduler();

      // Hot observable without any recorded notifications.
      var xs = scheduler.CreateHotObservable<int>(
      );

      var res = scheduler.Start(() =>
          xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler)
      );

      res.Messages.AssertEqual(
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Runs TakeLastBuffer with a 60-second duration and no explicit scheduler
  /// over Range(0, 10) and expects the single emitted list to hold all ten
  /// values.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Default1()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60)).SingleAsync();

      var lst = default(IList<int>);

      // Both the wait handle and the subscription are IDisposable; release
      // them deterministically once completion has been signalled.
      using (var e = new ManualResetEvent(false))
      using (res.Subscribe(x => lst = x, () => e.Set()))
      {
          e.WaitOne();
      }

      Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10)));
  }
  /// <summary>
  /// Same scenario as a 60-second TakeLastBuffer over Range(0, 10), but run
  /// on Scheduler.Default with optimizations disabled; the single emitted
  /// list is expected to hold all ten values.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Default2()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations()).SingleAsync();

      var lst = default(IList<int>);

      // Both the wait handle and the subscription are IDisposable; release
      // them deterministically once completion has been signalled.
      using (var e = new ManualResetEvent(false))
      using (res.Subscribe(x => lst = x, () => e.Set()))
      {
          e.WaitOne();
      }

      Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10)));
  }
  /// <summary>
  /// Runs TakeLastBuffer with a zero duration and no explicit scheduler over
  /// Range(0, 10) and expects the single emitted list to be empty.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Default3()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.TakeLastBuffer(TimeSpan.Zero).SingleAsync();

      var lst = default(IList<int>);

      // Both the wait handle and the subscription are IDisposable; release
      // them deterministically once completion has been signalled.
      using (var e = new ManualResetEvent(false))
      using (res.Subscribe(x => lst = x, () => e.Set()))
      {
          e.WaitOne();
      }

      Assert.True(lst.Count == 0);
  }
  /// <summary>
  /// Runs TakeLastBuffer with a zero duration on Scheduler.Default with
  /// optimizations disabled over Range(0, 10) and expects the single emitted
  /// list to be empty.
  /// </summary>
  [Fact]
  public void TakeLastBuffer_Default4()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.TakeLastBuffer(TimeSpan.Zero, Scheduler.Default.DisableOptimizations()).SingleAsync();

      var lst = default(IList<int>);

      // Both the wait handle and the subscription are IDisposable; release
      // them deterministically once completion has been signalled.
      using (var e = new ManualResetEvent(false))
      using (res.Subscribe(x => lst = x, () => e.Set()))
      {
          e.WaitOne();
      }

      Assert.True(lst.Count == 0);
  }
  4170. #endregion
  4171. #region + TakeUntil +
  /// <summary>
  /// Checks that the absolute-time TakeUntil overloads reject a null source
  /// and a null scheduler with ArgumentNullException.
  /// </summary>
  [Fact]
  public void TakeUntil_ArgumentChecking()
  {
      var source = Observable.Return(42);
      var nullSource = default(IObservable<int>);
      var nullScheduler = default(IScheduler);

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil(nullSource, DateTimeOffset.Now));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil(nullSource, DateTimeOffset.Now, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil(source, DateTimeOffset.Now, nullScheduler));
  }
  /// <summary>
  /// With a default-constructed DateTimeOffset as end time, the result is
  /// expected to complete at tick 201 without forwarding any value.
  /// </summary>
  [Fact]
  public void TakeUntil_Zero()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var endTime = new DateTimeOffset();
      var results = testScheduler.Start(() => source.TakeUntil(endTime, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(201)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 201));
  }
  /// <summary>
  /// With an end time of tick 225, values at ticks 210 and 220 are expected
  /// to pass, followed by completion at 225; the value at 230 is cut off.
  /// </summary>
  [Fact]
  public void TakeUntil_Some()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      );

      var endTime = new DateTimeOffset(225, TimeSpan.Zero);
      var results = testScheduler.Start(() => source.TakeUntil(endTime, testScheduler));

      results.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(225)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 225));
  }
  /// <summary>
  /// With an end time (tick 250) later than the source completion (tick
  /// 230), the source sequence is expected to be forwarded unchanged.
  /// </summary>
  [Fact]
  public void TakeUntil_Late()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      var endTime = new DateTimeOffset(250, TimeSpan.Zero);
      var results = testScheduler.Start(() => source.TakeUntil(endTime, testScheduler));

      results.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnCompleted<int>(230)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 230));
  }
  /// <summary>
  /// A source error at tick 210, before the end time of tick 250, is
  /// expected to be forwarded and to end the subscription at 210.
  /// </summary>
  [Fact]
  public void TakeUntil_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable<int>(
          OnError<int>(210, failure)
      );

      var endTime = new DateTimeOffset(250, TimeSpan.Zero);
      var results = testScheduler.Start(() => source.TakeUntil(endTime, testScheduler));

      results.Messages.AssertEqual(
          OnError<int>(210, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 210));
  }
  /// <summary>
  /// For a source that never produces a notification, the result is expected
  /// to complete at the end time (tick 250) and unsubscribe at that tick.
  /// </summary>
  [Fact]
  public void TakeUntil_Never()
  {
      var scheduler = new TestScheduler();

      // Hot observable without any recorded notifications.
      var xs = scheduler.CreateHotObservable<int>(
      );

      var res = scheduler.Start(() =>
          xs.TakeUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler)
      );

      res.Messages.AssertEqual(
          OnCompleted<int>(250)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Chains two TakeUntil operators where the outer end time (tick 235) is
  /// earlier than the inner one (tick 255); the earlier end time is expected
  /// to win, completing at 235 after values 1 through 3.
  /// </summary>
  [Fact]
  public void TakeUntil_Twice1()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      var res = scheduler.Start(() =>
          xs.TakeUntil(new DateTimeOffset(255, TimeSpan.Zero), scheduler)
            .TakeUntil(new DateTimeOffset(235, TimeSpan.Zero), scheduler)
      );

      res.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(235)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 235)
      );
  }
  /// <summary>
  /// Chains two TakeUntil operators where the inner end time (tick 235) is
  /// earlier than the outer one (tick 255); the earlier end time is expected
  /// to win, completing at 235 after values 1 through 3.
  /// </summary>
  [Fact]
  public void TakeUntil_Twice2()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnCompleted<int>(270)
      );

      var res = scheduler.Start(() =>
          xs.TakeUntil(new DateTimeOffset(235, TimeSpan.Zero), scheduler)
            .TakeUntil(new DateTimeOffset(255, TimeSpan.Zero), scheduler)
      );

      res.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(235)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 235)
      );
  }
  /// <summary>
  /// Runs TakeUntil without an explicit scheduler and an end time one minute
  /// in the future over Range(0, 10); all ten values are expected.
  /// </summary>
  [Fact]
  public void TakeUntil_Default()
  {
      var xs = Observable.Range(0, 10, Scheduler.Default);
      var res = xs.TakeUntil(DateTimeOffset.Now.AddMinutes(1));

      var lst = new List<int>();

      // Both the wait handle and the subscription are IDisposable; release
      // them deterministically once completion has been signalled.
      using (var e = new ManualResetEvent(false))
      using (res.Subscribe(lst.Add, () => e.Set()))
      {
          e.WaitOne();
      }

      Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10)));
  }
  4345. #endregion
  4346. #region + Throttle +
  /// <summary>
  /// Checks argument validation of the time-based Throttle overloads: null
  /// source or scheduler raises ArgumentNullException, a negative due time
  /// raises ArgumentOutOfRangeException.
  /// </summary>
  [Fact]
  public void Throttle_ArgumentChecking()
  {
      var testScheduler = new TestScheduler();
      var source = Observable.Empty<int>();
      var nullSource = default(IObservable<int>);
      var negative = TimeSpan.FromSeconds(-1);

      // Null arguments.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(nullSource, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(source, TimeSpan.Zero, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(nullSource, TimeSpan.Zero, testScheduler));

      // Negative due time.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(source, negative));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(source, negative, testScheduler));
  }
  /// <summary>
  /// Lazily unfolds a state value into a sequence of recorded notifications.
  /// </summary>
  /// <param name="seed">Initial state.</param>
  /// <param name="condition">Loop continues while this returns true for the current state.</param>
  /// <param name="iterate">Computes the next state from the current one.</param>
  /// <param name="selector">Maps each state that satisfies the condition to a notification.</param>
  /// <param name="final">Maps the first state that fails the condition to the trailing notification.</param>
  /// <returns>The selected notifications followed by the final one.</returns>
  private IEnumerable<Recorded<Notification<T>>> Generate<T, S>(S seed, Func<S, bool> condition, Func<S, S> iterate, Func<S, Recorded<Notification<T>>> selector, Func<S, Recorded<Notification<T>>> final)
  {
      var state = seed;

      while (condition(state))
      {
          yield return selector(state);
          state = iterate(state);
      }

      // The state that terminated the loop feeds the closing notification.
      yield return final(state);
  }
  /// <summary>
  /// Values arrive 30 ticks apart with a 20-tick throttle, so every value
  /// after subscription is expected to be emitted 20 ticks after it arrived.
  /// </summary>
  [Fact]
  public void Throttle_TimeSpan_AllPass()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnNext(300, 4),
          OnCompleted<int>(400)
      );

      var dueTime = TimeSpan.FromTicks(20);
      var results = testScheduler.Start(() => source.Throttle(dueTime, testScheduler));

      results.Messages.AssertEqual(
          OnNext(230, 1),
          OnNext(260, 2),
          OnNext(290, 3),
          OnNext(320, 4),
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// Same spacing as the all-pass case (30-tick gaps, 20-tick throttle), but
  /// the source ends with an error at tick 400 that must be forwarded.
  /// </summary>
  [Fact]
  public void Throttle_TimeSpan_AllPass_ErrorEnd()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnNext(300, 4),
          OnError<int>(400, failure)
      );

      var dueTime = TimeSpan.FromTicks(20);
      var results = testScheduler.Start(() => source.Throttle(dueTime, testScheduler));

      results.Messages.AssertEqual(
          OnNext(230, 1),
          OnNext(260, 2),
          OnNext(290, 3),
          OnNext(320, 4),
          OnError<int>(400, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// Values arrive 30 ticks apart with a 40-tick throttle, so each value is
  /// superseded before it can fire; only the last value (7) is expected,
  /// emitted at completion (tick 400).
  /// </summary>
  [Fact]
  public void Throttle_TimeSpan_AllDrop()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnNext(300, 4),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnCompleted<int>(400)
      );

      var dueTime = TimeSpan.FromTicks(40);
      var results = testScheduler.Start(() => source.Throttle(dueTime, testScheduler));

      results.Messages.AssertEqual(
          OnNext(400, 7),
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// Same spacing as the all-drop case (30-tick gaps, 40-tick throttle), but
  /// the source ends with an error at tick 400; only the error is expected,
  /// without the pending value.
  /// </summary>
  [Fact]
  public void Throttle_TimeSpan_AllDrop_ErrorEnd()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnNext(300, 4),
          OnNext(330, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnError<int>(400, failure)
      );

      var dueTime = TimeSpan.FromTicks(40);
      var results = testScheduler.Start(() => source.Throttle(dueTime, testScheduler));

      results.Messages.AssertEqual(
          OnError<int>(400, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// A source with no values after subscription is expected to yield only
  /// its completion at tick 300.
  /// </summary>
  [Fact]
  public void Throttle_Empty()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 0),
          OnCompleted<int>(300)
      );

      var dueTime = TimeSpan.FromTicks(10);
      var results = testScheduler.Start(() => source.Throttle(dueTime, testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// A source with no values after subscription that fails at tick 300 is
  /// expected to yield only that error.
  /// </summary>
  [Fact]
  public void Throttle_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 0),
          OnError<int>(300, failure)
      );

      var dueTime = TimeSpan.FromTicks(10);
      var results = testScheduler.Start(() => source.Throttle(dueTime, testScheduler));

      results.Messages.AssertEqual(
          OnError<int>(300, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// A source that produces nothing after subscription and never terminates
  /// is expected to yield no messages; the subscription spans 200 to 1000.
  /// </summary>
  [Fact]
  public void Throttle_Never()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 0)
      );

      var dueTime = TimeSpan.FromTicks(10);
      var results = testScheduler.Start(() => source.Throttle(dueTime, testScheduler));

      results.Messages.AssertEqual(
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 1000));
  }
  /// <summary>
  /// Mixed spacing with a 20-tick throttle: value 2 (tick 240) is replaced by
  /// value 3 ten ticks later, and value 4 (tick 280) is flushed when the
  /// source completes at tick 300.
  /// </summary>
  [Fact]
  public void Throttle_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 0),
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(250, 3),
          OnNext(280, 4),
          OnCompleted<int>(300)
      );

      var dueTime = TimeSpan.FromTicks(20);
      var results = testScheduler.Start(() => source.Throttle(dueTime, testScheduler));

      results.Messages.AssertEqual(
          OnNext(230, 1),
          OnNext(270, 3),
          OnNext(300, 4),
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// Smoke test for the Throttle overload without an explicit scheduler: a
  /// single returned value is expected to come through.
  /// </summary>
  [Fact]
  public void Throttle_DefaultScheduler()
  {
      var throttled = Observable.Return(1).Throttle(TimeSpan.FromMilliseconds(1));
      var values = throttled.ToEnumerable();

      Assert.True(values.SequenceEqual(new[] { 1 }));
  }
  /// <summary>
  /// Checks that the selector-based Throttle overload rejects a null source
  /// and a null duration selector with ArgumentNullException.
  /// </summary>
  [Fact]
  public void Throttle_Duration_ArgumentChecking()
  {
      var dummy = DummyObservable<int>.Instance;
      var nullSource = default(IObservable<int>);
      var nullSelector = default(Func<int, IObservable<string>>);

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(nullSource, x => dummy));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(dummy, nullSelector));
  }
  /// <summary>
  /// Every duration observable fires 20 ticks after subscription, which is
  /// shorter than the gap between source values, so each value is expected
  /// to be emitted 20 ticks after it arrived (pure delay).
  /// </summary>
  [Fact]
  public void Throttle_Duration_DelayBehavior()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, -1),
          OnNext(250, 0),
          OnNext(280, 1),
          OnNext(310, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          OnCompleted<int>(550)
      );

      // Five independent but identical duration sequences, indexed by source value.
      var durations = Enumerable.Range(0, 5)
          .Select(i => testScheduler.CreateColdObservable(
              OnNext(20, 42),
              OnNext(25, 99)))
          .ToArray();

      var results = testScheduler.Start(() => source.Throttle(x => durations[x]));

      results.Messages.AssertEqual(
          OnNext<int>(250 + 20, 0),
          OnNext<int>(280 + 20, 1),
          OnNext<int>(310 + 20, 2),
          OnNext<int>(350 + 20, 3),
          OnNext<int>(400 + 20, 4),
          OnCompleted<int>(550)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 550));

      // Each duration is subscribed when its value arrives and released when it first fires.
      durations[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
      durations[1].Subscriptions.AssertEqual(Subscribe(280, 280 + 20));
      durations[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20));
      durations[3].Subscriptions.AssertEqual(Subscribe(350, 350 + 20));
      durations[4].Subscriptions.AssertEqual(Subscribe(400, 400 + 20));
  }
  /// <summary>
  /// Durations for values 1 and 3 (40 and 60 ticks) outlast the gap to the
  /// next source value, so those values are expected to be dropped while
  /// values 0, 2 and 4 are emitted 20 ticks after arrival.
  /// </summary>
  [Fact]
  public void Throttle_Duration_ThrottleBehavior()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, -1),
          OnNext(250, 0),
          OnNext(280, 1),
          OnNext(310, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          OnCompleted<int>(550)
      );

      // Tick offset of the first notification of each duration sequence,
      // indexed by source value; a second notification follows 5 ticks later.
      var firstTicks = new[] { 20, 40, 20, 60, 20 };
      var durations = firstTicks
          .Select(t => testScheduler.CreateColdObservable(
              OnNext(t, 42),
              OnNext(t + 5, 99)))
          .ToArray();

      var results = testScheduler.Start(() => source.Throttle(x => durations[x]));

      results.Messages.AssertEqual(
          OnNext<int>(250 + 20, 0),
          OnNext<int>(310 + 20, 2),
          OnNext<int>(400 + 20, 4),
          OnCompleted<int>(550)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 550));

      // Superseded durations (1 and 3) are released when the next value arrives.
      durations[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
      durations[1].Subscriptions.AssertEqual(Subscribe(280, 310));
      durations[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20));
      durations[3].Subscriptions.AssertEqual(Subscribe(350, 400));
      durations[4].Subscriptions.AssertEqual(Subscribe(400, 400 + 20));
  }
  /// <summary>
  /// The source completes at tick 410 while the duration for value 4 is
  /// still pending; the pending value is expected to be flushed at 410
  /// together with the completion.
  /// </summary>
  [Fact]
  public void Throttle_Duration_EarlyCompletion()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, -1),
          OnNext(250, 0),
          OnNext(280, 1),
          OnNext(310, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          OnCompleted<int>(410)
      );

      // Tick offset of the first notification of each duration sequence,
      // indexed by source value; a second notification follows 5 ticks later.
      var firstTicks = new[] { 20, 40, 20, 60, 20 };
      var durations = firstTicks
          .Select(t => testScheduler.CreateColdObservable(
              OnNext(t, 42),
              OnNext(t + 5, 99)))
          .ToArray();

      var results = testScheduler.Start(() => source.Throttle(x => durations[x]));

      results.Messages.AssertEqual(
          OnNext<int>(250 + 20, 0),
          OnNext<int>(310 + 20, 2),
          OnNext<int>(410, 4),
          OnCompleted<int>(410)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 410));

      // The last duration is released at source completion rather than when it fires.
      durations[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
      durations[1].Subscriptions.AssertEqual(Subscribe(280, 310));
      durations[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20));
      durations[3].Subscriptions.AssertEqual(Subscribe(350, 400));
      durations[4].Subscriptions.AssertEqual(Subscribe(400, 410));
  }
  /// <summary>
  /// The duration observable selected for value 4 fails after 40 ticks; the
  /// error is expected to surface on the result at tick 490 and end the
  /// source subscription there.
  /// </summary>
  [Fact]
  public void Throttle_Duration_InnerError()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var failure = new Exception();

      var results = testScheduler.Start(() =>
          source.Throttle(x =>
          {
              if (x < 4)
              {
                  return testScheduler.CreateColdObservable(
                      OnNext(x * 10, "Ignore"),
                      OnNext(x * 10 + 5, "Aargh!")
                  );
              }

              // Value 4 gets a duration that fails instead of firing.
              return testScheduler.CreateColdObservable(
                  OnError<string>(x * 10, failure)
              );
          })
      );

      results.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnError<int>(450 + 4 * 10, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 490));
  }
  /// <summary>
  /// The source fails at tick 460 while the duration for value 4 is still
  /// pending; the error is expected immediately and value 4 is not emitted.
  /// </summary>
  [Fact]
  public void Throttle_Duration_OuterError()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnError<int>(460, failure)
      );

      var results = testScheduler.Start(() =>
          source.Throttle(x => testScheduler.CreateColdObservable(
              OnNext(x * 10, "Ignore"),
              OnNext(x * 10 + 5, "Aargh!")
          ))
      );

      results.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnError<int>(460, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 460));
  }
  /// <summary>
  /// The duration selector throws for value 4; the exception is expected to
  /// be reported as an error at the tick the value arrived (450).
  /// </summary>
  [Fact]
  public void Throttle_Duration_SelectorThrows()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var failure = new Exception();

      var results = testScheduler.Start(() =>
          source.Throttle(x =>
          {
              // From value 4 onwards the selector itself fails.
              if (x >= 4)
              {
                  throw failure;
              }

              return testScheduler.CreateColdObservable(
                  OnNext(x * 10, "Ignore"),
                  OnNext(x * 10 + 5, "Aargh!")
              );
          })
      );

      results.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnError<int>(450, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 450));
  }
  /// <summary>
  /// Duration observables that only complete (after x * 10 ticks, without
  /// producing a value) are expected to release the pending value as well;
  /// with 100-tick gaps between source values every value gets through.
  /// </summary>
  [Fact]
  public void Throttle_Duration_InnerDone_DelayBehavior()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnCompleted<int>(550)
      );

      var res = scheduler.Start(() =>
          xs.Throttle(x =>
              scheduler.CreateColdObservable(
                  OnCompleted<string>(x * 10)
              )
          )
      );

      res.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(350 + 3 * 10, 3),
          OnNext<int>(450 + 4 * 10, 4),
          OnCompleted<int>(550)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  /// <summary>
  /// Duration observables that only complete (after x * 10 ticks) are used
  /// with source values that sometimes arrive before the previous duration
  /// ends; values 3 and 5 are expected to be dropped, 2, 4 and 6 emitted.
  /// </summary>
  [Fact]
  public void Throttle_Duration_InnerDone_ThrottleBehavior()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(250, 2),
          OnNext(280, 3),
          OnNext(300, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnCompleted<int>(550)
      );

      var res = scheduler.Start(() =>
          xs.Throttle(x =>
              scheduler.CreateColdObservable(
                  OnCompleted<string>(x * 10)
              )
          )
      );

      res.Messages.AssertEqual(
          OnNext<int>(250 + 2 * 10, 2),
          OnNext<int>(300 + 4 * 10, 4),
          OnNext<int>(410 + 6 * 10, 6),
          OnCompleted<int>(550)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 550)
      );
  }
  4877. #endregion
  4878. #region + TimeInterval +
  /// <summary>
  /// Checks that TimeInterval rejects a null source (both overloads) and a
  /// null scheduler with ArgumentNullException.
  /// </summary>
  [Fact]
  public void TimeInterval_ArgumentChecking()
  {
      var testScheduler = new TestScheduler();
      var source = Observable.Empty<int>();
      var nullSource = default(IObservable<int>);

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TimeInterval(nullSource));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TimeInterval(nullSource, testScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TimeInterval(source, null));
  }
  /// <summary>
  /// With scheduler optimizations disabled, each value is expected to be
  /// paired with the ticks elapsed since the previous value (or since the
  /// subscription at tick 200 for the first one): 10, 20, 30, 40, 50.
  /// </summary>
  [Fact]
  public void TimeInterval_Regular()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(230, 3),
          OnNext(260, 4),
          OnNext(300, 5),
          OnNext(350, 6),
          OnCompleted<int>(400)
      );

      var results = testScheduler.Start(() => source.TimeInterval(testScheduler.DisableOptimizations()));

      results.Messages.AssertEqual(
          OnNext(210, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(230, new TimeInterval<int>(3, TimeSpan.FromTicks(20))),
          OnNext(260, new TimeInterval<int>(4, TimeSpan.FromTicks(30))),
          OnNext(300, new TimeInterval<int>(5, TimeSpan.FromTicks(40))),
          OnNext(350, new TimeInterval<int>(6, TimeSpan.FromTicks(50))),
          OnCompleted<TimeInterval<int>>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// With scheduler optimizations disabled, a source with no values after
  /// subscription is expected to yield only its completion at tick 300.
  /// </summary>
  [Fact]
  public void TimeInterval_Empty()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(300)
      );

      var results = testScheduler.Start(() => source.TimeInterval(testScheduler.DisableOptimizations()));

      results.Messages.AssertEqual(
          OnCompleted<TimeInterval<int>>(300)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// With scheduler optimizations disabled, a source error at tick 300 is
  /// expected to be forwarded unchanged.
  /// </summary>
  [Fact]
  public void TimeInterval_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(300, failure)
      );

      var results = testScheduler.Start(() => source.TimeInterval(testScheduler.DisableOptimizations()));

      results.Messages.AssertEqual(
          OnError<TimeInterval<int>>(300, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// With scheduler optimizations disabled, a source that never terminates
  /// and has no values after subscription is expected to yield no messages.
  /// </summary>
  [Fact]
  public void TimeInterval_Never()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var results = testScheduler.Start(() => source.TimeInterval(testScheduler.DisableOptimizations()));

      results.Messages.AssertEqual(
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 1000));
  }
  /// <summary>
  /// Smoke test for the TimeInterval overload without an explicit scheduler:
  /// a single returned value is expected to produce exactly one element.
  /// </summary>
  [Fact]
  public void TimeInterval_DefaultScheduler()
  {
      var elementCount = Observable.Return(1).TimeInterval().Count().First();

      Assert.True(elementCount == 1);
  }
  /// <summary>
  /// Passes the test scheduler directly (optimizations left enabled;
  /// presumably the stopwatch-based path, per the test name) and expects
  /// the same intervals as the unoptimized case: 10, 20, 30, 40, 50 ticks.
  /// </summary>
  [Fact]
  public void TimeInterval_WithStopwatch_Regular()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(230, 3),
          OnNext(260, 4),
          OnNext(300, 5),
          OnNext(350, 6),
          OnCompleted<int>(400)
      );

      var results = testScheduler.Start(() => source.TimeInterval(testScheduler));

      results.Messages.AssertEqual(
          OnNext(210, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(230, new TimeInterval<int>(3, TimeSpan.FromTicks(20))),
          OnNext(260, new TimeInterval<int>(4, TimeSpan.FromTicks(30))),
          OnNext(300, new TimeInterval<int>(5, TimeSpan.FromTicks(40))),
          OnNext(350, new TimeInterval<int>(6, TimeSpan.FromTicks(50))),
          OnCompleted<TimeInterval<int>>(400)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  /// <summary>
  /// Passes the test scheduler directly (optimizations left enabled) and
  /// expects only the completion at tick 300 for a source without values
  /// after subscription.
  /// </summary>
  [Fact]
  public void TimeInterval_WithStopwatch_Empty()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(300)
      );

      var results = testScheduler.Start(() => source.TimeInterval(testScheduler));

      results.Messages.AssertEqual(
          OnCompleted<TimeInterval<int>>(300)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 300));
  }
  /// <summary>
  /// An error raised by the source at 300 must be forwarded unchanged (same exception
  /// instance, same time) by <c>TimeInterval(scheduler)</c>.
  /// </summary>
  [Fact]
  public void TimeInterval_WithStopwatch_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(300, error)
      );

      var recorded = testScheduler.Start(() => source.TimeInterval(testScheduler));

      recorded.Messages.AssertEqual(
          OnError<TimeInterval<int>>(300, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
  }
  /// <summary>
  /// A source with no events after subscription must produce no messages, and the
  /// subscription is expected to last from 200 to 1000.
  /// </summary>
  [Fact]
  public void TimeInterval_WithStopwatch_Never()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var recorded = testScheduler.Start(() => source.TimeInterval(testScheduler));

      // Nothing is expected to be observed.
      recorded.Messages.AssertEqual(
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  5055. #endregion
  5056. #region + Timeout +
  /// <summary>
  /// Checks argument validation of the <c>Timeout</c> overloads: null sources, null fallback
  /// sequences and null schedulers must throw <see cref="ArgumentNullException"/>; a negative
  /// due time must throw <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void Timeout_ArgumentChecking()
  {
      var testScheduler = new TestScheduler();
      var source = Observable.Empty<int>();

      // Typed nulls keep overload resolution identical to default(T) arguments.
      IObservable<int> nullObservable = null;
      IScheduler nullScheduler = null;
      var negative = TimeSpan.FromSeconds(-1);

      // Overloads without a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, TimeSpan.Zero));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, TimeSpan.Zero, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, TimeSpan.Zero, nullObservable));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, new DateTimeOffset()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, new DateTimeOffset(), source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, new DateTimeOffset(), nullObservable));

      // TimeSpan overloads with a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, TimeSpan.Zero, testScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, TimeSpan.Zero, nullScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, TimeSpan.Zero, source, testScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, TimeSpan.Zero, source, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, TimeSpan.Zero, nullObservable, testScheduler));

      // DateTimeOffset overloads with a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, new DateTimeOffset(), testScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, new DateTimeOffset(), nullScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, new DateTimeOffset(), source, testScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, new DateTimeOffset(), source, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, new DateTimeOffset(), nullObservable, testScheduler));

      // Negative due times.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timeout(source, negative));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timeout(source, negative, testScheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timeout(source, negative, source));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timeout(source, negative, source, testScheduler));
  }
  /// <summary>
  /// With a 500-tick timeout, every value arriving after subscription and the completion
  /// at 400 are expected to pass through untouched.
  /// </summary>
  [Fact]
  public void Timeout_InTime()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(500);

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(230, 3),
          OnNext(260, 4),
          OnNext(300, 5),
          OnNext(350, 6),
          OnCompleted<int>(400)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(210, 2),
          OnNext(230, 3),
          OnNext(260, 4),
          OnNext(300, 5),
          OnNext(350, 6),
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// With an absolute deadline of tick 400 and no fallback sequence, a source whose first
  /// value arrives at 410 is expected to fail at 400 with a <see cref="TimeoutException"/>.
  /// </summary>
  [Fact]
  public void Timeout_DateTimeOffset_TimeoutOccurs_WithDefaultException()
  {
      var testScheduler = new TestScheduler();
      var deadline = new DateTimeOffset(new DateTime(400), TimeSpan.Zero);

      var source = testScheduler.CreateHotObservable(
          OnNext(410, 1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(deadline, testScheduler));

      recorded.Messages.AssertEqual(
          OnError<int>(400, e => e is TimeoutException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// With a relative timeout of 200 ticks and no fallback sequence, a source whose first
  /// value arrives at 410 is expected to fail at 400 with a <see cref="TimeoutException"/>.
  /// </summary>
  [Fact]
  public void Timeout_TimeSpan_TimeoutOccurs_WithDefaultException()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(200);

      var source = testScheduler.CreateHotObservable(
          OnNext(410, 1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, testScheduler));

      recorded.Messages.AssertEqual(
          OnError<int>(400, e => e is TimeoutException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Smoke test for <c>Timeout(TimeSpan)</c> without a scheduler argument: a source that
  /// returns 1 with a 10-second timeout must yield that single value.
  /// </summary>
  [Fact]
  public void Timeout_TimeSpan_DefaultScheduler()
  {
      var value = Observable.Return(1).Timeout(TimeSpan.FromSeconds(10)).ToEnumerable().Single();

      // Assert.Equal reports the actual value on failure, unlike Assert.True(value == 1).
      Assert.Equal(1, value);
  }
  /// <summary>
  /// Smoke test for <c>Timeout(TimeSpan, other)</c> without a scheduler argument: the source
  /// value 1 must be produced, not the fallback value 2.
  /// </summary>
  [Fact]
  public void Timeout_TimeSpan_Observable_DefaultScheduler()
  {
      var value = Observable.Return(1).Timeout(TimeSpan.FromSeconds(10), Observable.Return(2)).ToEnumerable().Single();

      // Assert.Equal reports the actual value on failure, unlike Assert.True(value == 1).
      Assert.Equal(1, value);
  }
  /// <summary>
  /// Smoke test for <c>Timeout(DateTimeOffset)</c> without a scheduler argument: with a
  /// deadline 10 seconds ahead of now, the source value 1 must be produced.
  /// </summary>
  [Fact]
  public void Timeout_DateTimeOffset_DefaultScheduler()
  {
      var value = Observable.Return(1).Timeout(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(10)).ToEnumerable().Single();

      // Assert.Equal reports the actual value on failure, unlike Assert.True(value == 1).
      Assert.Equal(1, value);
  }
  /// <summary>
  /// Smoke test for <c>Timeout(DateTimeOffset, other)</c> without a scheduler argument: with a
  /// deadline 10 seconds ahead of now, the source value 1 must be produced, not the fallback value 2.
  /// </summary>
  [Fact]
  public void Timeout_DateTimeOffset_Observable_DefaultScheduler()
  {
      var value = Observable.Return(1).Timeout(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(10), Observable.Return(2)).ToEnumerable().Single();

      // Assert.Equal reports the actual value on failure, unlike Assert.True(value == 1).
      Assert.Equal(1, value);
  }
  /// <summary>
  /// The first value after subscription arrives at 310, later than the 100-tick timeout,
  /// so the output is expected to switch to the fallback sequence at 300.
  /// </summary>
  [Fact]
  public void Timeout_TimeoutOccurs_1()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(130, 2),
          OnNext(310, 3),
          OnNext(400, 4),
          OnCompleted<int>(500)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(50, -1),
          OnNext(200, -2),
          OnNext(310, -3),
          OnCompleted<int>(320)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, fallback, testScheduler));

      // Fallback times are relative to its subscription at 300.
      recorded.Messages.AssertEqual(
          OnNext(350, -1),
          OnNext(500, -2),
          OnNext(610, -3),
          OnCompleted<int>(620)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      fallback.Subscriptions.AssertEqual(
          Subscribe(300, 620)
      );
  }
  /// <summary>
  /// Two values (240, 310) arrive in time; the next one (430) comes more than 100 ticks
  /// after 310, so the output is expected to switch to the fallback sequence at 410.
  /// </summary>
  [Fact]
  public void Timeout_TimeoutOccurs_2()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(130, 2),
          OnNext(240, 3),
          OnNext(310, 4),
          OnNext(430, 5),
          OnCompleted<int>(500)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(50, -1),
          OnNext(200, -2),
          OnNext(310, -3),
          OnCompleted<int>(320)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, fallback, testScheduler));

      // Fallback times are relative to its subscription at 410.
      recorded.Messages.AssertEqual(
          OnNext(240, 3),
          OnNext(310, 4),
          OnNext(460, -1),
          OnNext(610, -2),
          OnNext(720, -3),
          OnCompleted<int>(730)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 410)
      );

      fallback.Subscriptions.AssertEqual(
          Subscribe(410, 730)
      );
  }
  /// <summary>
  /// Same source as the second timeout scenario, but the fallback sequence has no messages:
  /// after the switch at 410 nothing more is expected and the fallback stays subscribed until 1000.
  /// </summary>
  [Fact]
  public void Timeout_TimeoutOccurs_Never()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(130, 2),
          OnNext(240, 3),
          OnNext(310, 4),
          OnNext(430, 5),
          OnCompleted<int>(500)
      );

      // A cold observable with no recorded messages.
      var fallback = testScheduler.CreateColdObservable<int>(
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(240, 3),
          OnNext(310, 4)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 410)
      );

      fallback.Subscriptions.AssertEqual(
          Subscribe(410, 1000)
      );
  }
  /// <summary>
  /// The source completes at 500, after the 100-tick timeout has elapsed at 300, so only
  /// the fallback value (at 400) is expected and the source completion is not.
  /// </summary>
  [Fact]
  public void Timeout_TimeoutOccurs_Completed()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnCompleted<int>(500)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(100, -1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(400, -1)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      fallback.Subscriptions.AssertEqual(
          Subscribe(300, 1000)
      );
  }
  /// <summary>
  /// The source errors at 500, after the 100-tick timeout has elapsed at 300, so only the
  /// fallback value (at 400) is expected and the source error is not.
  /// </summary>
  [Fact]
  public void Timeout_TimeoutOccurs_Error()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnError<int>(500, new Exception())
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(100, -1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(400, -1)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      fallback.Subscriptions.AssertEqual(
          Subscribe(300, 1000)
      );
  }
  /// <summary>
  /// After the timeout at 300 the fallback sequence itself errors 100 ticks later; that
  /// error (same exception instance) is expected at 400.
  /// </summary>
  [Fact]
  public void Timeout_TimeoutOccurs_NextIsError()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnNext(500, 42)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnError<int>(100, error)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnError<int>(400, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      fallback.Subscriptions.AssertEqual(
          Subscribe(300, 400)
      );
  }
  /// <summary>
  /// The source completes at 250, within the 100-tick timeout, so the completion is
  /// expected to be forwarded and the fallback is expected to have no subscriptions.
  /// </summary>
  [Fact]
  public void Timeout_TimeoutNotOccurs_Completed()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnCompleted<int>(250)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(100, -1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnCompleted<int>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );

      // No subscriptions to the fallback are expected.
      fallback.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// The source errors at 250, within the 100-tick timeout, so the error is expected to be
  /// forwarded and the fallback is expected to have no subscriptions.
  /// </summary>
  [Fact]
  public void Timeout_TimeoutNotOccurs_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();
      var dueTime = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnError<int>(250, error)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(100, -1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnError<int>(250, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );

      // No subscriptions to the fallback are expected.
      fallback.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// Every gap between consecutive source events after subscription is below 100 ticks, so
  /// the source is expected to pass through entirely and the fallback to stay unsubscribed.
  /// </summary>
  [Fact]
  public void Timeout_TimeoutDoesNotOccur()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(130, 2),
          OnNext(240, 3),
          OnNext(320, 4),
          OnNext(410, 5),
          OnCompleted<int>(500)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(50, -1),
          OnNext(200, -2),
          OnNext(310, -3),
          OnCompleted<int>(320)
      );

      var recorded = testScheduler.Start(() => source.Timeout(dueTime, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(240, 3),
          OnNext(320, 4),
          OnNext(410, 5),
          OnCompleted<int>(500)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );

      // No subscriptions to the fallback are expected.
      fallback.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// With an absolute deadline of tick 400, a source whose first value arrives at 410 is
  /// expected to be replaced by the fallback, whose value then shows up at 500.
  /// </summary>
  [Fact]
  public void Timeout_DateTimeOffset_TimeoutOccurs()
  {
      var testScheduler = new TestScheduler();
      var deadline = new DateTimeOffset(new DateTime(400), TimeSpan.Zero);

      var source = testScheduler.CreateHotObservable(
          OnNext(410, 1)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(100, -1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(deadline, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(500, -1)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      fallback.Subscriptions.AssertEqual(
          Subscribe(400, 1000)
      );
  }
  /// <summary>
  /// The source completes at 390, before the absolute deadline of tick 400, so its value and
  /// completion are expected to pass through and the fallback to stay unsubscribed.
  /// </summary>
  [Fact]
  public void Timeout_DateTimeOffset_TimeoutDoesNotOccur_Completed()
  {
      var testScheduler = new TestScheduler();
      var deadline = new DateTimeOffset(new DateTime(400), TimeSpan.Zero);

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnCompleted<int>(390)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(100, -1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(deadline, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnCompleted<int>(390)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 390)
      );

      // No subscriptions to the fallback are expected.
      fallback.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// The source errors at 390, before the absolute deadline of tick 400, so its value and
  /// error are expected to pass through and the fallback to stay unsubscribed.
  /// </summary>
  [Fact]
  public void Timeout_DateTimeOffset_TimeoutDoesNotOccur_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();
      var deadline = new DateTimeOffset(new DateTime(400), TimeSpan.Zero);

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnError<int>(390, error)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(100, -1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(deadline, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnError<int>(390, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 390)
      );

      // No subscriptions to the fallback are expected.
      fallback.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// Values at 310 and 350 arrive before the absolute deadline of tick 400; the value at 420
  /// does not, so the fallback is expected to take over at 400 and emit at 500.
  /// </summary>
  [Fact]
  public void Timeout_DateTimeOffset_TimeoutOccur_2()
  {
      var testScheduler = new TestScheduler();
      var deadline = new DateTimeOffset(new DateTime(400), TimeSpan.Zero);

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var fallback = testScheduler.CreateColdObservable(
          OnNext(100, -1)
      );

      var recorded = testScheduler.Start(() => source.Timeout(deadline, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(500, -1)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      fallback.Subscriptions.AssertEqual(
          Subscribe(400, 1000)
      );
  }
  /// <summary>
  /// Same source and deadline as the previous absolute-deadline scenario, but the fallback has
  /// no messages: only the two in-time values are expected in the output.
  /// </summary>
  [Fact]
  public void Timeout_DateTimeOffset_TimeoutOccur_3()
  {
      var testScheduler = new TestScheduler();
      var deadline = new DateTimeOffset(new DateTime(400), TimeSpan.Zero);

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      // A cold observable with no recorded messages.
      var fallback = testScheduler.CreateColdObservable<int>(
      );

      var recorded = testScheduler.Start(() => source.Timeout(deadline, fallback, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      fallback.Subscriptions.AssertEqual(
          Subscribe(400, 1000)
      );
  }
  /// <summary>
  /// Checks that every observable-driven <c>Timeout</c> overload rejects a null source, null
  /// first-timeout sequence, null duration selector and null fallback with
  /// <see cref="ArgumentNullException"/>.
  /// </summary>
  [Fact]
  public void Timeout_Duration_ArgumentChecking()
  {
      var source = Observable.Empty<int>();

      // Typed nulls keep overload resolution and type inference identical to default(T) arguments.
      IObservable<int> nullObservable = null;
      Func<int, IObservable<int>> nullSelector = null;

      // (source, firstTimeout, selector, other)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, source, _ => source, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, nullObservable, _ => source, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, source, nullSelector, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, source, _ => source, nullObservable));

      // (source, firstTimeout, selector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, source, _ => source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, nullObservable, _ => source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, source, nullSelector));

      // (source, selector, other)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, _ => source, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, nullSelector, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, _ => source, nullObservable));

      // (source, selector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(nullObservable, _ => source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timeout(source, nullSelector));
  }
  /// <summary>
  /// Uses a message-less observable both as first timeout and as per-element timeout: the
  /// source is expected to pass through, with one timeout subscription per interval between events.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_Never()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      // A cold observable with no recorded messages.
      var never = testScheduler.CreateColdObservable<int>(
      );

      var recorded = testScheduler.Start(() => source.Timeout(never, _ => never));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450)
      );

      // Each expected timeout subscription ends where the next source event occurs.
      never.Subscriptions.AssertEqual(
          Subscribe(200, 310),
          Subscribe(310, 350),
          Subscribe(350, 420),
          Subscribe(420, 450)
      );
  }
  /// <summary>
  /// The first-timeout sequence fires 100 ticks after subscription, before the first source
  /// value at 310, so a <see cref="TimeoutException"/> is expected at 300.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_TimeoutFirst()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var firstTimeout = testScheduler.CreateColdObservable<string>(
          OnNext(100, "Boo!")
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
      );

      var recorded = testScheduler.Start(() => source.Timeout(firstTimeout, _ => itemTimeout));

      recorded.Messages.AssertEqual(
          OnError<int>(300, e => e is TimeoutException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      firstTimeout.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      // The per-element timeout is expected to have no subscriptions.
      itemTimeout.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// Like the first-timeout scenario, but with a fallback sequence supplied: instead of an
  /// error, the fallback's messages are expected, offset from its subscription at 300.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_TimeoutFirst_Other()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var firstTimeout = testScheduler.CreateColdObservable<string>(
          OnNext(100, "Boo!")
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
      );

      var other = testScheduler.CreateColdObservable<int>(
          OnNext(50, 42),
          OnCompleted<int>(70)
      );

      var recorded = testScheduler.Start(() => source.Timeout(firstTimeout, _ => itemTimeout, other));

      recorded.Messages.AssertEqual(
          OnNext(350, 42),
          OnCompleted<int>(370)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      firstTimeout.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      // The per-element timeout is expected to have no subscriptions.
      itemTimeout.Subscriptions.AssertEqual(
      );

      other.Subscriptions.AssertEqual(
          Subscribe(300, 370)
      );
  }
  /// <summary>
  /// The per-element timeout fires 50 ticks after each value. The gap between the values at
  /// 350 and 420 exceeds that, so a <see cref="TimeoutException"/> is expected at 400.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_TimeoutLater()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var firstTimeout = testScheduler.CreateColdObservable<string>(
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
          OnNext(50, "Boo!")
      );

      var recorded = testScheduler.Start(() => source.Timeout(firstTimeout, _ => itemTimeout));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnError<int>(400, e => e is TimeoutException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      firstTimeout.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      itemTimeout.Subscriptions.AssertEqual(
          Subscribe(310, 350),
          Subscribe(350, 400)
      );
  }
  /// <summary>
  /// Like the later-timeout scenario, but with a fallback sequence supplied: after the
  /// timeout at 400 the fallback's messages are expected at 450 and 470.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_TimeoutLater_Other()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var firstTimeout = testScheduler.CreateColdObservable<string>(
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
          OnNext(50, "Boo!")
      );

      var other = testScheduler.CreateColdObservable<int>(
          OnNext(50, 42),
          OnCompleted<int>(70)
      );

      var recorded = testScheduler.Start(() => source.Timeout(firstTimeout, _ => itemTimeout, other));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(450, 42),
          OnCompleted<int>(470)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      firstTimeout.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      itemTimeout.Subscriptions.AssertEqual(
          Subscribe(310, 350),
          Subscribe(350, 400)
      );

      other.Subscriptions.AssertEqual(
          Subscribe(400, 470)
      );
  }
  /// <summary>
  /// Uses the overload that takes only a per-element timeout selector: the first value is
  /// expected to pass through, and a <see cref="TimeoutException"/> is expected at 400.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_TimeoutLater_NoFirst()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
          OnNext(50, "Boo!")
      );

      var recorded = testScheduler.Start(() => source.Timeout(_ => itemTimeout));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnError<int>(400, e => e is TimeoutException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      itemTimeout.Subscriptions.AssertEqual(
          Subscribe(310, 350),
          Subscribe(350, 400)
      );
  }
  /// <summary>
  /// Uses the overload that takes a per-element timeout selector plus a fallback: after the
  /// timeout at 400 the fallback's messages are expected at 450 and 470.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_TimeoutLater_Other_NoFirst()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
          OnNext(50, "Boo!")
      );

      var other = testScheduler.CreateColdObservable<int>(
          OnNext(50, 42),
          OnCompleted<int>(70)
      );

      var recorded = testScheduler.Start(() => source.Timeout(_ => itemTimeout, other));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(450, 42),
          OnCompleted<int>(470)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      itemTimeout.Subscriptions.AssertEqual(
          Subscribe(310, 350),
          Subscribe(350, 400)
      );

      other.Subscriptions.AssertEqual(
          Subscribe(400, 470)
      );
  }
  /// <summary>
  /// The per-element timeout sequence completes after 50 ticks without emitting a value; the
  /// expected output still contains a <see cref="TimeoutException"/> at 400.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_TimeoutByCompletion()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var firstTimeout = testScheduler.CreateColdObservable<string>(
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
          OnCompleted<string>(50)
      );

      var recorded = testScheduler.Start(() => source.Timeout(firstTimeout, _ => itemTimeout));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnError<int>(400, e => e is TimeoutException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      firstTimeout.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      itemTimeout.Subscriptions.AssertEqual(
          Subscribe(310, 350),
          Subscribe(350, 400)
      );
  }
  /// <summary>
  /// The duration selector throws for the third value: that value is still expected in the
  /// output at 420, immediately followed by the selector's exception at the same time.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_SelectorThrows()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var firstTimeout = testScheduler.CreateColdObservable<string>(
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
      );

      var error = new Exception();

      var recorded = testScheduler.Start(() =>
          source.Timeout(firstTimeout, x =>
          {
              // Only the values 1 and 2 get a timeout sequence; anything from 3 upwards fails.
              if (x >= 3)
              {
                  throw error;
              }

              return itemTimeout;
          })
      );

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnError<int>(420, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );

      firstTimeout.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      itemTimeout.Subscriptions.AssertEqual(
          Subscribe(310, 350),
          Subscribe(350, 420)
      );
  }
  /// <summary>
  /// The per-element timeout sequence errors after 50 ticks; that exception instance is
  /// expected in the output at 400, after the values at 310 and 350.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_InnerThrows()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var firstTimeout = testScheduler.CreateColdObservable<string>(
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
          OnError<string>(50, error)
      );

      var recorded = testScheduler.Start(() => source.Timeout(firstTimeout, _ => itemTimeout));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnError<int>(400, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      firstTimeout.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      itemTimeout.Subscriptions.AssertEqual(
          Subscribe(310, 350),
          Subscribe(350, 400)
      );
  }
  /// <summary>
  /// The first-timeout sequence errors 50 ticks after subscription; that exception instance
  /// is expected as the only output, at 250.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_FirstThrows()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnCompleted<int>(450)
      );

      var firstTimeout = testScheduler.CreateColdObservable<string>(
          OnError<string>(50, error)
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
      );

      var recorded = testScheduler.Start(() => source.Timeout(firstTimeout, _ => itemTimeout));

      recorded.Messages.AssertEqual(
          OnError<int>(250, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );

      firstTimeout.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );

      // The per-element timeout is expected to have no subscriptions.
      itemTimeout.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// Neither timeout sequence ever fires, and the source errors at 450: all three values and
  /// the source's exception instance are expected in the output.
  /// </summary>
  [Fact]
  public void Timeout_Duration_Simple_SourceThrows()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnError<int>(450, error)
      );

      var firstTimeout = testScheduler.CreateColdObservable<string>(
      );

      var itemTimeout = testScheduler.CreateColdObservable<string>(
      );

      var recorded = testScheduler.Start(() => source.Timeout(firstTimeout, _ => itemTimeout));

      recorded.Messages.AssertEqual(
          OnNext(310, 1),
          OnNext(350, 2),
          OnNext(420, 3),
          OnError<int>(450, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450)
      );

      firstTimeout.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      itemTimeout.Subscriptions.AssertEqual(
          Subscribe(310, 350),
          Subscribe(350, 420),
          Subscribe(420, 450)
      );
  }
  5971. #endregion
  5972. #region + Timer +
  /// <summary>
  /// Checks argument validation of the <c>Timer</c> overloads: a null scheduler or a null
  /// observer must throw <see cref="ArgumentNullException"/>; a negative period must throw
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [Fact]
  public void OneShotTimer_TimeSpan_ArgumentChecking()
  {
      var negativePeriod = TimeSpan.FromSeconds(-1);

      // Null scheduler / null observer.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(TimeSpan.Zero, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(TimeSpan.Zero, DummyScheduler.Instance).Subscribe(null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(DateTimeOffset.Now, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(TimeSpan.Zero, TimeSpan.Zero, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timer(DateTimeOffset.Now, TimeSpan.Zero, null));

      // Negative period, with and without an explicit scheduler.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(TimeSpan.Zero, negativePeriod));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(TimeSpan.Zero, negativePeriod, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(DateTimeOffset.Now, negativePeriod));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Timer(DateTimeOffset.Now, negativePeriod, DummyScheduler.Instance));
  }
  /// <summary>
  /// A one-shot timer with a due time of 300 ticks is expected to emit 0 and complete at 500.
  /// </summary>
  [Fact]
  public void OneShotTimer_TimeSpan_Basic()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(300);

      var recorded = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(500, 0L),
          OnCompleted<long>(500)
      );
  }
  // A one-shot timer with a zero due time is expected to fire one tick after the
  // subscription, i.e. at virtual time 201.
  [Fact]
  public void OneShotTimer_TimeSpan_Zero()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(0);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(201, 0L),
          OnCompleted<long>(201)
      );
  }
  // Runs a zero-due-time timer without passing a scheduler, blocks until it completes, and
  // checks that exactly one value was recorded by the observer.
  [Fact]
  public void OneShotTimer_TimeSpan_Zero_DefaultScheduler()
  {
      // The test scheduler is only used as a factory for the recording observer.
      var testScheduler = new TestScheduler();
      var recorder = testScheduler.CreateObserver<long>();
      var done = new ManualResetEvent(false);

      var timer = Observable.Timer(TimeSpan.Zero);
      timer.Subscribe(recorder.OnNext, () => done.Set());

      done.WaitOne();

      Assert.Equal(1, recorder.Messages.Count);
  }
  // A negative relative due time is expected to behave like a zero due time: the value
  // and completion are expected at virtual time 201.
  [Fact]
  public void OneShotTimer_TimeSpan_Negative()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(-1);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(201, 0L),
          OnCompleted<long>(201)
      );
  }
  // A timer due 1000 ticks after subscription is expected to produce no messages at all
  // during the default Start run.
  [Fact]
  public void OneShotTimer_TimeSpan_Disposed()
  {
      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(1000);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      // No notifications expected.
      observed.Messages.AssertEqual(
      );
  }
  // Exceptions thrown by the observer's OnNext or OnCompleted handler are expected to
  // surface from the scheduler's Start() call.
  [Fact]
  public void OneShotTimer_TimeSpan_ObserverThrows()
  {
      var oneTick = TimeSpan.FromTicks(1);

      // Case 1: the OnNext handler throws.
      var onNextScheduler = new TestScheduler();
      var throwsInOnNext = Observable.Timer(oneTick, onNextScheduler);
      throwsInOnNext.Subscribe(x => { throw new InvalidOperationException(); });
      ReactiveAssert.Throws<InvalidOperationException>(() => onNextScheduler.Start());

      // Case 2: the OnCompleted handler throws.
      var onCompletedScheduler = new TestScheduler();
      var throwsInOnCompleted = Observable.Timer(oneTick, onCompletedScheduler);
      throwsInOnCompleted.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
      ReactiveAssert.Throws<InvalidOperationException>(() => onCompletedScheduler.Start());
  }
  // Without an explicit scheduler, a 1 ms one-shot timer is expected to yield exactly { 0 }.
  [Fact]
  public void OneShotTimer_TimeSpan_DefaultScheduler()
  {
      var timer = Observable.Timer(TimeSpan.FromMilliseconds(1));
      var values = timer.ToEnumerable();

      Assert.True(values.SequenceEqual(new[] { 0L }));
  }
  // Without an explicit scheduler, a timer due one second from now (absolute time) is
  // expected to yield exactly { 0 }.
  [Fact]
  public void OneShotTimer_DateTimeOffset_DefaultScheduler()
  {
      var timer = Observable.Timer(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(1));
      var values = timer.ToEnumerable();

      Assert.True(values.SequenceEqual(new[] { 0L }));
  }
  // Without an explicit scheduler, the first two values of a repeating timer (1 ms due
  // time, 1 ms period) are expected to be 0 and 1.
  [Fact]
  public void OneShotTimer_TimeSpan_TimeSpan_DefaultScheduler()
  {
      var oneMillisecond = TimeSpan.FromMilliseconds(1);
      var timer = Observable.Timer(oneMillisecond, TimeSpan.FromMilliseconds(1));
      var firstTwo = timer.ToEnumerable().Take(2);

      Assert.True(firstTwo.SequenceEqual(new[] { 0L, 1L }));
  }
  // Without an explicit scheduler, the first two values of a repeating timer (absolute due
  // time one second from now, 1 ms period) are expected to be 0 and 1.
  [Fact]
  public void OneShotTimer_DateTimeOffset_TimeSpan_DefaultScheduler()
  {
      var timer = Observable.Timer(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(1));
      var firstTwo = timer.ToEnumerable().Take(2);

      Assert.True(firstTwo.SequenceEqual(new[] { 0L, 1L }));
  }
  // A one-shot timer with an absolute due time of tick 500 is expected to emit 0L and
  // complete at virtual time 500.
  [Fact]
  public void OneShotTimer_DateTimeOffset_Basic()
  {
      var testScheduler = new TestScheduler();
      var dueTime = new DateTimeOffset(500, TimeSpan.Zero);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(500, 0L),
          OnCompleted<long>(500)
      );
  }
  // An absolute due time of tick 200 (the subscription time asserted elsewhere in this
  // file) is expected to fire one tick later, at 201.
  [Fact]
  public void OneShotTimer_DateTimeOffset_Zero()
  {
      var testScheduler = new TestScheduler();
      var dueTime = new DateTimeOffset(200, TimeSpan.Zero);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(201, 0L),
          OnCompleted<long>(201)
      );
  }
  // An absolute due time of tick 0 is expected to fire at virtual time 201, the same as a
  // zero due time.
  [Fact]
  public void OneShotTimer_DateTimeOffset_Past()
  {
      var testScheduler = new TestScheduler();
      var dueTime = new DateTimeOffset(0, TimeSpan.Zero);

      var observed = testScheduler.Start(() => Observable.Timer(dueTime, testScheduler));

      observed.Messages.AssertEqual(
          OnNext(201, 0L),
          OnCompleted<long>(201)
      );
  }
  // Runs a repeating timer with zero due time and zero period without passing a scheduler,
  // cut off by TakeWhile(i < 10), and checks that exactly ten values were recorded.
  [Fact]
  public void RepeatingTimer_TimeSpan_Zero_DefaultScheduler()
  {
      // The test scheduler is only used as a factory for the recording observer.
      var testScheduler = new TestScheduler();
      var recorder = testScheduler.CreateObserver<long>();
      var done = new ManualResetEvent(false);

      var firstTen = Observable.Timer(TimeSpan.Zero, TimeSpan.Zero).TakeWhile(i => i < 10);
      firstTen.Subscribe(recorder.OnNext, () => done.Set());

      done.WaitOne();

      Assert.Equal(10, recorder.Messages.Count);
  }
  // Repeating timer with an absolute due time of tick 300 and a period of 100 ticks,
  // subscribed at 200 and disposed at 750: ticks are expected at 300, 400, ..., 700.
  [Fact]
  public void RepeatingTimer_DateTimeOffset_TimeSpan_Simple()
  {
      const long created = 0;
      const long subscribed = 200;
      const long disposed = 750;

      var testScheduler = new TestScheduler();
      var dueTime = new DateTimeOffset(300, TimeSpan.Zero);
      var period = TimeSpan.FromTicks(100);

      var observed = testScheduler.Start(
          () => Observable.Timer(dueTime, period, testScheduler),
          created, subscribed, disposed
      );

      observed.Messages.AssertEqual(
          OnNext(300, 0L),
          OnNext(400, 1L),
          OnNext(500, 2L),
          OnNext(600, 3L),
          OnNext(700, 4L)
      );
  }
  // Repeating timer with a relative due time and period of 100 ticks each, subscribed at
  // 200 and disposed at 750: ticks are expected at 300, 400, ..., 700.
  [Fact]
  public void RepeatingTimer_TimeSpan_TimeSpan_Simple()
  {
      const long created = 0;
      const long subscribed = 200;
      const long disposed = 750;

      var testScheduler = new TestScheduler();
      var dueTime = TimeSpan.FromTicks(100);
      var period = TimeSpan.FromTicks(100);

      var observed = testScheduler.Start(
          () => Observable.Timer(dueTime, period, testScheduler),
          created, subscribed, disposed
      );

      observed.Messages.AssertEqual(
          OnNext(300, 0L),
          OnNext(400, 1L),
          OnNext(500, 2L),
          OnNext(600, 3L),
          OnNext(700, 4L)
      );
  }
  // Repeating timer on a scheduler that exposes ISchedulerPeriodic, with a due time (50)
  // that differs from the period (100). Besides the emitted values, the recorded periodic
  // timer run is expected to start at 250, tick at 350..650 and stop at 700.
  [Fact]
  public void RepeatingTimer_Periodic1()
  {
      const long created = 0;
      const long subscribed = 200;
      const long disposed = 700;

      var periodicScheduler = new PeriodicTestScheduler();
      var dueTime = TimeSpan.FromTicks(50);
      var period = TimeSpan.FromTicks(100);

      var observed = periodicScheduler.Start(
          () => Observable.Timer(dueTime, period, periodicScheduler),
          created, subscribed, disposed
      );

      observed.Messages.AssertEqual(
          OnNext(250, 0L),
          OnNext(350, 1L),
          OnNext(450, 2L),
          OnNext(550, 3L),
          OnNext(650, 4L)
      );

  #if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(250, 700) { 350, 450, 550, 650 }
      );
  #endif
  }
  // Repeating timer on a scheduler that exposes ISchedulerPeriodic, with due time equal to
  // the period (100). The recorded periodic timer run is expected to start at 200, tick at
  // 300..700 and stop at 750.
  [Fact]
  public void RepeatingTimer_Periodic2()
  {
      const long created = 0;
      const long subscribed = 200;
      const long disposed = 750;

      var periodicScheduler = new PeriodicTestScheduler();
      var dueTime = TimeSpan.FromTicks(100);
      var period = TimeSpan.FromTicks(100);

      var observed = periodicScheduler.Start(
          () => Observable.Timer(dueTime, period, periodicScheduler),
          created, subscribed, disposed
      );

      observed.Messages.AssertEqual(
          OnNext(300, 0L),
          OnNext(400, 1L),
          OnNext(500, 2L),
          OnNext(600, 3L),
          OnNext(700, 4L)
      );

  #if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 750) { 300, 400, 500, 600, 700 }
      );
  #endif
  }
  // Repeating timer (zero due time, period 100) whose observer "works" for a varying number
  // of virtual ticks per value. Records the clock at every OnNext and checks how the timer
  // slips and recovers when the observer takes longer than the period.
  [Fact]
  public void RepeatingTimer_UsingStopwatch_Slippage1()
  {
      var scheduler = new TestScheduler();

      IObservable<long> timer = null;
      scheduler.ScheduleAbsolute(100, () => { timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });

      var times = new List<long>();

      Action<long> onNext = x =>
      {
          times.Add(scheduler.Clock);

          // Simulated observer work per tick index; 0 means "return immediately".
          long pause;
          if (x == 0)
          {
              pause = 0;
          }
          else if (x < 2)
          {
              pause = 50;
          }
          else if (x < 4)
          {
              pause = 120;
          }
          else if (x < 6)
          {
              pause = 50;
          }
          else
          {
              pause = 0;
          }

          if (pause != 0)
          {
              scheduler.Sleep(pause);
          }
      };

      IDisposable subscription = null;
      scheduler.ScheduleAbsolute(200, () => { subscription = timer.Subscribe(onNext); });
      scheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      scheduler.Start();

      times.AssertEqual(
          201, // 1 off because of initial scheduling jump (InvokeStart)
          301,
          401,
          522, // 2 off because of 401 + 120 + 1 scheduling tick
          643, // 3 off because of 522 + 120 + 1 scheduling tick
          701,
          801,
          901
      );
  }
  // Same slippage scenario as the zero-due-time variant, but with a due time of 100 ticks,
  // so the first value is expected exactly at 300.
  [Fact]
  public void RepeatingTimer_UsingStopwatch_Slippage2()
  {
      var scheduler = new TestScheduler();

      IObservable<long> timer = null;
      scheduler.ScheduleAbsolute(100, () => { timer = Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler); });

      var times = new List<long>();

      Action<long> onNext = x =>
      {
          times.Add(scheduler.Clock);

          // Simulated observer work per tick index; 0 means "return immediately".
          long pause;
          if (x == 0)
          {
              pause = 0;
          }
          else if (x < 2)
          {
              pause = 50;
          }
          else if (x < 4)
          {
              pause = 120;
          }
          else if (x < 6)
          {
              pause = 50;
          }
          else
          {
              pause = 0;
          }

          if (pause != 0)
          {
              scheduler.Sleep(pause);
          }
      };

      IDisposable subscription = null;
      scheduler.ScheduleAbsolute(200, () => { subscription = timer.Subscribe(onNext); });
      scheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      scheduler.Start();

      times.AssertEqual(
          300,
          400,
          500,
          621, // 1 off because of recursive scheduling beyond the target time
          742, // 2 off because of 621 + 120 + 1 scheduling tick
          800,
          900
      );
  }
  // The observer stalls for 350 virtual ticks while handling the very first value. The
  // timer is expected to deliver the missed ticks back-to-back and then fall back in sync
  // with the 100-tick period.
  [Fact]
  public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart()
  {
      var scheduler = new TestScheduler();

      IObservable<long> timer = null;
      scheduler.ScheduleAbsolute(100, () => { timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });

      var times = new List<long>();

      Action<long> onNext = x =>
      {
          times.Add(scheduler.Clock);

          // Only the first value causes a delay.
          if (x != 0)
          {
              return;
          }

          scheduler.Sleep(350);
      };

      IDisposable subscription = null;
      scheduler.ScheduleAbsolute(200, () => { subscription = timer.Subscribe(onNext); });
      scheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      scheduler.Start();

      times.AssertEqual(
          201, // 1 off because of initial scheduling jump (InvokeStart)
          551, // catching up after excessive delay of 350 (target was 300)
          552, // catching up after excessive delay of 350 (target was 400)
          553, // catching up after excessive delay of 350 (target was 500)
          601, // back in sync
          701,
          801,
          901
      );
  }
  // The observer throws while handling the very first value (x == 0) of a repeating timer.
  // The exception is expected to propagate out of scheduler.Start() with the virtual clock
  // at 201.
  [Fact]
  public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart_ThrowsFirst()
  {
      var ex = new Exception();
      var scheduler = new TestScheduler();

      var xs = default(IObservable<long>);
      scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });

      var onNext = new Action<long>(x =>
      {
          if (x == 0)
          {
              throw ex;
          }
      });

      var d = default(IDisposable);
      scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
      scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

      // Capture the exception and assert afterwards. Asserting only inside the catch block
      // would let the test pass vacuously if Start() did not throw at all.
      var caught = default(Exception);
      try
      {
          scheduler.Start();
      }
      catch (Exception e)
      {
          caught = e;
      }

      Assert.Same(ex, caught);
      Assert.Equal(201, scheduler.Clock);
  }
  // The observer stalls for 350 virtual ticks on the first value and throws on the sixth
  // (x == 5). The exception is expected to propagate out of scheduler.Start() with the
  // virtual clock at 701, after the catch-up ticks have been delivered.
  [Fact]
  public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart_ThrowsBeyondFirst()
  {
      var ex = new Exception();
      var scheduler = new TestScheduler();

      var xs = default(IObservable<long>);
      scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });

      var times = new List<long>();
      var onNext = new Action<long>(x =>
      {
          times.Add(scheduler.Clock);

          if (x == 0)
          {
              scheduler.Sleep(350);
              return;
          }

          if (x == 5)
          {
              throw ex;
          }
      });

      var d = default(IDisposable);
      scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
      scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

      // Capture the exception and assert afterwards. Asserting only inside the catch block
      // would let the exception checks be skipped silently if Start() did not throw.
      var caught = default(Exception);
      try
      {
          scheduler.Start();
      }
      catch (Exception e)
      {
          caught = e;
      }

      Assert.Same(ex, caught);
      Assert.Equal(701, scheduler.Clock);

      times.AssertEqual(
          201, // 1 off because of initial scheduling jump (InvokeStart)
          551, // catching up after excessive delay of 350 (target was 300)
          552, // catching up after excessive delay of 350 (target was 400)
          553, // catching up after excessive delay of 350 (target was 500)
          601, // back in sync
          701
      );
  }
  // Slippage scenario (zero due time, period 100) on a scheduler with the
  // IStopwatchProvider optimization disabled. Records the clock at every OnNext; the
  // expected drift is described next to each expected value.
  [Fact]
  public void RepeatingTimer_NoStopwatch_Slippage1()
  {
      var scheduler = new TestScheduler();

      IObservable<long> timer = null;
      scheduler.ScheduleAbsolute(100, () => { timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler.DisableOptimizations(typeof(IStopwatchProvider))); });

      var times = new List<long>();

      Action<long> onNext = x =>
      {
          times.Add(scheduler.Clock);

          // Simulated observer work per tick index; 0 means "return immediately".
          long pause;
          if (x == 0)
          {
              pause = 0;
          }
          else if (x < 2)
          {
              pause = 50;
          }
          else if (x < 4)
          {
              pause = 120;
          }
          else if (x < 6)
          {
              pause = 50;
          }
          else
          {
              pause = 0;
          }

          if (pause != 0)
          {
              scheduler.Sleep(pause);
          }
      };

      IDisposable subscription = null;
      scheduler.ScheduleAbsolute(200, () => { subscription = timer.Subscribe(onNext); });
      scheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      scheduler.Start();

      times.AssertEqual(
          201, // 1 off because of initial scheduling jump (InvokeStart)
          301,
          401,
          523, // 3 off because of 401 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
          645, // 5 off because of 523 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
          743, // \
          843, // +--> 43 off because this situation (no stopwatch or periodic scheduling interface) only gets best effort treatment (see SchedulePeriodic emulation code)
          943 // /
      );
  }
  // Slippage scenario (due time 100, period 100) on a scheduler with the
  // IStopwatchProvider optimization disabled. Records the clock at every OnNext; the
  // expected drift is described next to each expected value.
  [Fact]
  public void RepeatingTimer_NoStopwatch_Slippage2()
  {
      var scheduler = new TestScheduler();

      IObservable<long> timer = null;
      scheduler.ScheduleAbsolute(100, () => { timer = Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler.DisableOptimizations(typeof(IStopwatchProvider))); });

      var times = new List<long>();

      Action<long> onNext = x =>
      {
          times.Add(scheduler.Clock);

          // Simulated observer work per tick index; 0 means "return immediately".
          long pause;
          if (x == 0)
          {
              pause = 0;
          }
          else if (x < 2)
          {
              pause = 50;
          }
          else if (x < 4)
          {
              pause = 120;
          }
          else if (x < 6)
          {
              pause = 50;
          }
          else
          {
              pause = 0;
          }

          if (pause != 0)
          {
              scheduler.Sleep(pause);
          }
      };

      IDisposable subscription = null;
      scheduler.ScheduleAbsolute(200, () => { subscription = timer.Subscribe(onNext); });
      scheduler.ScheduleAbsolute(1000, () => { subscription.Dispose(); });

      scheduler.Start();

      times.AssertEqual(
          300,
          400,
          500,
          622, // 2 off because of 500 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
          744, // 4 off because of 622 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
          842, // |
          942 // +--> 42 off because this situation (no stopwatch or periodic scheduling interface) only gets best effort treatment (see SchedulePeriodic emulation code)
      );
  }
  6484. #if !NO_THREAD
  // Real-time smoke test: the observer blocks for 500 ms on the first value of a 10 ms
  // repeating timer, then waits until a value greater than 10 arrives before disposing
  // the subscription. The test passes when the wait handle is signalled.
  [Fact]
  public void RepeatingTimer_Start_CatchUp()
  {
      var reachedTarget = new ManualResetEvent(false);
      var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10));

      // Assigned after Subscribe returns so the callback can dispose its own subscription.
      var subscription = new SingleAssignmentDisposable();
      subscription.Disposable = timer.Subscribe(x =>
      {
          if (x == 0)
          {
              Thread.Sleep(500);
          }

          if (x > 10)
          {
              reachedTarget.Set();
              subscription.Dispose();
          }
      });

      reachedTarget.WaitOne();
  }
  // Real-time test: the observer blocks for 500 ms on the first value and throws on the
  // value 5. The exception is expected to reach the handler installed through Catch on the
  // thread pool scheduler, which records it and signals the wait handle.
  [Fact]
  public void RepeatingTimer_Start_CatchUp_Throws()
  {
      var handlerInvoked = new ManualResetEvent(false);
      var err = new Exception();
      Exception observed = null;

      var catchingScheduler = ThreadPoolScheduler.Instance.Catch<Exception>(e =>
      {
          Interlocked.Exchange(ref observed, e);
          handlerInvoked.Set();
          return true;
      });

      var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10), catchingScheduler);
      timer.Subscribe(x =>
      {
          if (x == 0)
          {
              Thread.Sleep(500);
          }

          if (x == 5)
          {
              throw err;
          }
      });

      handlerInvoked.WaitOne();

      Assert.Same(err, observed);
  }
  6526. #endif
  // Scheduler decorator for tests: forwards every Schedule call to the wrapped scheduler,
  // but runs the scheduled action inside a try/catch that reports any exception to the
  // supplied callback instead of letting it escape.
  class SchedulerWithCatch : IServiceProvider, IScheduler
  {
      private readonly IScheduler _scheduler;
      private readonly Action<Exception> _setException;

      public SchedulerWithCatch(IScheduler scheduler, Action<Exception> setException)
      {
          _scheduler = scheduler;
          _setException = setException;
      }

      // Delegates service discovery to the wrapped scheduler (cast to IServiceProvider).
      public object GetService(Type serviceType)
      {
          var provider = (IServiceProvider)_scheduler;
          return provider.GetService(serviceType);
      }

      public DateTimeOffset Now
      {
          get { return _scheduler.Now; }
      }

      public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
      {
          var guarded = GetCatch(action);
          return _scheduler.Schedule<TState>(state, guarded);
      }

      public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          var guarded = GetCatch(action);
          return _scheduler.Schedule<TState>(state, dueTime, guarded);
      }

      public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
      {
          var guarded = GetCatch(action);
          return _scheduler.Schedule<TState>(state, dueTime, guarded);
      }

      // Wraps the action so that it receives a catching scheduler too (keeping recursive
      // scheduling guarded) and so that exceptions go to the callback.
      private Func<IScheduler, TState, IDisposable> GetCatch<TState>(Func<IScheduler, TState, IDisposable> action)
      {
          return delegate (IScheduler inner, TState innerState)
          {
              try
              {
                  var wrapped = new SchedulerWithCatch(inner, _setException);
                  return action(wrapped, innerState);
              }
              catch (Exception error)
              {
                  _setException(error);
                  return Disposable.Empty;
              }
          };
      }
  }
  // TestScheduler that also offers ISchedulerPeriodic. Every SchedulePeriodic call is
  // recorded as a TimerRun (start time, tick times, stop time) so tests can assert on how
  // periodic timers were used.
  class PeriodicTestScheduler : TestScheduler, ISchedulerPeriodic, IServiceProvider
  {
      private readonly List<TimerRun> _timers = new List<TimerRun>();

      // Timer runs recorded so far, in the order SchedulePeriodic was called.
      public List<TimerRun> Timers
      {
          get { return _timers; }
      }

      // Implements periodic scheduling through recursive relative scheduling on this
      // scheduler. Records the clock when the run starts, at each tick, and when the
      // returned disposable is disposed.
      public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
      {
          var timerRun = new TimerRun(this.Clock);
          _timers.Add(timerRun);

          var current = state;

          var recurring = this.Schedule(period, reschedule =>
          {
              timerRun.Add(this.Clock);
              current = action(current);
              reschedule(period);
          });

          var markStopped = Disposable.Create(() => { timerRun.Stop(this.Clock); });

          return new CompositeDisposable(
              markStopped,
              recurring
          );
      }

      // Advertises this instance for ISchedulerPeriodic; everything else goes to the base.
      protected override object GetService(Type serviceType)
      {
          if (serviceType != typeof(ISchedulerPeriodic))
          {
              return base.GetService(serviceType);
          }

          return this as ISchedulerPeriodic;
      }
  }
  // Record of a single periodic timer run: when it started, the clock values at which it
  // ticked, and (optionally) when it stopped. Implements IEnumerable<long> and Add so that
  // expected runs can be written with collection-initializer syntax.
  class TimerRun : IEnumerable<long>
  {
      private readonly long _started;
      private readonly List<long> _ticks;
      private long _stopped;
      private bool _hasStopped;

      // Creates a run that has started but not yet stopped.
      public TimerRun(long started)
      {
          _started = started;
          _ticks = new List<long>();
      }

      // Creates a run that has already stopped at the given time.
      public TimerRun(long started, long stopped)
          : this(started)
      {
          _stopped = stopped;
          _hasStopped = true;
      }

      public long Started
      {
          get { return _started; }
      }

      public long Stopped
      {
          get { return _stopped; }
      }

      public IEnumerable<long> Ticks
      {
          get { return _ticks; }
      }

      // Appends a tick time to the run.
      public void Add(long clock)
      {
          _ticks.Add(clock);
      }

      // Marks the run as stopped at the given time.
      internal void Stop(long clock)
      {
          _stopped = clock;
          _hasStopped = true;
      }

      // Two runs are equal when start time, stop time and tick sequence all match.
      public override bool Equals(object obj)
      {
          var that = obj as TimerRun;
          if (that == null)
          {
              return false;
          }

          if (_started != that._started)
          {
              return false;
          }

          if (_stopped != that._stopped)
          {
              return false;
          }

          return _ticks.SequenceEqual(that._ticks);
      }

      // Constant hash code: the state compared by Equals is mutable.
      public override int GetHashCode()
      {
          return 0;
      }

      // Formats as "Start(s) Ticks(t1, t2, ...) " followed by "Stop(e)" once stopped.
      public override string ToString()
      {
          var tickTexts = _ticks.Select(t => t.ToString()).ToArray();

          var text = new StringBuilder();
          text.Append("Start(" + _started + ") ");
          text.Append("Ticks(" + string.Join(", ", tickTexts) + ") ");

          if (_hasStopped)
          {
              text.Append("Stop(" + _stopped + ")");
          }

          return text.ToString();
      }

      public IEnumerator<long> GetEnumerator()
      {
          return _ticks.GetEnumerator();
      }

      System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
      {
          return _ticks.GetEnumerator();
      }
  }
  6674. #endregion
  6675. #region + Timestamp +
  // Timestamp is expected to reject a null source and a null scheduler with
  // ArgumentNullException.
  [Fact]
  public void Timestamp_ArgumentChecking()
  {
      var testScheduler = new TestScheduler();
      var source = Observable.Empty<int>();

      // Null source, without and with an explicit scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timestamp(default(IObservable<int>)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timestamp(default(IObservable<int>), testScheduler));

      // Null scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Timestamp(source, null));
  }
  // Each value received after subscription is expected to be wrapped in a Timestamped<int>
  // whose timestamp equals the virtual time at which the value was produced.
  [Fact]
  public void Timestamp_Regular()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(230, 3),
          OnNext(260, 4),
          OnNext(300, 5),
          OnNext(350, 6),
          OnCompleted<int>(400)
      );

      var observed = testScheduler.Start(() => source.Timestamp(testScheduler));

      // Builds the expected notification for a value stamped with the given tick count.
      Func<long, int, Recorded<Notification<Timestamped<int>>>> stamped =
          (ticks, value) => OnNext(ticks, new Timestamped<int>(value, new DateTimeOffset(ticks, TimeSpan.Zero)));

      observed.Messages.AssertEqual(
          stamped(210, 2),
          stamped(230, 3),
          stamped(260, 4),
          stamped(300, 5),
          stamped(350, 6),
          OnCompleted<Timestamped<int>>(400)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  // A source that only completes (after subscription) is expected to yield just the
  // completion notification.
  [Fact]
  public void Timestamp_Empty()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(150, 1),
          OnCompleted<int>(400)
      );

      var observed = testScheduler.Start(() => source.Timestamp(testScheduler));

      observed.Messages.AssertEqual(
          OnCompleted<Timestamped<int>>(400)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  // An error from the source is expected to be forwarded unchanged, at the same time.
  [Fact]
  public void Timestamp_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(150, 1),
          OnError<int>(400, failure)
      );

      var observed = testScheduler.Start(() => source.Timestamp(testScheduler));

      observed.Messages.AssertEqual(
          OnError<Timestamped<int>>(400, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  // A source that produces nothing after subscription is expected to yield no messages;
  // the subscription is expected to last from 200 until 1000.
  [Fact]
  public void Timestamp_Never()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable<int>(
          OnNext(150, 1)
      );

      var observed = testScheduler.Start(() => source.Timestamp(testScheduler));

      // No notifications expected.
      observed.Messages.AssertEqual(
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  // Without an explicit scheduler, timestamping a single-value source is expected to
  // yield exactly one element.
  [Fact]
  public void Timestamp_DefaultScheduler()
  {
      var stamped = Observable.Return(1).Timestamp();
      var elementCount = stamped.Count().First();

      Assert.True(elementCount == 1);
  }
  6771. #endregion
  6772. #region + Window +
  // Time-based windows of 100 ticks. Each window's values are labelled "<index> <value>"
  // and followed by "<index> end", so the merged output shows which window received each
  // value and when each window closed.
  [Fact]
  public void Window_Time_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(270, 4),
          OnNext(320, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnNext(410, 8),
          OnNext(460, 9),
          OnNext(470, 10),
          OnCompleted<int>(490)
      );

      // Tags every element with its window index and appends an end marker per window.
      Func<IObservable<int>, int, IObservable<string>> label =
          (window, index) => window.Select(value => index + " " + value).Concat(Observable.Return(index + " end"));

      var observed = testScheduler.Start(() =>
          source.Window(TimeSpan.FromTicks(100), testScheduler).Select(label).Merge()
      );

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(270, "0 4"),
          OnNext(300, "0 end"),
          OnNext(320, "1 5"),
          OnNext(360, "1 6"),
          OnNext(390, "1 7"),
          OnNext(400, "1 end"),
          OnNext(410, "2 8"),
          OnNext(460, "2 9"),
          OnNext(470, "2 10"),
          OnNext(490, "2 end"),
          OnCompleted<string>(490)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 490)
      );
  }
  // Same scenario as the basic 100-tick window test, run on a scheduler that exposes
  // ISchedulerPeriodic. Additionally checks the recorded periodic timer run: started at
  // 200, ticked at 300 and 400, stopped at 490.
  [Fact]
  public void Window_Time_Basic_Periodic()
  {
      var periodicScheduler = new PeriodicTestScheduler();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(270, 4),
          OnNext(320, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnNext(410, 8),
          OnNext(460, 9),
          OnNext(470, 10),
          OnCompleted<int>(490)
      );

      // Tags every element with its window index and appends an end marker per window.
      Func<IObservable<int>, int, IObservable<string>> label =
          (window, index) => window.Select(value => index + " " + value).Concat(Observable.Return(index + " end"));

      var observed = periodicScheduler.Start(() =>
          source.Window(TimeSpan.FromTicks(100), periodicScheduler).Select(label).Merge()
      );

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(270, "0 4"),
          OnNext(300, "0 end"),
          OnNext(320, "1 5"),
          OnNext(360, "1 6"),
          OnNext(390, "1 7"),
          OnNext(400, "1 end"),
          OnNext(410, "2 8"),
          OnNext(460, "2 9"),
          OnNext(470, "2 10"),
          OnNext(490, "2 end"),
          OnCompleted<string>(490)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 490)
      );

  #if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 490) { 300, 400 }
      );
  #endif
  }
  // 100-tick windows on a periodic scheduler where the source fails at 460. The error is
  // expected to be forwarded at 460 without an end marker for the open window, and the
  // recorded periodic timer run is expected to stop at 460.
  [Fact]
  public void Window_Time_Basic_Periodic_Error()
  {
      var failure = new Exception();
      var periodicScheduler = new PeriodicTestScheduler();

      var source = periodicScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(270, 4),
          OnNext(320, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnNext(410, 8),
          OnError<int>(460, failure)
      );

      // Tags every element with its window index and appends an end marker per window.
      Func<IObservable<int>, int, IObservable<string>> label =
          (window, index) => window.Select(value => index + " " + value).Concat(Observable.Return(index + " end"));

      var observed = periodicScheduler.Start(() =>
          source.Window(TimeSpan.FromTicks(100), periodicScheduler).Select(label).Merge()
      );

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(270, "0 4"),
          OnNext(300, "0 end"),
          OnNext(320, "1 5"),
          OnNext(360, "1 6"),
          OnNext(390, "1 7"),
          OnNext(400, "1 end"),
          OnNext(410, "2 8"),
          OnError<string>(460, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 460)
      );

  #if !WINDOWS
      periodicScheduler.Timers.AssertEqual(
          new TimerRun(200, 460) { 300, 400 }
      );
  #endif
  }
  // Overlapping windows: time span 100 ticks, time shift 50 ticks. A value is expected to
  // appear in every window open at that moment, and each window emits "<index> end" when
  // it closes.
  [Fact]
  public void Window_Time_Basic_Both()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(270, 4),
          OnNext(320, 5),
          OnNext(360, 6),
          OnNext(390, 7),
          OnNext(410, 8),
          OnNext(460, 9),
          OnNext(470, 10),
          OnCompleted<int>(490)
      );

      // Tags every element with its window index and appends an end marker per window.
      Func<IObservable<int>, int, IObservable<string>> label =
          (window, index) => window.Select(value => index + " " + value).Concat(Observable.Return(index + " end"));

      var observed = testScheduler.Start(() =>
          source.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(50), testScheduler).Select(label).Merge()
      );

      observed.Messages.AssertEqual(
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(270, "0 4"),
          OnNext(270, "1 4"),
          OnNext(300, "0 end"),
          OnNext(320, "1 5"),
          OnNext(320, "2 5"),
          OnNext(350, "1 end"),
          OnNext(360, "2 6"),
          OnNext(360, "3 6"),
          OnNext(390, "2 7"),
          OnNext(390, "3 7"),
          OnNext(400, "2 end"),
          OnNext(410, "3 8"),
          OnNext(410, "4 8"),
          OnNext(450, "3 end"),
          OnNext(460, "4 9"),
          OnNext(460, "5 9"),
          OnNext(470, "4 10"),
          OnNext(470, "5 10"),
          OnNext(490, "4 end"),
          OnNext(490, "5 end"),
          OnCompleted<string>(490)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 490)
      );
  }
  // Argument validation for the time-based Window overloads: a null source or scheduler is
  // expected to raise ArgumentNullException, a negative time span or time shift is
  // expected to raise ArgumentOutOfRangeException.
  [Fact]
  public void WindowWithTime_ArgumentChecking()
  {
      var oneTick = TimeSpan.FromTicks(1);
      var negative = TimeSpan.FromTicks(-1);

      // Window(source, timeSpan, timeShift, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), oneTick, oneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, negative, oneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, oneTick, negative, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(DummyObservable<int>.Instance, oneTick, oneTick, null));

      // Window(source, timeSpan, timeShift)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), oneTick, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, negative, oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, oneTick, negative));

      // Window(source, timeSpan, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), oneTick, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, negative, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(DummyObservable<int>.Instance, oneTick, null));

      // Window(source, timeSpan)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(default(IObservable<int>), oneTick));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(DummyObservable<int>.Instance, negative));
  }
  [Fact]
  public void WindowWithTime_Basic1()
  {
      var scheduler = new TestScheduler();

      // Hot source; the value recorded at 100 is absent from the expected output below.
      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Window(100 ticks, 70 ticks): tag each value with the index of the window carrying it,
      // then flatten all windows into a single sequence.
      var results = scheduler.Start(() =>
      {
          var windows = source.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler);
          var tagged = windows.Select((window, index) => window.Select(value => index + " " + value));
          return tagged.Merge();
      });

      // Values at 280, 350 and 420 are expected under two consecutive window indices.
      var expected = new[]
      {
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(280, "1 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(350, "2 6"),
          OnNext(380, "2 7"),
          OnNext(420, "2 8"),
          OnNext(420, "3 8"),
          OnNext(470, "3 9"),
          OnCompleted<string>(600)
      };
      results.Messages.AssertEqual(expected);

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  [Fact]
  public void WindowWithTime_Basic2()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Window(70 ticks, 100 ticks): tag each value with its window index and flatten.
      var results = scheduler.Start(() =>
      {
          var windows = source.Window(TimeSpan.FromTicks(70), TimeSpan.FromTicks(100), scheduler);
          var tagged = windows.Select((window, index) => window.Select(value => index + " " + value));
          return tagged.Merge();
      });

      // The source values at 280 and 380 are not expected in any window.
      var expected = new[]
      {
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(420, "2 8"),
          OnNext(470, "2 9"),
          OnCompleted<string>(600)
      };
      results.Messages.AssertEqual(expected);

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  [Fact]
  public void WindowWithTime_Error()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      // Same values as the basic tests, but the source terminates with an error at 600.
      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, error)
      );

      // Window(100 ticks, 70 ticks): tag each value with its window index and flatten.
      var results = scheduler.Start(() =>
      {
          var windows = source.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler);
          var tagged = windows.Select((window, index) => window.Select(value => index + " " + value));
          return tagged.Merge();
      });

      // The same exception instance is expected on the merged output at 600.
      var expected = new[]
      {
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(280, "1 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(350, "2 6"),
          OnNext(380, "2 7"),
          OnNext(420, "2 8"),
          OnNext(420, "3 8"),
          OnNext(470, "3 9"),
          OnError<string>(600, error)
      };
      results.Messages.AssertEqual(expected);

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  [Fact]
  public void WindowWithTime_Disposed()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // 370 is passed as the second argument to Start; the expected subscription below ends there.
      var results = scheduler.Start(
          () =>
          {
              var windows = source.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler);
              var tagged = windows.Select((window, index) => window.Select(value => index + " " + value));
              return tagged.Merge();
          },
          370
      );

      // Nothing after 350 is expected, and no terminal notification.
      var expected = new[]
      {
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(280, "1 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(350, "2 6")
      };
      results.Messages.AssertEqual(expected);

      source.Subscriptions.AssertEqual(Subscribe(200, 370));
  }
  [Fact]
  public void WindowWithTime_Basic_Same()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(380, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Single-TimeSpan overload: Window(100 ticks, scheduler).
      var results = scheduler.Start(() =>
      {
          var windows = source.Window(TimeSpan.FromTicks(100), scheduler);
          var tagged = windows.Select((window, index) => window.Select(value => index + " " + value));
          return tagged.Merge();
      });

      // Every value is expected under exactly one window index.
      var expected = new[]
      {
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "0 4"),
          OnNext(320, "1 5"),
          OnNext(350, "1 6"),
          OnNext(380, "1 7"),
          OnNext(420, "2 8"),
          OnNext(470, "2 9"),
          OnCompleted<string>(600)
      };
      results.Messages.AssertEqual(expected);

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  [Fact]
  public void WindowWithTime_Default()
  {
      // No scheduler is passed to either overload; with a one-day span all ten values
      // are expected in the first list produced.
      var oneDay = TimeSpan.FromDays(1);
      var expected = Enumerable.Range(0, 10);

      // Window(TimeSpan, TimeSpan)
      Observable.Range(0, 10)
          .Window(oneDay, oneDay)
          .SelectMany(Observable.ToList)
          .First()
          .AssertEqual(expected);

      // Window(TimeSpan)
      Observable.Range(0, 10)
          .Window(oneDay)
          .SelectMany(Observable.ToList)
          .First()
          .AssertEqual(expected);
  }
  [Fact]
  public void WindowWithTimeOrCount_ArgumentChecking()
  {
      // Arguments reused by every check below.
      var nullSource = default(IObservable<int>);
      var source = DummyObservable<int>.Instance;
      var scheduler = DummyScheduler.Instance;
      var oneTick = TimeSpan.FromTicks(1);
      var minusOneTick = TimeSpan.FromTicks(-1);

      // Window(source, TimeSpan, count, scheduler): null source, negative span, zero count, null scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(nullSource, oneTick, 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, minusOneTick, 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, oneTick, 0, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(source, oneTick, 1, null));

      // Window(source, TimeSpan, count): null source, negative span, zero count.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Window(nullSource, oneTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, minusOneTick, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Window(source, oneTick, 0));
  }
  [Fact]
  public void WindowWithTimeOrCount_Basic()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // Window(70 ticks, 3, scheduler): tag each value with its window index and flatten.
      var results = scheduler.Start(() =>
      {
          var windows = source.Window(TimeSpan.FromTicks(70), 3, scheduler);
          var tagged = windows.Select((window, index) => window.Select(value => index + " " + value));
          return tagged.Merge();
      });

      // Expected window sizes: 3, 1, 3, 1, 1.
      var expected = new[]
      {
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "1 4"),
          OnNext(320, "2 5"),
          OnNext(350, "2 6"),
          OnNext(370, "2 7"),
          OnNext(420, "3 8"),
          OnNext(470, "4 9"),
          OnCompleted<string>(600)
      };
      results.Messages.AssertEqual(expected);

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  [Fact]
  public void WindowWithTimeOrCount_Error()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      // Same values as the basic test, but the source terminates with an error at 600.
      var source = scheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnError<int>(600, error)
      );

      // Window(70 ticks, 3, scheduler): tag each value with its window index and flatten.
      var results = scheduler.Start(() =>
      {
          var windows = source.Window(TimeSpan.FromTicks(70), 3, scheduler);
          var tagged = windows.Select((window, index) => window.Select(value => index + " " + value));
          return tagged.Merge();
      });

      // The same exception instance is expected on the merged output at 600.
      var expected = new[]
      {
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "1 4"),
          OnNext(320, "2 5"),
          OnNext(350, "2 6"),
          OnNext(370, "2 7"),
          OnNext(420, "3 8"),
          OnNext(470, "4 9"),
          OnError<string>(600, error)
      };
      results.Messages.AssertEqual(expected);

      source.Subscriptions.AssertEqual(Subscribe(200, 600));
  }
  [Fact]
  public void WindowWithTimeOrCount_Disposed()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(205, 1),
          OnNext(210, 2),
          OnNext(240, 3),
          OnNext(280, 4),
          OnNext(320, 5),
          OnNext(350, 6),
          OnNext(370, 7),
          OnNext(420, 8),
          OnNext(470, 9),
          OnCompleted<int>(600)
      );

      // 370 is passed as the second argument to Start; the expected subscription below ends there.
      var results = scheduler.Start(
          () =>
          {
              var windows = source.Window(TimeSpan.FromTicks(70), 3, scheduler);
              var tagged = windows.Select((window, index) => window.Select(value => index + " " + value));
              return tagged.Merge();
          },
          370
      );

      // The value recorded at 370 is still expected; nothing later, and no terminal notification.
      var expected = new[]
      {
          OnNext(205, "0 1"),
          OnNext(210, "0 2"),
          OnNext(240, "0 3"),
          OnNext(280, "1 4"),
          OnNext(320, "2 5"),
          OnNext(350, "2 6"),
          OnNext(370, "2 7")
      };
      results.Messages.AssertEqual(expected);

      source.Subscriptions.AssertEqual(Subscribe(200, 370));
  }
  [Fact]
  public void WindowWithTimeOrCount_Default()
  {
      // Checks the scheduler-less Window(TimeSpan, count) overload: with a one-day span and a
      // count of 3, the second window over Range(1, 10) is expected to hold 4, 5, 6.
      //
      // SequenceEqual only builds an observable; unless its result is subscribed to and
      // asserted, the test verifies nothing. Each window is therefore turned into a list via
      // SelectMany (the same pattern WindowWithTime_Default uses), so its values are captured
      // as they are produced, and the resulting list is asserted.
      Observable.Range(1, 10)
          .Window(TimeSpan.FromDays(1), 3)
          .Skip(1)
          .SelectMany(Observable.ToList)
          .First()
          .AssertEqual(Enumerable.Range(4, 3));
  }
  7261. #endregion
  7262. }
  7263. }