SkipUntilTest.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  // Licensed to the .NET Foundation under one or more agreements.
  // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Threading;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Xunit;
  12. namespace ReactiveTests.Tests
  13. {
  /// <summary>
  /// Tests for the <c>SkipUntil</c> operator. The first region covers the overload that
  /// takes a second observable as the trigger; the second region covers the overloads
  /// that take an absolute <see cref="DateTimeOffset"/>.
  /// </summary>
  /// <remarks>
  /// In the virtual-time tests the result is subscribed at 200 (every expected
  /// subscription starts there), so messages recorded at 150 are never observed.
  /// </remarks>
  public class SkipUntilTest : ReactiveTest
  {
      #region + Observable +

      /// <summary>
      /// A null source or a null trigger sequence is rejected with <see cref="ArgumentNullException"/>.
      /// </summary>
      [Fact]
      public void SkipUntil_ArgumentChecking()
      {
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil<int, int>(null, DummyObservable<int>.Instance));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil<int, int>(DummyObservable<int>.Instance, null));
      }

      /// <summary>
      /// The trigger emits at 225: source values after that (230, 240) and the source's
      /// completion at 250 are forwarded, and the trigger subscription ends at 225.
      /// </summary>
      [Fact]
      public void SkipUntil_SomeData_Next()
      {
          var scheduler = new TestScheduler();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(210, 2),
              OnNext(220, 3),
              OnNext(230, 4), //!
              OnNext(240, 5), //!
              OnCompleted<int>(250)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(225, 99),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
              OnNext(230, 4),
              OnNext(240, 5),
              OnCompleted<int>(250)
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 250)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 225)
          );
      }

      /// <summary>
      /// The trigger fails at 225 before emitting: the error is forwarded at 225 and both
      /// subscriptions end at that time.
      /// </summary>
      [Fact]
      public void SkipUntil_SomeData_Error()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(210, 2),
              OnNext(220, 3),
              OnNext(230, 4),
              OnNext(240, 5),
              OnCompleted<int>(250)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnError<int>(225, ex)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
              OnError<int>(225, ex)
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 225)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 225)
          );
      }

      /// <summary>
      /// The source fails at 220, before the trigger's first value at 230: the source error
      /// is forwarded and both subscriptions end at 220.
      /// </summary>
      [Fact]
      public void SkipUntil_Error_SomeData()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(210, 2),
              OnError<int>(220, ex)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(230, 2),
              OnCompleted<int>(250)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
              OnError<int>(220, ex)
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 220)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 220)
          );
      }

      /// <summary>
      /// The trigger completes at 225 without emitting: nothing is forwarded, not even the
      /// source's completion at 250. The source stays subscribed until 250, the trigger until 225.
      /// </summary>
      [Fact]
      public void SkipUntil_SomeData_Empty()
      {
          var scheduler = new TestScheduler();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(210, 2),
              OnNext(220, 3),
              OnNext(230, 4),
              OnNext(240, 5),
              OnCompleted<int>(250)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnCompleted<int>(225)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 250)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 225)
          );
      }

      /// <summary>
      /// The source produces nothing after subscription; the trigger emits at 225. No messages
      /// are forwarded, the source stays subscribed until 1000 and the trigger until 225.
      /// </summary>
      [Fact]
      public void SkipUntil_Never_Next()
      {
          var scheduler = new TestScheduler();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(225, 2), //!
              OnCompleted<int>(250)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 225)
          );
      }

      /// <summary>
      /// The source produces nothing after subscription; the trigger fails at 225. The error
      /// is forwarded and both subscriptions end at 225.
      /// </summary>
      [Fact]
      public void SkipUntil_Never_Error1()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnError<int>(225, ex)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
              OnError<int>(225, ex)
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 225)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 225)
          );
      }

      /// <summary>
      /// The trigger fails at 300, after the source has already completed at 250: the only
      /// forwarded message is the trigger's error at 300. The source subscription ends at 250,
      /// the trigger subscription at 300.
      /// </summary>
      [Fact]
      public void SkipUntil_SomeData_Error2()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(210, 2),
              OnNext(220, 3),
              OnNext(230, 4),
              OnNext(240, 5),
              OnCompleted<int>(250)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnError<int>(300, ex)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
              OnError<int>(300, ex)
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 250)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 300)
          );
      }

      /// <summary>
      /// The trigger never signals after subscription: nothing is forwarded, the source
      /// subscription ends at 250 and the trigger stays subscribed until 1000.
      /// </summary>
      [Fact]
      public void SkipUntil_SomeData_Never()
      {
          var scheduler = new TestScheduler();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(210, 2),
              OnNext(220, 3),
              OnNext(230, 4),
              OnNext(240, 5),
              OnCompleted<int>(250)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 250)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 1000 /* can't dispose prematurely, could be in flight to dispatch OnError */)
          );
      }

      /// <summary>
      /// The source produces nothing after subscription and the trigger completes at 225
      /// without emitting: nothing is forwarded, the source stays subscribed until 1000 and
      /// the trigger until 225.
      /// </summary>
      [Fact]
      public void SkipUntil_Never_Empty()
      {
          var scheduler = new TestScheduler();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnCompleted<int>(225)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 225)
          );
      }

      /// <summary>
      /// Neither sequence signals after subscription: nothing is forwarded and both stay
      /// subscribed until 1000.
      /// </summary>
      [Fact]
      public void SkipUntil_Never_Never()
      {
          var scheduler = new TestScheduler();

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1)
          );

          var r = scheduler.CreateHotObservable(
              OnNext(150, 1)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
          );

          l.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );

          r.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      /// <summary>
      /// The trigger is a hand-written observable that never signals and records when its
      /// subscription is disposed. After the run no messages were forwarded and the trigger
      /// subscription has been disposed.
      /// </summary>
      [Fact]
      public void SkipUntil_HasCompletedCausesDisposal()
      {
          var scheduler = new TestScheduler();

          var disposed = false;

          var l = scheduler.CreateHotObservable(
              OnNext(150, 1),
              OnNext(210, 2),
              OnNext(220, 3),
              OnNext(230, 4),
              OnNext(240, 5),
              OnCompleted<int>(250)
          );

          // The returned action is the subscription's dispose callback.
          var r = Observable.Create<int>(obs => () => { disposed = true; });

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
          );

          Assert.True(disposed, "disposed");
      }

      /// <summary>
      /// Both sequences are <c>Observable.Return</c>: the source value and completion are
      /// forwarded at the subscription time (200).
      /// </summary>
      [Fact]
      public void SkipUntil_Immediate()
      {
          var scheduler = new TestScheduler();

          var xs = Observable.Return(1);
          var ys = Observable.Return("bar");

          var res = scheduler.Start(() =>
              xs.SkipUntil(ys)
          );

          res.Messages.AssertEqual(
              OnNext(200, 1),
              OnCompleted<int>(200)
          );
      }

      /// <summary>
      /// Both sequences complete at 250 without emitting. The test pins the current behaviour
      /// (no messages at all), which is considered buggy; the desired outcome is kept below
      /// as a comment.
      /// </summary>
      [Fact] // Asserts behaviour considered buggy. A fix is desirable but breaking.
      public void SkipUntil_Empty_Empty()
      {
          var scheduler = new TestScheduler();

          var l = scheduler.CreateHotObservable(
              OnCompleted(250, 1)
          );

          var r = scheduler.CreateHotObservable(
              OnCompleted(250, 1)
          );

          var res = scheduler.Start(() =>
              l.SkipUntil(r)
          );

          res.Messages.AssertEqual(
          );

          //Desired behaviour:
          //res.Messages.AssertEqual(
          //    OnCompleted(250, 1));
      }

      #endregion

      #region + Timed +

      /// <summary>
      /// A null source (with or without an explicit scheduler) and a null scheduler are
      /// rejected with <see cref="ArgumentNullException"/>.
      /// </summary>
      [Fact]
      public void SkipUntil_Timed_ArgumentChecking()
      {
          var xs = Observable.Return(42);

          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil(default(IObservable<int>), DateTimeOffset.Now));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil(default(IObservable<int>), DateTimeOffset.Now, Scheduler.Default));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil(xs, DateTimeOffset.Now, default));
      }

      /// <summary>
      /// With a default-constructed <see cref="DateTimeOffset"/> as the start time, every
      /// source message is forwarded unchanged.
      /// </summary>
      [Fact]
      public void SkipUntil_Zero()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              xs.SkipUntil(new DateTimeOffset(), scheduler)
          );

          res.Messages.AssertEqual(
              OnNext(210, 1),
              OnNext(220, 2),
              OnCompleted<int>(230)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 230)
          );
      }

      /// <summary>
      /// With a start time of 215 ticks, the value at 210 is dropped while the value at 220
      /// and the completion at 230 are forwarded.
      /// </summary>
      [Fact]
      public void SkipUntil_Some()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              xs.SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler)
          );

          res.Messages.AssertEqual(
              OnNext(220, 2),
              OnCompleted<int>(230)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 230)
          );
      }

      /// <summary>
      /// With a start time of 250 ticks, later than the source's completion at 230, all
      /// values are dropped and only the completion is forwarded.
      /// </summary>
      [Fact]
      public void SkipUntil_Late()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnCompleted<int>(230)
          );

          var res = scheduler.Start(() =>
              xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler)
          );

          res.Messages.AssertEqual(
              OnCompleted<int>(230)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 230)
          );
      }

      /// <summary>
      /// A source error at 210 is forwarded even though the start time (250) has not been
      /// reached; the source subscription ends at 210.
      /// </summary>
      [Fact]
      public void SkipUntil_Error()
      {
          var scheduler = new TestScheduler();

          var ex = new Exception();

          var xs = scheduler.CreateHotObservable<int>(
              OnError<int>(210, ex)
          );

          var res = scheduler.Start(() =>
              xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler)
          );

          res.Messages.AssertEqual(
              OnError<int>(210, ex)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 210)
          );
      }

      /// <summary>
      /// A source with no messages yields no output and stays subscribed until 1000.
      /// </summary>
      [Fact]
      public void SkipUntil_Never()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
          );

          var res = scheduler.Start(() =>
              xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler)
          );

          res.Messages.AssertEqual(
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 1000)
          );
      }

      /// <summary>
      /// Two chained timed operators (215, then 230): values from 240 onwards and the
      /// completion are forwarded; the value recorded at 230 itself is not. The source is
      /// subscribed exactly once (200-270).
      /// </summary>
      [Fact]
      public void SkipUntil_Twice1()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnNext(240, 4),
              OnNext(250, 5),
              OnNext(260, 6),
              OnCompleted<int>(270)
          );

          var res = scheduler.Start(() =>
              xs.SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler).SkipUntil(new DateTimeOffset(230, TimeSpan.Zero), scheduler)
          );

          res.Messages.AssertEqual(
              OnNext(240, 4),
              OnNext(250, 5),
              OnNext(260, 6),
              OnCompleted<int>(270)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 270)
          );
      }

      /// <summary>
      /// Same as <see cref="SkipUntil_Twice1"/> with the two start times applied in the
      /// opposite order (230, then 215); the expected output is identical.
      /// </summary>
      [Fact]
      public void SkipUntil_Twice2()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable<int>(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnNext(240, 4),
              OnNext(250, 5),
              OnNext(260, 6),
              OnCompleted<int>(270)
          );

          var res = scheduler.Start(() =>
              xs.SkipUntil(new DateTimeOffset(230, TimeSpan.Zero), scheduler).SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler)
          );

          res.Messages.AssertEqual(
              OnNext(240, 4),
              OnNext(250, 5),
              OnNext(260, 6),
              OnCompleted<int>(270)
          );

          xs.Subscriptions.AssertEqual(
              Subscribe(200, 270)
          );
      }

      /// <summary>
      /// Exercises the overload without an explicit scheduler against real time: with a start
      /// time one minute in the future, none of the ten range values may have been collected
      /// by the time completion is signalled.
      /// </summary>
      [Fact]
      public void SkipUntil_Default()
      {
          var xs = Observable.Range(0, 10, Scheduler.Default);

          var res = xs.SkipUntil(DateTimeOffset.UtcNow.AddMinutes(1));

          var lst = new List<int>();

          // The subscription is disposed before the event it signals, and both are released
          // even when the wait below fails.
          using (var e = new ManualResetEvent(false))
          using (res.Subscribe(lst.Add, () => e.Set()))
          {
              // Bounded wait: a sequence that never completes fails the test instead of
              // hanging the whole test run.
              Assert.True(e.WaitOne(TimeSpan.FromSeconds(30)), "timed out waiting for completion");
          }

          Assert.Empty(lst);
      }

      #endregion
  }
  520. }