SkipUntilTest.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class SkipUntilTest : ReactiveTest
  22. {
  23. #region + Observable +
  24. [Fact]
  25. public void SkipUntil_ArgumentChecking()
  26. {
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil<int, int>(null, DummyObservable<int>.Instance));
  28. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil<int, int>(DummyObservable<int>.Instance, null));
  29. }
  30. [Fact]
  31. public void SkipUntil_SomeData_Next()
  32. {
  33. var scheduler = new TestScheduler();
  34. var l = scheduler.CreateHotObservable(
  35. OnNext(150, 1),
  36. OnNext(210, 2),
  37. OnNext(220, 3),
  38. OnNext(230, 4), //!
  39. OnNext(240, 5), //!
  40. OnCompleted<int>(250)
  41. );
  42. var r = scheduler.CreateHotObservable(
  43. OnNext(150, 1),
  44. OnNext(225, 99),
  45. OnCompleted<int>(230)
  46. );
  47. var res = scheduler.Start(() =>
  48. l.SkipUntil(r)
  49. );
  50. res.Messages.AssertEqual(
  51. OnNext(230, 4),
  52. OnNext(240, 5),
  53. OnCompleted<int>(250)
  54. );
  55. l.Subscriptions.AssertEqual(
  56. Subscribe(200, 250)
  57. );
  58. r.Subscriptions.AssertEqual(
  59. Subscribe(200, 225)
  60. );
  61. }
  62. [Fact]
  63. public void SkipUntil_SomeData_Error()
  64. {
  65. var scheduler = new TestScheduler();
  66. var ex = new Exception();
  67. var l = scheduler.CreateHotObservable(
  68. OnNext(150, 1),
  69. OnNext(210, 2),
  70. OnNext(220, 3),
  71. OnNext(230, 4),
  72. OnNext(240, 5),
  73. OnCompleted<int>(250)
  74. );
  75. var r = scheduler.CreateHotObservable(
  76. OnNext(150, 1),
  77. OnError<int>(225, ex)
  78. );
  79. var res = scheduler.Start(() =>
  80. l.SkipUntil(r)
  81. );
  82. res.Messages.AssertEqual(
  83. OnError<int>(225, ex)
  84. );
  85. l.Subscriptions.AssertEqual(
  86. Subscribe(200, 225)
  87. );
  88. r.Subscriptions.AssertEqual(
  89. Subscribe(200, 225)
  90. );
  91. }
  92. [Fact]
  93. public void SkipUntil_Error_SomeData()
  94. {
  95. var scheduler = new TestScheduler();
  96. var ex = new Exception();
  97. var l = scheduler.CreateHotObservable(
  98. OnNext(150, 1),
  99. OnNext(210, 2),
  100. OnError<int>(220, ex)
  101. );
  102. var r = scheduler.CreateHotObservable(
  103. OnNext(150, 1),
  104. OnNext(230, 2),
  105. OnCompleted<int>(250)
  106. );
  107. var res = scheduler.Start(() =>
  108. l.SkipUntil(r)
  109. );
  110. res.Messages.AssertEqual(
  111. OnError<int>(220, ex)
  112. );
  113. l.Subscriptions.AssertEqual(
  114. Subscribe(200, 220)
  115. );
  116. r.Subscriptions.AssertEqual(
  117. Subscribe(200, 220)
  118. );
  119. }
  120. [Fact]
  121. public void SkipUntil_SomeData_Empty()
  122. {
  123. var scheduler = new TestScheduler();
  124. var l = scheduler.CreateHotObservable(
  125. OnNext(150, 1),
  126. OnNext(210, 2),
  127. OnNext(220, 3),
  128. OnNext(230, 4),
  129. OnNext(240, 5),
  130. OnCompleted<int>(250)
  131. );
  132. var r = scheduler.CreateHotObservable(
  133. OnNext(150, 1),
  134. OnCompleted<int>(225)
  135. );
  136. var res = scheduler.Start(() =>
  137. l.SkipUntil(r)
  138. );
  139. res.Messages.AssertEqual(
  140. );
  141. l.Subscriptions.AssertEqual(
  142. Subscribe(200, 250)
  143. );
  144. r.Subscriptions.AssertEqual(
  145. Subscribe(200, 225)
  146. );
  147. }
  148. [Fact]
  149. public void SkipUntil_Never_Next()
  150. {
  151. var scheduler = new TestScheduler();
  152. var l = scheduler.CreateHotObservable(
  153. OnNext(150, 1)
  154. );
  155. var r = scheduler.CreateHotObservable(
  156. OnNext(150, 1),
  157. OnNext(225, 2), //!
  158. OnCompleted<int>(250)
  159. );
  160. var res = scheduler.Start(() =>
  161. l.SkipUntil(r)
  162. );
  163. res.Messages.AssertEqual(
  164. );
  165. l.Subscriptions.AssertEqual(
  166. Subscribe(200, 1000)
  167. );
  168. r.Subscriptions.AssertEqual(
  169. Subscribe(200, 225)
  170. );
  171. }
  172. [Fact]
  173. public void SkipUntil_Never_Error1()
  174. {
  175. var scheduler = new TestScheduler();
  176. var ex = new Exception();
  177. var l = scheduler.CreateHotObservable(
  178. OnNext(150, 1)
  179. );
  180. var r = scheduler.CreateHotObservable(
  181. OnNext(150, 1),
  182. OnError<int>(225, ex)
  183. );
  184. var res = scheduler.Start(() =>
  185. l.SkipUntil(r)
  186. );
  187. res.Messages.AssertEqual(
  188. OnError<int>(225, ex)
  189. );
  190. l.Subscriptions.AssertEqual(
  191. Subscribe(200, 225)
  192. );
  193. r.Subscriptions.AssertEqual(
  194. Subscribe(200, 225)
  195. );
  196. }
  197. [Fact]
  198. public void SkipUntil_SomeData_Error2()
  199. {
  200. var scheduler = new TestScheduler();
  201. var ex = new Exception();
  202. var l = scheduler.CreateHotObservable(
  203. OnNext(150, 1),
  204. OnNext(210, 2),
  205. OnNext(220, 3),
  206. OnNext(230, 4),
  207. OnNext(240, 5),
  208. OnCompleted<int>(250)
  209. );
  210. var r = scheduler.CreateHotObservable(
  211. OnNext(150, 1),
  212. OnError<int>(300, ex)
  213. );
  214. var res = scheduler.Start(() =>
  215. l.SkipUntil(r)
  216. );
  217. res.Messages.AssertEqual(
  218. OnError<int>(300, ex)
  219. );
  220. l.Subscriptions.AssertEqual(
  221. Subscribe(200, 250)
  222. );
  223. r.Subscriptions.AssertEqual(
  224. Subscribe(200, 300)
  225. );
  226. }
  227. [Fact]
  228. public void SkipUntil_SomeData_Never()
  229. {
  230. var scheduler = new TestScheduler();
  231. var l = scheduler.CreateHotObservable(
  232. OnNext(150, 1),
  233. OnNext(210, 2),
  234. OnNext(220, 3),
  235. OnNext(230, 4),
  236. OnNext(240, 5),
  237. OnCompleted<int>(250)
  238. );
  239. var r = scheduler.CreateHotObservable(
  240. OnNext(150, 1)
  241. );
  242. var res = scheduler.Start(() =>
  243. l.SkipUntil(r)
  244. );
  245. res.Messages.AssertEqual(
  246. );
  247. l.Subscriptions.AssertEqual(
  248. Subscribe(200, 250)
  249. );
  250. r.Subscriptions.AssertEqual(
  251. Subscribe(200, 1000 /* can't dispose prematurely, could be in flight to dispatch OnError */)
  252. );
  253. }
  254. [Fact]
  255. public void SkipUntil_Never_Empty()
  256. {
  257. var scheduler = new TestScheduler();
  258. var l = scheduler.CreateHotObservable(
  259. OnNext(150, 1)
  260. );
  261. var r = scheduler.CreateHotObservable(
  262. OnNext(150, 1),
  263. OnCompleted<int>(225)
  264. );
  265. var res = scheduler.Start(() =>
  266. l.SkipUntil(r)
  267. );
  268. res.Messages.AssertEqual(
  269. );
  270. l.Subscriptions.AssertEqual(
  271. Subscribe(200, 1000)
  272. );
  273. r.Subscriptions.AssertEqual(
  274. Subscribe(200, 225)
  275. );
  276. }
  277. [Fact]
  278. public void SkipUntil_Never_Never()
  279. {
  280. var scheduler = new TestScheduler();
  281. var l = scheduler.CreateHotObservable(
  282. OnNext(150, 1)
  283. );
  284. var r = scheduler.CreateHotObservable(
  285. OnNext(150, 1)
  286. );
  287. var res = scheduler.Start(() =>
  288. l.SkipUntil(r)
  289. );
  290. res.Messages.AssertEqual(
  291. );
  292. l.Subscriptions.AssertEqual(
  293. Subscribe(200, 1000)
  294. );
  295. r.Subscriptions.AssertEqual(
  296. Subscribe(200, 1000)
  297. );
  298. }
  299. [Fact]
  300. public void SkipUntil_HasCompletedCausesDisposal()
  301. {
  302. var scheduler = new TestScheduler();
  303. bool disposed = false;
  304. var l = scheduler.CreateHotObservable(
  305. OnNext(150, 1),
  306. OnNext(210, 2),
  307. OnNext(220, 3),
  308. OnNext(230, 4),
  309. OnNext(240, 5),
  310. OnCompleted<int>(250)
  311. );
  312. var r = Observable.Create<int>(obs => () => { disposed = true; });
  313. var res = scheduler.Start(() =>
  314. l.SkipUntil(r)
  315. );
  316. res.Messages.AssertEqual(
  317. );
  318. Assert.True(disposed, "disposed");
  319. }
  320. [Fact]
  321. public void SkipUntil_Immediate()
  322. {
  323. var scheduler = new TestScheduler();
  324. var xs = Observable.Return(1);
  325. var ys = Observable.Return("bar");
  326. var res = scheduler.Start(() =>
  327. xs.SkipUntil(ys)
  328. );
  329. res.Messages.AssertEqual(
  330. OnNext(200, 1),
  331. OnCompleted<int>(200)
  332. );
  333. }
  334. [Fact] // Asserts behaviour considered buggy. A fix is desirable but breaking.
  335. public void SkipUntil_Empty_Empty()
  336. {
  337. var scheduler = new TestScheduler();
  338. var l = scheduler.CreateHotObservable(
  339. OnCompleted(250, 1)
  340. );
  341. var r = scheduler.CreateHotObservable(
  342. OnCompleted(250, 1)
  343. );
  344. var res = scheduler.Start(() =>
  345. l.SkipUntil(r)
  346. );
  347. res.Messages.AssertEqual(
  348. );
  349. //Desired behaviour:
  350. //res.Messages.AssertEqual(
  351. // OnCompleted(250, 1));
  352. }
  353. #endregion
  354. #region + Timed +
  355. [Fact]
  356. public void SkipUntil_Timed_ArgumentChecking()
  357. {
  358. var xs = Observable.Return(42);
  359. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil(default(IObservable<int>), DateTimeOffset.Now));
  360. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil(default(IObservable<int>), DateTimeOffset.Now, Scheduler.Default));
  361. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.SkipUntil(xs, DateTimeOffset.Now, default(IScheduler)));
  362. }
  363. [Fact]
  364. public void SkipUntil_Zero()
  365. {
  366. var scheduler = new TestScheduler();
  367. var xs = scheduler.CreateHotObservable<int>(
  368. OnNext(210, 1),
  369. OnNext(220, 2),
  370. OnCompleted<int>(230)
  371. );
  372. var res = scheduler.Start(() =>
  373. xs.SkipUntil(new DateTimeOffset(), scheduler)
  374. );
  375. res.Messages.AssertEqual(
  376. OnNext(210, 1),
  377. OnNext(220, 2),
  378. OnCompleted<int>(230)
  379. );
  380. xs.Subscriptions.AssertEqual(
  381. Subscribe(200, 230)
  382. );
  383. }
  384. [Fact]
  385. public void SkipUntil_Some()
  386. {
  387. var scheduler = new TestScheduler();
  388. var xs = scheduler.CreateHotObservable<int>(
  389. OnNext(210, 1),
  390. OnNext(220, 2),
  391. OnCompleted<int>(230)
  392. );
  393. var res = scheduler.Start(() =>
  394. xs.SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler)
  395. );
  396. res.Messages.AssertEqual(
  397. OnNext(220, 2),
  398. OnCompleted<int>(230)
  399. );
  400. xs.Subscriptions.AssertEqual(
  401. Subscribe(200, 230)
  402. );
  403. }
  404. [Fact]
  405. public void SkipUntil_Late()
  406. {
  407. var scheduler = new TestScheduler();
  408. var xs = scheduler.CreateHotObservable<int>(
  409. OnNext(210, 1),
  410. OnNext(220, 2),
  411. OnCompleted<int>(230)
  412. );
  413. var res = scheduler.Start(() =>
  414. xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler)
  415. );
  416. res.Messages.AssertEqual(
  417. OnCompleted<int>(230)
  418. );
  419. xs.Subscriptions.AssertEqual(
  420. Subscribe(200, 230)
  421. );
  422. }
  423. [Fact]
  424. public void SkipUntil_Error()
  425. {
  426. var scheduler = new TestScheduler();
  427. var ex = new Exception();
  428. var xs = scheduler.CreateHotObservable<int>(
  429. OnError<int>(210, ex)
  430. );
  431. var res = scheduler.Start(() =>
  432. xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler)
  433. );
  434. res.Messages.AssertEqual(
  435. OnError<int>(210, ex)
  436. );
  437. xs.Subscriptions.AssertEqual(
  438. Subscribe(200, 210)
  439. );
  440. }
  441. [Fact]
  442. public void SkipUntil_Never()
  443. {
  444. var scheduler = new TestScheduler();
  445. var ex = new Exception();
  446. var xs = scheduler.CreateHotObservable<int>(
  447. );
  448. var res = scheduler.Start(() =>
  449. xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler)
  450. );
  451. res.Messages.AssertEqual(
  452. );
  453. xs.Subscriptions.AssertEqual(
  454. Subscribe(200, 1000)
  455. );
  456. }
  457. [Fact]
  458. public void SkipUntil_Twice1()
  459. {
  460. var scheduler = new TestScheduler();
  461. var ex = new Exception();
  462. var xs = scheduler.CreateHotObservable<int>(
  463. OnNext(210, 1),
  464. OnNext(220, 2),
  465. OnNext(230, 3),
  466. OnNext(240, 4),
  467. OnNext(250, 5),
  468. OnNext(260, 6),
  469. OnCompleted<int>(270)
  470. );
  471. var res = scheduler.Start(() =>
  472. xs.SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler).SkipUntil(new DateTimeOffset(230, TimeSpan.Zero), scheduler)
  473. );
  474. res.Messages.AssertEqual(
  475. OnNext(240, 4),
  476. OnNext(250, 5),
  477. OnNext(260, 6),
  478. OnCompleted<int>(270)
  479. );
  480. xs.Subscriptions.AssertEqual(
  481. Subscribe(200, 270)
  482. );
  483. }
  484. [Fact]
  485. public void SkipUntil_Twice2()
  486. {
  487. var scheduler = new TestScheduler();
  488. var ex = new Exception();
  489. var xs = scheduler.CreateHotObservable<int>(
  490. OnNext(210, 1),
  491. OnNext(220, 2),
  492. OnNext(230, 3),
  493. OnNext(240, 4),
  494. OnNext(250, 5),
  495. OnNext(260, 6),
  496. OnCompleted<int>(270)
  497. );
  498. var res = scheduler.Start(() =>
  499. xs.SkipUntil(new DateTimeOffset(230, TimeSpan.Zero), scheduler).SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler)
  500. );
  501. res.Messages.AssertEqual(
  502. OnNext(240, 4),
  503. OnNext(250, 5),
  504. OnNext(260, 6),
  505. OnCompleted<int>(270)
  506. );
  507. xs.Subscriptions.AssertEqual(
  508. Subscribe(200, 270)
  509. );
  510. }
  511. [Fact]
  512. public void SkipUntil_Default()
  513. {
  514. var xs = Observable.Range(0, 10, Scheduler.Default);
  515. var res = xs.SkipUntil(DateTimeOffset.UtcNow.AddMinutes(1));
  516. var e = new ManualResetEvent(false);
  517. var lst = new List<int>();
  518. res.Subscribe(
  519. lst.Add,
  520. () => e.Set()
  521. );
  522. e.WaitOne();
  523. Assert.True(lst.Count == 0);
  524. }
  525. #endregion
  526. }
  527. }