1
0

ObservableImperativeTest.cs 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Linq;
  8. using System.Threading;
  9. using Microsoft.Reactive.Testing;
  10. using Microsoft.VisualStudio.TestTools.UnitTesting;
  11. using ReactiveTests.Dummies;
  12. #if !NO_TPL
  13. using System.Threading.Tasks;
  14. #endif
  15. namespace ReactiveTests.Tests
  16. {
  17. [TestClass]
  18. public partial class ObservableImperativeTest : ReactiveTest
  19. {
  20. #region ForEachAsync
  21. #if !NO_TPL
  /// <summary>
  /// Checks that each ForEachAsync overload throws <see cref="ArgumentNullException"/>
  /// when handed a null source or a null callback.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      Action<int> nullAction = null;
      Action<int, int> nullIndexedAction = null;

      // Element-only callback, without and with a cancellation token.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(nullSource, x => { }));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(Observable.Never<int>(), nullAction));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(nullSource, x => { }, CancellationToken.None));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(Observable.Never<int>(), nullAction, CancellationToken.None));

      // Element-and-index callback, without and with a cancellation token.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(nullSource, (x, i) => { }));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(Observable.Never<int>(), nullIndexedAction));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(nullSource, (x, i) => { }, CancellationToken.None));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(Observable.Never<int>(), nullIndexedAction, CancellationToken.None));
  }
  /// <summary>
  /// Subscribes at virtual time 150 to a hot source that never terminates and checks
  /// that the values from 200 onward are recorded while the task remains
  /// <see cref="TaskStatus.WaitingForActivation"/>.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_Never()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var seen = new List<Recorded<int>>();

      // Start iterating at 150, stamping each value with the virtual time it arrived at.
      clock.ScheduleAbsolute(150, () =>
      {
          pending = source.ForEachAsync(value => seen.Add(new Recorded<int>(clock.Clock, value)), cancellation.Token);
      });

      clock.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(150)
      );

      seen.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4),
          new Recorded<int>(500, 5)
      );

      Assert.AreEqual(TaskStatus.WaitingForActivation, pending.Status);
  }
  /// <summary>
  /// Subscribes at virtual time 150 to a hot source that completes at 600 and checks
  /// the recorded values, the subscription window and that the task ran to completion.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_Completed()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var seen = new List<Recorded<int>>();

      // Start iterating at 150, stamping each value with the virtual time it arrived at.
      clock.ScheduleAbsolute(150, () =>
      {
          pending = source.ForEachAsync(value => seen.Add(new Recorded<int>(clock.Clock, value)), cancellation.Token);
      });

      clock.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(150, 600)
      );

      seen.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4),
          new Recorded<int>(500, 5)
      );

      Assert.AreEqual(TaskStatus.RanToCompletion, pending.Status);
  }
  /// <summary>
  /// Subscribes at virtual time 150 to a hot source that errors at 600 and checks that
  /// the task is faulted with that same exception instance as its inner exception.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_Error()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnError<int>(600, failure)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var seen = new List<Recorded<int>>();

      // Start iterating at 150, stamping each value with the virtual time it arrived at.
      clock.ScheduleAbsolute(150, () =>
      {
          pending = source.ForEachAsync(value => seen.Add(new Recorded<int>(clock.Clock, value)), cancellation.Token);
      });

      clock.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(150, 600)
      );

      seen.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4),
          new Recorded<int>(500, 5)
      );

      Assert.AreEqual(TaskStatus.Faulted, pending.Status);
      Assert.AreSame(failure, pending.Exception.InnerException);
  }
  /// <summary>
  /// Uses a callback that throws once the virtual clock is past 400 and checks that the
  /// subscription ends at 500, only the earlier values are recorded, and the task is
  /// faulted with the thrown exception instance.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_Throw()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var seen = new List<Recorded<int>>();

      Action<int> record = value =>
      {
          // Fail on the first value delivered after virtual time 400.
          if (clock.Clock > 400)
          {
              throw failure;
          }

          seen.Add(new Recorded<int>(clock.Clock, value));
      };

      clock.ScheduleAbsolute(150, () =>
      {
          pending = source.ForEachAsync(record, cancellation.Token);
      });

      clock.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(150, 500)
      );

      seen.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4)
      );

      Assert.AreEqual(TaskStatus.Faulted, pending.Status);
      Assert.AreSame(failure, pending.Exception.InnerException);
  }
  /// <summary>
  /// Cancels the token at virtual time 350, in the middle of the sequence, and checks that
  /// the subscription ends at 350, only earlier values are recorded, and the task is canceled.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_CancelDuring()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var seen = new List<Recorded<int>>();

      clock.ScheduleAbsolute(150, () =>
      {
          pending = source.ForEachAsync(value => seen.Add(new Recorded<int>(clock.Clock, value)), cancellation.Token);
      });

      // Cancel between the values at 300 and 400.
      clock.ScheduleAbsolute(350, () =>
      {
          cancellation.Cancel();
      });

      clock.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(150, 350)
      );

      seen.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3)
      );

      Assert.AreEqual(TaskStatus.Canceled, pending.Status);
  }
  /// <summary>
  /// Cancels the token before ForEachAsync is called and checks that the source is never
  /// subscribed to, nothing is recorded, and the returned task is canceled.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_CancelBefore()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var seen = new List<Recorded<int>>();

      // The token is already canceled by the time ForEachAsync runs at 150.
      cancellation.Cancel();

      clock.ScheduleAbsolute(150, () =>
      {
          pending = source.ForEachAsync(value => seen.Add(new Recorded<int>(clock.Clock, value)), cancellation.Token);
      });

      clock.Start();

      source.Subscriptions.AssertEqual();
      seen.AssertEqual();

      Assert.AreEqual(TaskStatus.Canceled, pending.Status);
  }
  /// <summary>
  /// Cancels the token at virtual time 700, after the source completed at 600, and checks
  /// that the task still reports <see cref="TaskStatus.RanToCompletion"/>.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_CancelAfter()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var seen = new List<Recorded<int>>();

      clock.ScheduleAbsolute(150, () =>
      {
          pending = source.ForEachAsync(value => seen.Add(new Recorded<int>(clock.Clock, value)), cancellation.Token);
      });

      // Cancellation arrives only after the completion at 600.
      clock.ScheduleAbsolute(700, () =>
      {
          cancellation.Cancel();
      });

      clock.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(150, 600)
      );

      seen.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4),
          new Recorded<int>(500, 5)
      );

      Assert.AreEqual(TaskStatus.RanToCompletion, pending.Status);
  }
  /// <summary>
  /// Runs ForEachAsync over Range(1, 10) without a token, waits for the task, and checks
  /// that the values 1 through 10 were collected in order.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_Default()
  {
      var collected = new List<int>();

      var iteration = Observable.Range(1, 10).ForEachAsync(collected.Add);
      iteration.Wait();

      collected.AssertEqual(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  }
  /// <summary>
  /// Runs the indexed ForEachAsync overload over Range(3, 5), collecting value * index,
  /// and checks the products for indices 0 through 4.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_Index()
  {
      var products = new List<int>();

      var iteration = Observable.Range(3, 5).ForEachAsync((value, index) => products.Add(value * index));
      iteration.Wait();

      // Values 3..7 paired with indices 0..4.
      products.AssertEqual(
          3 * 0,
          4 * 1,
          5 * 2,
          6 * 3,
          7 * 4
      );
  }
  /// <summary>
  /// Repeatedly starts ForEachAsync over an endless stream of 42s produced via
  /// <c>Scheduler.Default</c>, waits until at least ten values were collected, cancels the
  /// token, and checks that the collected values are 42, the subscription was disposed,
  /// and the task reports cancellation.
  /// </summary>
  /// <remarks>
  /// Waiting uses <see cref="SpinWait.SpinUntil(Func{bool})"/> instead of hand-written hot
  /// loops, so the test thread backs off rather than monopolising a core and the list lock.
  /// </remarks>
  [TestMethod]
  public void ForEachAsync_Default_Cancel()
  {
      const int Iterations = 10;
      const int MinimumItems = 10;

      for (int n = 0; n < Iterations; n++)
      {
          var cts = new CancellationTokenSource();
          var done = false;

          // The second disposable flips 'done' so the test can observe that the subscription was disposed.
          var xs = Observable.Create<int>(observer =>
          {
              return new CompositeDisposable(
                  Observable.Repeat(42, Scheduler.Default).Subscribe(observer),
                  Disposable.Create(() => done = true)
              );
          });

          var lst = new List<int>();

          var t = xs.ForEachAsync(
              x =>
              {
                  lock (lst)
                  {
                      lst.Add(x);
                  }
              },
              cts.Token
          );

          // Wait until enough values have arrived; the count is read under the callback's lock.
          SpinWait.SpinUntil(() =>
          {
              lock (lst)
              {
                  return lst.Count >= MinimumItems;
              }
          });

          cts.Cancel();

          SpinWait.SpinUntil(() => t.IsCompleted);

          // Read under the same lock the callback uses, since a callback may still be appending.
          lock (lst)
          {
              for (int i = 0; i < MinimumItems; i++)
              {
                  Assert.AreEqual(42, lst[i]);
              }
          }

          Assert.IsTrue(done);
          Assert.IsTrue(t.IsCanceled);
      }
  }
  /// <summary>
  /// Repeatedly starts the indexed ForEachAsync overload over an endless stream of 42s
  /// produced via <c>Scheduler.Default</c>, collecting value * index. It waits until at
  /// least ten products were collected, cancels the token, and checks the first ten
  /// products, that the subscription was disposed, and that the task reports cancellation.
  /// </summary>
  /// <remarks>
  /// Waiting uses <see cref="SpinWait.SpinUntil(Func{bool})"/> instead of hand-written hot
  /// loops, so the test thread backs off rather than monopolising a core and the list lock.
  /// </remarks>
  [TestMethod]
  public void ForEachAsync_Index_Cancel()
  {
      const int Iterations = 10;
      const int MinimumItems = 10;

      for (int n = 0; n < Iterations; n++)
      {
          var cts = new CancellationTokenSource();
          var done = false;

          // The second disposable flips 'done' so the test can observe that the subscription was disposed.
          var xs = Observable.Create<int>(observer =>
          {
              return new CompositeDisposable(
                  Observable.Repeat(42, Scheduler.Default).Subscribe(observer),
                  Disposable.Create(() => done = true)
              );
          });

          var lst = new List<int>();

          var t = xs.ForEachAsync(
              (x, i) =>
              {
                  lock (lst)
                  {
                      lst.Add(x * i);
                  }
              },
              cts.Token
          );

          // Wait until enough products have arrived; the count is read under the callback's lock.
          SpinWait.SpinUntil(() =>
          {
              lock (lst)
              {
                  return lst.Count >= MinimumItems;
              }
          });

          cts.Cancel();

          SpinWait.SpinUntil(() => t.IsCompleted);

          // Read under the same lock the callback uses, since a callback may still be appending.
          lock (lst)
          {
              for (int i = 0; i < MinimumItems; i++)
              {
                  Assert.AreEqual(i * 42, lst[i]);
              }
          }

          Assert.IsTrue(done);
          Assert.IsTrue(t.IsCanceled);
      }
  }
  /// <summary>
  /// (Ignored test.) Uses a source whose subscription throws when disposed and expects
  /// waiting on the ForEachAsync task to raise an <see cref="AggregateException"/> that
  /// wraps exactly that exception instance.
  /// </summary>
  [TestMethod]
  [Ignore]
  public void ForEachAsync_DisposeThrows()
  {
      var cancellation = new CancellationTokenSource();
      var failure = new Exception();

      var source = Observable.Create<int>(observer =>
      {
          var inner = Observable.Range(0, 10, Scheduler.Default).Subscribe(observer);
          var throwingDisposable = Disposable.Create(() => { throw failure; });
          return new CompositeDisposable(inner, throwingDisposable);
      });

      var collected = new List<int>();
      var iteration = source.ForEachAsync(collected.Add, cancellation.Token);

      AggregateException caught = null;
      try
      {
          iteration.Wait();
      }
      catch (AggregateException err)
      {
          caught = err;
      }

      // Waiting must not succeed.
      if (caught == null)
      {
          Assert.Fail();
      }

      Assert.AreEqual(1, caught.InnerExceptions.Count);
      Assert.AreSame(failure, caught.InnerExceptions[0]);
  }
  /// <summary>
  /// Uses a source whose subscribe function throws and checks that waiting on the
  /// ForEachAsync task raises an <see cref="AggregateException"/> wrapping exactly that
  /// exception instance.
  /// </summary>
  [TestMethod]
  public void ForEachAsync_SubscribeThrows()
  {
      var failure = new Exception();
      var marker = 42;

      var source = Observable.Create<int>(observer =>
      {
          // The condition is always true here; the return below keeps the lambda well-formed.
          if (marker == 42)
          {
              throw failure;
          }

          return Disposable.Empty;
      });

      var iteration = source.ForEachAsync(_ => { });

      AggregateException caught = null;
      try
      {
          iteration.Wait();
      }
      catch (AggregateException err)
      {
          caught = err;
      }

      // Waiting must not succeed.
      if (caught == null)
      {
          Assert.Fail();
      }

      Assert.AreEqual(1, caught.InnerExceptions.Count);
      Assert.AreSame(failure, caught.InnerExceptions[0]);
  }
  374. #endif
  375. #endregion
  376. #region + Case +
  /// <summary>
  /// Checks that each Observable.Case overload throws <see cref="ArgumentNullException"/>
  /// for a null selector, a null source map, and a null default source or scheduler.
  /// </summary>
  [TestMethod]
  public void Case_ArgumentChecking()
  {
      IObservable<int> nullDefaultSource = null;
      IScheduler nullScheduler = null;

      // Overload taking an explicit default source.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(null, new Dictionary<int, IObservable<int>>(), DummyObservable<int>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(DummyFunc<int>.Instance, null, DummyObservable<int>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(DummyFunc<int>.Instance, new Dictionary<int, IObservable<int>>(), nullDefaultSource));

      // Overload taking a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(null, new Dictionary<int, IObservable<int>>(), DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case<int, int>(DummyFunc<int>.Instance, null, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(DummyFunc<int>.Instance, new Dictionary<int, IObservable<int>>(), nullScheduler));

      // Overload taking only the selector and the source map.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(null, new Dictionary<int, IObservable<int>>()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case<int, int>(DummyFunc<int>.Instance, null));
  }
  /// <summary>
  /// Selector returns 1: checks that Case forwards the source mapped to key 1 and that
  /// neither the key-2 source nor the default source is subscribed to.
  /// </summary>
  [TestMethod]
  public void Case_One()
  {
      var clock = new TestScheduler();

      var first = clock.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = clock.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var fallback = clock.CreateHotObservable(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      var sources = new Dictionary<int, IObservable<int>>();
      sources.Add(1, first);
      sources.Add(2, second);

      var observed = clock.Start(() => Observable.Case(() => 1, sources, fallback));

      observed.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      first.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
      second.Subscriptions.AssertEqual();
      fallback.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Selector returns 2: checks that Case forwards the source mapped to key 2 and that
  /// neither the key-1 source nor the default source is subscribed to.
  /// </summary>
  [TestMethod]
  public void Case_Two()
  {
      var clock = new TestScheduler();

      var first = clock.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = clock.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var fallback = clock.CreateHotObservable(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      var sources = new Dictionary<int, IObservable<int>>();
      sources.Add(1, first);
      sources.Add(2, second);

      var observed = clock.Start(() => Observable.Case(() => 2, sources, fallback));

      observed.Messages.AssertEqual(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );
      fallback.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Selector returns 3, a key absent from the map: checks that Case forwards the default
  /// source and that neither mapped source is subscribed to.
  /// </summary>
  [TestMethod]
  public void Case_Three()
  {
      var clock = new TestScheduler();

      var first = clock.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = clock.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var fallback = clock.CreateHotObservable(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      var sources = new Dictionary<int, IObservable<int>>();
      sources.Add(1, first);
      sources.Add(2, second);

      var observed = clock.Start(() => Observable.Case(() => 3, sources, fallback));

      observed.Messages.AssertEqual(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual();
      fallback.Subscriptions.AssertEqual(
          Subscribe(200, 320)
      );
  }
  /// <summary>
  /// Selector throws: checks that Case reports the exception as OnError at 200 and that
  /// none of the sources is subscribed to.
  /// </summary>
  [TestMethod]
  public void Case_Throw()
  {
      var clock = new TestScheduler();

      var first = clock.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = clock.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var fallback = clock.CreateHotObservable(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      var sources = new Dictionary<int, IObservable<int>>();
      sources.Add(1, first);
      sources.Add(2, second);

      var failure = new Exception();

      var observed = clock.Start(() => Observable.Case(() => Throw<int>(failure), sources, fallback));

      observed.Messages.AssertEqual(
          OnError<int>(200, failure)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual();
      fallback.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Scheduler overload, selector returns 1: checks that Case forwards the source mapped
  /// to key 1 and leaves the key-2 source unsubscribed.
  /// </summary>
  [TestMethod]
  public void CaseWithDefault_One()
  {
      var clock = new TestScheduler();

      var first = clock.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = clock.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var sources = new Dictionary<int, IObservable<int>>();
      sources.Add(1, first);
      sources.Add(2, second);

      var observed = clock.Start(() => Observable.Case(() => 1, sources, clock));

      observed.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      first.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Scheduler overload, selector returns 2: checks that Case forwards the source mapped
  /// to key 2 and leaves the key-1 source unsubscribed.
  /// </summary>
  [TestMethod]
  public void CaseWithDefault_Two()
  {
      var clock = new TestScheduler();

      var first = clock.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = clock.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var sources = new Dictionary<int, IObservable<int>>();
      sources.Add(1, first);
      sources.Add(2, second);

      var observed = clock.Start(() => Observable.Case(() => 2, sources, clock));

      observed.Messages.AssertEqual(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );
  }
  /// <summary>
  /// Scheduler overload, selector returns 3 (not in the map): checks that the result is a
  /// lone OnCompleted at 201 and that neither mapped source is subscribed to.
  /// </summary>
  [TestMethod]
  public void CaseWithDefault_Three()
  {
      var clock = new TestScheduler();

      var first = clock.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = clock.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var sources = new Dictionary<int, IObservable<int>>();
      sources.Add(1, first);
      sources.Add(2, second);

      var observed = clock.Start(() => Observable.Case(() => 3, sources, clock));

      observed.Messages.AssertEqual(
          OnCompleted<int>(201)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Scheduler overload, selector throws: checks that the exception surfaces as OnError at
  /// 200 and that neither mapped source is subscribed to.
  /// </summary>
  [TestMethod]
  public void CaseWithDefault_Throw()
  {
      var clock = new TestScheduler();

      var first = clock.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = clock.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var sources = new Dictionary<int, IObservable<int>>();
      sources.Add(1, first);
      sources.Add(2, second);

      var failure = new Exception();

      var observed = clock.Start(() => Observable.Case(() => Throw<int>(failure), sources, clock));

      observed.Messages.AssertEqual(
          OnError<int>(200, failure)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Checks that Case with an empty map and <c>DefaultScheduler.Instance</c> produces the
  /// same sequence as the overload that takes no scheduler.
  /// </summary>
  [TestMethod]
  public void CaseWithDefault_CheckDefault()
  {
      var withExplicitScheduler = Observable.Case(() => 1, new Dictionary<int, IObservable<int>>(), DefaultScheduler.Instance);
      var withoutScheduler = Observable.Case(() => 1, new Dictionary<int, IObservable<int>>());

      withExplicitScheduler.AssertEqual(withoutScheduler);
  }
  /// <summary>
  /// Scheduler overload, selector returns 1 and the selected source errors at 300: checks
  /// that the values and the error are forwarded and the key-2 source stays unsubscribed.
  /// </summary>
  [TestMethod]
  public void Case_Error()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var first = clock.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnError<int>(300, failure)
      );

      var second = clock.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var sources = new Dictionary<int, IObservable<int>>();
      sources.Add(1, first);
      sources.Add(2, second);

      var observed = clock.Start(() => Observable.Case(() => 1, sources, clock));

      observed.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnError<int>(300, failure)
      );

      first.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );
      second.Subscriptions.AssertEqual();
  }
  724. #endregion
  725. #region + DoWhile +
  /// <summary>
  /// Checks that DoWhile throws <see cref="ArgumentNullException"/> for a null condition
  /// and for a null source.
  /// </summary>
  [TestMethod]
  public void DoWhile_ArgumentChecking()
  {
      IObservable<int> nullSource = null;

      // Null condition.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DoWhile(DummyObservable<int>.Instance, null));

      // Null source.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DoWhile(nullSource, DummyFunc<bool>.Instance));
  }
  /// <summary>
  /// Condition is always false: checks that the cold source is played exactly once
  /// (subscription 200-450) and the result completes at 450.
  /// </summary>
  [TestMethod]
  public void DoWhile_AlwaysFalse()
  {
      var clock = new TestScheduler();

      var body = clock.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      Func<bool> never = () => false;

      var observed = clock.Start(() => Observable.DoWhile(body, never));

      observed.Messages.AssertEqual(
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          OnCompleted<int>(450)
      );

      body.Subscriptions.AssertEqual(
          Subscribe(200, 450)
      );
  }
  /// <summary>
  /// Condition is always true: checks that the cold source is replayed back to back, with
  /// no completion message, and that the expected subscriptions are 200-450, 450-700,
  /// 700-950 and a final 950-1000.
  /// </summary>
  [TestMethod]
  public void DoWhile_AlwaysTrue()
  {
      var clock = new TestScheduler();

      var body = clock.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      Func<bool> always = () => true;

      var observed = clock.Start(() => Observable.DoWhile(body, always));

      observed.Messages.AssertEqual(
          // First pass.
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // Second pass.
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          // Third pass.
          OnNext(750, 1),
          OnNext(800, 2),
          OnNext(850, 3),
          OnNext(900, 4)
      );

      body.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950),
          Subscribe(950, 1000)
      );
  }
  /// <summary>
  /// Condition is always true but the cold source errors at relative time 50: checks that
  /// the error is forwarded at 250 and there is one subscription, 200-250.
  /// </summary>
  [TestMethod]
  public void DoWhile_AlwaysTrue_Throw()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var body = clock.CreateColdObservable(
          OnError<int>(50, failure)
      );

      Func<bool> always = () => true;

      var observed = clock.Start(() => Observable.DoWhile(body, always));

      observed.Messages.AssertEqual(
          OnError<int>(250, failure)
      );

      body.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Condition is always true and the cold source never terminates: checks that its single
  /// value is forwarded at 250 and that there is one subscription, 200-1000.
  /// </summary>
  [TestMethod]
  public void DoWhile_AlwaysTrue_Infinite()
  {
      var clock = new TestScheduler();

      var body = clock.CreateColdObservable(
          OnNext(50, 1)
      );

      Func<bool> always = () => true;

      var observed = clock.Start(() => Observable.DoWhile(body, always));

      observed.Messages.AssertEqual(
          OnNext(250, 1)
      );

      body.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Condition is true for its first two evaluations and false on the third: checks that
  /// the cold source is played three times and the result completes at 950.
  /// </summary>
  [TestMethod]
  public void DoWhile_SometimesTrue()
  {
      var clock = new TestScheduler();

      var body = clock.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      int evaluations = 0;

      // Counts its own invocations; stays true while the count is below 3.
      Func<bool> keepGoing = () =>
      {
          evaluations++;
          return evaluations < 3;
      };

      var observed = clock.Start(() => Observable.DoWhile(body, keepGoing));

      observed.Messages.AssertEqual(
          // First pass.
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // Second pass.
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          // Third pass.
          OnNext(750, 1),
          OnNext(800, 2),
          OnNext(850, 3),
          OnNext(900, 4),
          OnCompleted<int>(950)
      );

      body.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950)
      );
  }
  /// <summary>
  /// Condition is true for its first two evaluations and throws on the third: checks that
  /// the cold source is played three times and the result ends with OnError at 950.
  /// </summary>
  [TestMethod]
  public void DoWhile_SometimesThrows()
  {
      var clock = new TestScheduler();

      var body = clock.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      int evaluations = 0;
      var failure = new Exception();

      // Counts its own invocations; true while the count is below 3, then throws.
      Func<bool> keepGoingOrThrow = () =>
      {
          if (++evaluations < 3)
          {
              return true;
          }

          return Throw<bool>(failure);
      };

      var observed = clock.Start(() => Observable.DoWhile(body, keepGoingOrThrow));

      observed.Messages.AssertEqual(
          // First pass.
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // Second pass.
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          // Third pass.
          OnNext(750, 1),
          OnNext(800, 2),
          OnNext(850, 3),
          OnNext(900, 4),
          OnError<int>(950, failure)
      );

      body.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950)
      );
  }
  888. #endregion
  889. #region + For +
  // Observable.For must reject a null selector and a null source enumerable.
  [TestMethod]
  public void For_ArgumentChecking()
  {
      var dummySource = DummyEnumerable<int>.Instance;
      var dummySelector = DummyFunc<int, IObservable<int>>.Instance;

      // null selector
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.For(dummySource, default(Func<int, IObservable<int>>)));

      // null source
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.For(null, dummySelector));
  }
  // Each element x of { 1, 2, 3 } is mapped to a cold sequence whose timings and
  // values depend on x; the test expects the inner sequences to run one after another.
  [TestMethod]
  public void For_Basic()
  {
      var testScheduler = new TestScheduler();

      Func<int, IObservable<int>> makeInner = x =>
      {
          var tickBase = x * 100;
          var valueBase = x * 10;

          return testScheduler.CreateColdObservable(
              OnNext<int>((ushort)(tickBase + 10), valueBase + 1),
              OnNext<int>((ushort)(tickBase + 20), valueBase + 2),
              OnNext<int>((ushort)(tickBase + 30), valueBase + 3),
              OnCompleted<int>((ushort)(tickBase + 40)));
      };

      var observer = testScheduler.Start(() => Observable.For(new[] { 1, 2, 3 }, makeInner));

      observer.Messages.AssertEqual(
          // x = 1
          OnNext(310, 11),
          OnNext(320, 12),
          OnNext(330, 13),
          // x = 2
          OnNext(550, 21),
          OnNext(560, 22),
          OnNext(570, 23),
          // x = 3
          OnNext(890, 31),
          OnNext(900, 32),
          OnNext(910, 33),
          OnCompleted<int>(920)
      );
  }
  // Iterator helper: yields 1, 2 and 3, then throws the supplied exception when the
  // consumer asks for a fourth element.
  IEnumerable<int> For_Error_Core(Exception ex)
  {
      for (var value = 1; value <= 3; value++)
      {
          yield return value;
      }

      throw ex;
  }
  // The source enumerable throws after yielding three elements; the test expects all
  // three inner sequences to be emitted and then the iterator's error at 920.
  [TestMethod]
  public void For_Error_Iterator()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      Func<int, IObservable<int>> makeInner = x =>
      {
          var tickBase = x * 100;
          var valueBase = x * 10;

          return testScheduler.CreateColdObservable(
              OnNext<int>((ushort)(tickBase + 10), valueBase + 1),
              OnNext<int>((ushort)(tickBase + 20), valueBase + 2),
              OnNext<int>((ushort)(tickBase + 30), valueBase + 3),
              OnCompleted<int>((ushort)(tickBase + 40)));
      };

      var observer = testScheduler.Start(
          () => Observable.For(For_Error_Core(failure), makeInner));

      observer.Messages.AssertEqual(
          // x = 1
          OnNext(310, 11),
          OnNext(320, 12),
          OnNext(330, 13),
          // x = 2
          OnNext(550, 21),
          OnNext(560, 22),
          OnNext(570, 23),
          // x = 3
          OnNext(890, 31),
          OnNext(900, 32),
          OnNext(910, 33),
          OnError<int>(920, failure)
      );
  }
  // Every inner sequence is Observable.Throw; the test expects the error at 200.
  [TestMethod]
  public void For_Error_Source()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      var inputs = new[] { 1, 2, 3 };

      var observer = testScheduler.Start(
          () => Observable.For(inputs, x => Observable.Throw<int>(failure)));

      observer.Messages.AssertEqual(OnError<int>(200, failure));
  }
  // The selector itself throws instead of returning a sequence; the test expects
  // that exception to surface as an OnError at 200.
  [TestMethod]
  public void For_SelectorThrows()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      var inputs = new[] { 1, 2, 3 };

      var observer = testScheduler.Start(
          () => Observable.For(inputs, x => Throw<IObservable<int>>(failure)));

      observer.Messages.AssertEqual(OnError<int>(200, failure));
  }
  968. #endregion
  969. #region + If +
  // Observable.If must reject a null argument in every position, for both the
  // then/else overload and the then/scheduler overload.
  [TestMethod]
  public void If_ArgumentChecking()
  {
      var condition = DummyFunc<bool>.Instance;
      var branch = DummyObservable<int>.Instance;

      // (condition, thenSource, elseSource)
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.If(null, branch, branch));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.If(condition, null, branch));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.If(condition, branch, default(IObservable<int>)));

      // (condition, thenSource, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.If(null, branch, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.If(condition, default(IObservable<int>), Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.If(condition, branch, default(IScheduler)));
  }
  // Condition is true: only the "then" source is expected to be subscribed and mirrored.
  [TestMethod]
  public void If_True()
  {
      var testScheduler = new TestScheduler();

      var thenSource = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(250, 2),
          OnCompleted<int>(300)
      );

      var elseSource = testScheduler.CreateHotObservable(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      var observer = testScheduler.Start(
          () => Observable.If(() => true, thenSource, elseSource));

      observer.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(250, 2),
          OnCompleted<int>(300)
      );

      thenSource.Subscriptions.AssertEqual(Subscribe(200, 300));

      // The "else" branch must never be subscribed.
      elseSource.Subscriptions.AssertEqual();
  }
  // Condition is false: only the "else" source is expected to be subscribed and mirrored.
  [TestMethod]
  public void If_False()
  {
      var testScheduler = new TestScheduler();

      var thenSource = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(250, 2),
          OnCompleted<int>(300)
      );

      var elseSource = testScheduler.CreateHotObservable(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      var observer = testScheduler.Start(
          () => Observable.If(() => false, thenSource, elseSource));

      observer.Messages.AssertEqual(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      // The "then" branch must never be subscribed.
      thenSource.Subscriptions.AssertEqual();

      elseSource.Subscriptions.AssertEqual(Subscribe(200, 400));
  }
  // The condition throws: the test expects the exception as OnError at 200 and
  // neither branch to be subscribed.
  [TestMethod]
  public void If_Throw()
  {
      var testScheduler = new TestScheduler();

      var thenSource = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(250, 2),
          OnCompleted<int>(300)
      );

      var elseSource = testScheduler.CreateHotObservable(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      var failure = new Exception();

      var observer = testScheduler.Start(
          () => Observable.If(() => Throw<bool>(failure), thenSource, elseSource));

      observer.Messages.AssertEqual(OnError<int>(200, failure));

      thenSource.Subscriptions.AssertEqual();
      elseSource.Subscriptions.AssertEqual();
  }
  // The selected "then" source never terminates: its subscription is expected to
  // last from 200 until 1000, and the "else" source is never subscribed.
  [TestMethod]
  public void If_Dispose()
  {
      var testScheduler = new TestScheduler();

      var thenSource = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(250, 2)
      );

      var elseSource = testScheduler.CreateHotObservable(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      var observer = testScheduler.Start(
          () => Observable.If(() => true, thenSource, elseSource));

      observer.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(250, 2)
      );

      thenSource.Subscriptions.AssertEqual(Subscribe(200, 1000));
      elseSource.Subscriptions.AssertEqual();
  }
  // The two-argument Observable.If overload must reject a null condition and a null source.
  [TestMethod]
  public void If_Default_ArgumentChecking()
  {
      var condition = DummyFunc<bool>.Instance;
      var thenSource = DummyObservable<int>.Instance;

      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.If<int>(null, thenSource));

      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.If<int>(condition, null));
  }
  // The flag is flipped to true at 150, so the condition holds when it is evaluated;
  // the test expects the hot source's remaining notifications through completion at 440.
  [TestMethod]
  public void If_Default_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3),
          OnCompleted<int>(440)
      );

      var flag = false;
      testScheduler.ScheduleAbsolute(150, () => flag = true);

      var observer = testScheduler.Start(() => Observable.If(() => flag, source));

      observer.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(330, 3),
          OnCompleted<int>(440)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 440));
  }
  // Same setup as the completed case, but the hot source ends with an error at 440,
  // which the test expects to be forwarded.
  [TestMethod]
  public void If_Default_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3),
          OnError<int>(440, failure)
      );

      var flag = false;
      testScheduler.ScheduleAbsolute(150, () => flag = true);

      var observer = testScheduler.Start(() => Observable.If(() => flag, source));

      observer.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(330, 3),
          OnError<int>(440, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 440));
  }
  // The selected hot source never terminates: the test expects its values after 200
  // and a subscription lasting from 200 until 1000.
  [TestMethod]
  public void If_Default_Never()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3)
      );

      var flag = false;
      testScheduler.ScheduleAbsolute(150, () => flag = true);

      var observer = testScheduler.Start(() => Observable.If(() => flag, source));

      observer.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(330, 3)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 1000));
  }
  // The flag is flipped to false at 150, so the condition fails when evaluated; the
  // test expects an immediate completion at 200 and no subscription to the source.
  [TestMethod]
  public void If_Default_Other()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3),
          OnError<int>(440, new Exception())
      );

      var flag = true;
      testScheduler.ScheduleAbsolute(150, () => flag = false);

      var observer = testScheduler.Start(() => Observable.If(() => flag, source));

      observer.Messages.AssertEqual(OnCompleted<int>(200));
      source.Subscriptions.AssertEqual();
  }
  // Scheduler overload with a false condition: the test expects the completion to
  // arrive at 201 (rather than 200) and the source never to be subscribed.
  [TestMethod]
  public void If_Default_Scheduler()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3),
          OnError<int>(440, new Exception())
      );

      var observer = testScheduler.Start(
          () => Observable.If(() => false, source, testScheduler));

      observer.Messages.AssertEqual(OnCompleted<int>(201));
      source.Subscriptions.AssertEqual();
  }
  1187. #endregion
  1188. #region + While +
  // Observable.While must reject a null condition and a null source.
  [TestMethod]
  public void While_ArgumentChecking()
  {
      var condition = DummyFunc<bool>.Instance;
      var source = DummyObservable<int>.Instance;

      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.While(default(Func<bool>), source));

      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.While(condition, default(IObservable<int>)));
  }
  // The condition is false from the start: the test expects completion at 200 and
  // no subscription to the source at all.
  [TestMethod]
  public void While_AlwaysFalse()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => Observable.While(() => false, source));

      observer.Messages.AssertEqual(OnCompleted<int>(200));
      source.Subscriptions.AssertEqual();
  }
  // The condition is always true: the source is expected to be replayed back to back
  // until 1000, with the fourth subscription cut short before it produces a value.
  [TestMethod]
  public void While_AlwaysTrue()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => Observable.While(() => true, source));

      observer.Messages.AssertEqual(
          // first pass
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // second pass
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          // third pass
          OnNext(750, 1),
          OnNext(800, 2),
          OnNext(850, 3),
          OnNext(900, 4)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950),
          Subscribe(950, 1000)
      );
  }
  // A cold source that fails on its first run: the test expects the error at 250
  // (200 + 50) and a single subscription to the source.
  [TestMethod]
  public void While_AlwaysTrue_Throw()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      var source = testScheduler.CreateColdObservable(OnError<int>(50, failure));

      var observer = testScheduler.Start(() => Observable.While(() => true, source));

      observer.Messages.AssertEqual(OnError<int>(250, failure));
      source.Subscriptions.AssertEqual(Subscribe(200, 250));
  }
  // A cold source that never completes: only its single value is expected, and the
  // one subscription is expected to stay open from 200 until 1000.
  [TestMethod]
  public void While_AlwaysTrue_Infinite()
  {
      var testScheduler = new TestScheduler();
      var source = testScheduler.CreateColdObservable(OnNext(50, 1));

      var observer = testScheduler.Start(() => Observable.While(() => true, source));

      observer.Messages.AssertEqual(OnNext(250, 1));
      source.Subscriptions.AssertEqual(Subscribe(200, 1000));
  }
  // The condition holds for its first two evaluations only; the test expects two
  // passes over the source and then completion at 700.
  [TestMethod]
  public void While_SometimesTrue()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      var evaluations = 0;

      var observer = testScheduler.Start(
          () => Observable.While(() => ++evaluations < 3, source));

      observer.Messages.AssertEqual(
          // first pass
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // second pass
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          OnCompleted<int>(700)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700)
      );
  }
  // Helper that always throws the given exception. The T return type lets it be
  // used where an expression of type T is required, e.g. in a conditional or lambda.
  private static T Throw<T>(Exception ex)
  {
      throw ex;
  }
  // The condition returns true twice and throws on its third evaluation; the test
  // expects two passes over the source followed by that error at 700.
  [TestMethod]
  public void While_SometimesThrows()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      var evaluations = 0;
      var failure = new Exception();

      var observer = testScheduler.Start(() => Observable.While(() =>
      {
          if (++evaluations < 3)
          {
              return true;
          }

          return Throw<bool>(failure);
      }, source));

      observer.Messages.AssertEqual(
          // first pass
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // second pass
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          OnError<int>(700, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700)
      );
  }
  1340. #endregion
  1341. #region General tests for loops
  // Nests For inside While inside Defer and checks two things: the values produced,
  // and that every value is observed with the same stack frame count (i.e. repeating
  // the loop does not deepen the call stack).
  [TestMethod]
  public void LoopTest1()
  {
      var loop = Observable.Defer(() =>
      {
          var n = 0;
          return Observable.While(
              // Post-increment: by the time the body is built, n is already 1..5.
              () => n++ < 5,
              Observable.Defer(() =>
              {
                  return Observable.For(
                      Enumerable.Range(0, n),
                      x => Observable.Return(x)
                  );
              })
          );
      });

      var res = new List<int>();
      var std = new List<int>();
      loop.ForEach(x =>
      {
          res.Add(x);
          std.Add(new System.Diagnostics.StackTrace().FrameCount);
      });

      var expected = new[] { 0, 0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3, 4 };

      // Include the actual values in the failure message so a red test is diagnosable.
      Assert.IsTrue(
          res.SequenceEqual(expected),
          "Unexpected sequence: " + string.Join(", ", res.Select(v => v.ToString()).ToArray()));

      // AreEqual reports the actual count on failure, unlike IsTrue(count == 1).
      Assert.AreEqual(
          1,
          std.Distinct().Count(),
          "Stack frame count varied between elements: " + string.Join(", ", std.Distinct().Select(v => v.ToString()).ToArray()));
  }
  // Builds a self-referential loop (the While body defers back to the loop itself)
  // and checks two things: that exactly ten 42s are produced, and that every value is
  // observed with the same stack frame count.
  [TestMethod]
  public void LoopTest2()
  {
      var n = 0;

      // Declared before assignment so the Defer lambda below can capture it.
      var loop = default(IObservable<int>);
      loop = Observable.While(
          () => n++ < 10,
          Observable.Concat(
              Observable.Return(42),
              Observable.Defer(() => loop)
          )
      );

      var res = new List<int>();
      var std = new List<int>();
      loop.ForEach(x =>
      {
          res.Add(x);
          std.Add(new System.Diagnostics.StackTrace().FrameCount);
      });

      // Include the actual values in the failure message so a red test is diagnosable.
      Assert.IsTrue(
          res.SequenceEqual(Enumerable.Repeat(42, 10)),
          "Unexpected sequence: " + string.Join(", ", res.Select(v => v.ToString()).ToArray()));

      // AreEqual reports the actual count on failure, unlike IsTrue(count == 1).
      Assert.AreEqual(
          1,
          std.Distinct().Count(),
          "Stack frame count varied between elements: " + string.Join(", ", std.Distinct().Select(v => v.ToString()).ToArray()));
  }
  1391. #endregion
  1392. }
  1393. }