ObservableImperativeTest.cs 53 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Disposables;
  9. using System.Reactive.Linq;
  10. using System.Threading;
  11. using Microsoft.Reactive.Testing;
  12. using Xunit;
  13. using ReactiveTests.Dummies;
  14. #if !NO_TPL
  15. using System.Threading.Tasks;
  16. #endif
  17. namespace ReactiveTests.Tests
  18. {
  19. public partial class ObservableImperativeTest : ReactiveTest
  20. {
  21. #region ForEachAsync
  22. #if !NO_TPL
  /// <summary>
  /// Checks that each <c>ForEachAsync</c> overload (element handler or element+index handler,
  /// with or without a cancellation token) throws <see cref="ArgumentNullException"/> when
  /// given a null source or a null handler.
  /// </summary>
  [Fact]
  public void ForEachAsync_ArgumentChecking()
  {
      IObservable<int> missingSource = null;
      Action<int> missingHandler = null;
      Action<int, int> missingIndexedHandler = null;
      var token = CancellationToken.None;

      // Element-only handler overloads.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(missingSource, x => { }));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(Observable.Never<int>(), missingHandler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(missingSource, x => { }, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(Observable.Never<int>(), missingHandler, token));

      // Element + index handler overloads.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(missingSource, (x, i) => { }));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(Observable.Never<int>(), missingIndexedHandler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(missingSource, (x, i) => { }, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ForEachAsync(Observable.Never<int>(), missingIndexedHandler, token));
  }
  /// <summary>
  /// Subscribes at tick 150 to a hot source that never terminates. Expects the values
  /// published at ticks 200 through 500 to be recorded, the subscription to stay open,
  /// and the returned task to remain in <see cref="TaskStatus.WaitingForActivation"/>.
  /// </summary>
  [Fact]
  public void ForEachAsync_Never()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var observed = new List<Recorded<int>>();

      // Stamp every element with the virtual time at which the handler ran.
      Action<int> record = value => observed.Add(new Recorded<int>(testScheduler.Clock, value));

      testScheduler.ScheduleAbsolute(150, () => { pending = source.ForEachAsync(record, cancellation.Token); });
      testScheduler.Start();

      source.Subscriptions.AssertEqual(Subscribe(150));

      observed.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4),
          new Recorded<int>(500, 5)
      );

      Assert.Equal(TaskStatus.WaitingForActivation, pending.Status);
  }
  /// <summary>
  /// Subscribes at tick 150 to a hot source that completes at tick 600. Expects the values
  /// at ticks 200 through 500 to be recorded, the subscription to span 150-600, and the task
  /// to end in <see cref="TaskStatus.RanToCompletion"/>.
  /// </summary>
  [Fact]
  public void ForEachAsync_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var observed = new List<Recorded<int>>();

      // Stamp every element with the virtual time at which the handler ran.
      Action<int> record = value => observed.Add(new Recorded<int>(testScheduler.Clock, value));

      testScheduler.ScheduleAbsolute(150, () => { pending = source.ForEachAsync(record, cancellation.Token); });
      testScheduler.Start();

      source.Subscriptions.AssertEqual(Subscribe(150, 600));

      observed.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4),
          new Recorded<int>(500, 5)
      );

      Assert.Equal(TaskStatus.RanToCompletion, pending.Status);
  }
  /// <summary>
  /// Subscribes at tick 150 to a hot source that fails at tick 600. Expects the values at
  /// ticks 200 through 500 to be recorded, the subscription to span 150-600, and the task to
  /// be <see cref="TaskStatus.Faulted"/> with the source's exception as its inner exception.
  /// </summary>
  [Fact]
  public void ForEachAsync_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnError<int>(600, failure)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var observed = new List<Recorded<int>>();

      // Stamp every element with the virtual time at which the handler ran.
      Action<int> record = value => observed.Add(new Recorded<int>(testScheduler.Clock, value));

      testScheduler.ScheduleAbsolute(150, () => { pending = source.ForEachAsync(record, cancellation.Token); });
      testScheduler.Start();

      source.Subscriptions.AssertEqual(Subscribe(150, 600));

      observed.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4),
          new Recorded<int>(500, 5)
      );

      Assert.Equal(TaskStatus.Faulted, pending.Status);
      Assert.Same(failure, pending.Exception.InnerException);
  }
  /// <summary>
  /// Uses a handler that throws once the virtual clock is past tick 400. Expects only the
  /// values at ticks 200, 300 and 400 to be recorded, the subscription to span 150-500, and
  /// the task to be <see cref="TaskStatus.Faulted"/> with the handler's exception inside.
  /// </summary>
  [Fact]
  public void ForEachAsync_Throw()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var observed = new List<Recorded<int>>();

      // Records elements until the clock passes 400, then fails instead of recording.
      Action<int> recordOrFail = value =>
      {
          if (testScheduler.Clock > 400)
          {
              throw failure;
          }

          observed.Add(new Recorded<int>(testScheduler.Clock, value));
      };

      testScheduler.ScheduleAbsolute(150, () => { pending = source.ForEachAsync(recordOrFail, cancellation.Token); });
      testScheduler.Start();

      source.Subscriptions.AssertEqual(Subscribe(150, 500));

      observed.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4)
      );

      Assert.Equal(TaskStatus.Faulted, pending.Status);
      Assert.Same(failure, pending.Exception.InnerException);
  }
  /// <summary>
  /// Cancels the token at tick 350 while the source is still producing. Expects only the
  /// values at ticks 200 and 300 to be recorded, the subscription to span 150-350, and the
  /// task to end in <see cref="TaskStatus.Canceled"/>.
  /// </summary>
  [Fact]
  public void ForEachAsync_CancelDuring()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var observed = new List<Recorded<int>>();

      // Stamp every element with the virtual time at which the handler ran.
      Action<int> record = value => observed.Add(new Recorded<int>(testScheduler.Clock, value));

      testScheduler.ScheduleAbsolute(150, () => { pending = source.ForEachAsync(record, cancellation.Token); });
      testScheduler.ScheduleAbsolute(350, () => { cancellation.Cancel(); });
      testScheduler.Start();

      source.Subscriptions.AssertEqual(Subscribe(150, 350));

      observed.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3)
      );

      Assert.Equal(TaskStatus.Canceled, pending.Status);
  }
  /// <summary>
  /// Cancels the token before <c>ForEachAsync</c> is invoked. Expects no subscription to the
  /// source, no recorded values, and a task in <see cref="TaskStatus.Canceled"/>.
  /// </summary>
  [Fact]
  public void ForEachAsync_CancelBefore()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var observed = new List<Recorded<int>>();

      // Stamp every element with the virtual time at which the handler ran.
      Action<int> record = value => observed.Add(new Recorded<int>(testScheduler.Clock, value));

      // The token is already cancelled by the time the operator is called at tick 150.
      cancellation.Cancel();

      testScheduler.ScheduleAbsolute(150, () => { pending = source.ForEachAsync(record, cancellation.Token); });
      testScheduler.Start();

      source.Subscriptions.AssertEqual();
      observed.AssertEqual();

      Assert.Equal(TaskStatus.Canceled, pending.Status);
  }
  /// <summary>
  /// Cancels the token at tick 700, after the source completed at tick 600. Expects all
  /// values at ticks 200 through 500 to be recorded, the subscription to span 150-600, and
  /// the task to stay in <see cref="TaskStatus.RanToCompletion"/>.
  /// </summary>
  [Fact]
  public void ForEachAsync_CancelAfter()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(100, 1),
          OnNext(200, 2),
          OnNext(300, 3),
          OnNext(400, 4),
          OnNext(500, 5),
          OnCompleted<int>(600)
      );

      Task pending = null;
      var cancellation = new CancellationTokenSource();
      var observed = new List<Recorded<int>>();

      // Stamp every element with the virtual time at which the handler ran.
      Action<int> record = value => observed.Add(new Recorded<int>(testScheduler.Clock, value));

      testScheduler.ScheduleAbsolute(150, () => { pending = source.ForEachAsync(record, cancellation.Token); });
      testScheduler.ScheduleAbsolute(700, () => { cancellation.Cancel(); });
      testScheduler.Start();

      source.Subscriptions.AssertEqual(Subscribe(150, 600));

      observed.AssertEqual(
          new Recorded<int>(200, 2),
          new Recorded<int>(300, 3),
          new Recorded<int>(400, 4),
          new Recorded<int>(500, 5)
      );

      Assert.Equal(TaskStatus.RanToCompletion, pending.Status);
  }
  /// <summary>
  /// Runs the token-less, element-only overload over <c>Range(1, 10)</c>, waits for the task,
  /// and expects the collected values to be 1 through 10 in order.
  /// </summary>
  [Fact]
  public void ForEachAsync_Default()
  {
      var collected = new List<int>();

      var completion = Observable.Range(1, 10).ForEachAsync(collected.Add);
      completion.Wait();

      collected.AssertEqual(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  }
  /// <summary>
  /// Runs the token-less indexed overload over <c>Range(3, 5)</c>, storing element * index,
  /// and expects the products for elements 3..7 paired with indices 0..4.
  /// </summary>
  [Fact]
  public void ForEachAsync_Index()
  {
      var products = new List<int>();

      var completion = Observable.Range(3, 5).ForEachAsync((value, index) => products.Add(value * index));
      completion.Wait();

      products.AssertEqual(
          3 * 0,
          4 * 1,
          5 * 2,
          6 * 3,
          7 * 4
      );
  }
  /// <summary>
  /// Repeatedly starts <c>ForEachAsync</c> (element-only overload) over an endless stream of
  /// 42s produced on <c>Scheduler.Default</c>, cancels once enough elements have arrived, and
  /// expects the first elements to be 42, the subscription to have been disposed, and the
  /// task to be cancelled.
  /// </summary>
  [Fact]
  public void ForEachAsync_Default_Cancel()
  {
      // Number of independent repetitions of the scenario.
      const int Rounds = 10;

      // Number of elements to wait for (and then verify) in each repetition.
      const int ElementsToObserve = 10;

      for (var round = 0; round < Rounds; round++)
      {
          using (var cts = new CancellationTokenSource())
          {
              var done = false;

              var xs = Observable.Create<int>(observer =>
              {
                  return new CompositeDisposable(
                      Observable.Repeat(42, Scheduler.Default).Subscribe(observer),
                      Disposable.Create(() => done = true)
                  );
              });

              var lst = new List<int>();

              var t = xs.ForEachAsync(
                  x =>
                  {
                      lock (lst)
                      {
                          lst.Add(x);
                      }
                  },
                  cts.Token
              );

              // Back off between polls rather than spinning hot while holding the list lock.
              SpinWait.SpinUntil(() =>
              {
                  lock (lst)
                  {
                      return lst.Count >= ElementsToObserve;
                  }
              });

              cts.Cancel();

              SpinWait.SpinUntil(() => t.IsCompleted);

              // A handler invocation may still be in flight on another thread after
              // cancellation, so copy the prefix under the same lock the handler uses.
              List<int> snapshot;
              lock (lst)
              {
                  snapshot = lst.GetRange(0, ElementsToObserve);
              }

              foreach (var value in snapshot)
              {
                  Assert.Equal(42, value);
              }

              Assert.True(done);
              Assert.True(t.IsCanceled);
          }
      }
  }
  /// <summary>
  /// Repeatedly starts <c>ForEachAsync</c> (indexed overload) over an endless stream of 42s
  /// produced on <c>Scheduler.Default</c>, storing element * index. Cancels once enough
  /// elements have arrived and expects entry i to equal i * 42, the subscription to have
  /// been disposed, and the task to be cancelled.
  /// </summary>
  [Fact]
  public void ForEachAsync_Index_Cancel()
  {
      // Number of independent repetitions of the scenario.
      const int Rounds = 10;

      // Number of elements to wait for (and then verify) in each repetition.
      const int ElementsToObserve = 10;

      for (var round = 0; round < Rounds; round++)
      {
          using (var cts = new CancellationTokenSource())
          {
              var done = false;

              var xs = Observable.Create<int>(observer =>
              {
                  return new CompositeDisposable(
                      Observable.Repeat(42, Scheduler.Default).Subscribe(observer),
                      Disposable.Create(() => done = true)
                  );
              });

              var lst = new List<int>();

              var t = xs.ForEachAsync(
                  (x, i) =>
                  {
                      lock (lst)
                      {
                          lst.Add(x * i);
                      }
                  },
                  cts.Token
              );

              // Back off between polls rather than spinning hot while holding the list lock.
              SpinWait.SpinUntil(() =>
              {
                  lock (lst)
                  {
                      return lst.Count >= ElementsToObserve;
                  }
              });

              cts.Cancel();

              SpinWait.SpinUntil(() => t.IsCompleted);

              // A handler invocation may still be in flight on another thread after
              // cancellation, so copy the prefix under the same lock the handler uses.
              List<int> snapshot;
              lock (lst)
              {
                  snapshot = lst.GetRange(0, ElementsToObserve);
              }

              for (var i = 0; i < ElementsToObserve; i++)
              {
                  Assert.Equal(i * 42, snapshot[i]);
              }

              Assert.True(done);
              Assert.True(t.IsCanceled);
          }
      }
  }
  /// <summary>
  /// Regression test: when the source runs on <c>Scheduler.CurrentThread</c> and disposing
  /// its subscription throws, waiting on the <c>ForEachAsync</c> task must not throw.
  /// </summary>
  [Fact]
  public void ForEachAsync_DisposeThrows1()
  {
      var cts = new CancellationTokenSource();
      var ex = new Exception();

      var xs = Observable.Create<int>(observer =>
      {
          return new CompositeDisposable(
              Observable.Range(0, 10, Scheduler.CurrentThread).Subscribe(observer),
              Disposable.Create(() => { throw ex; })
          );
      });

      var lst = new List<int>();

      var t = xs.ForEachAsync(lst.Add, cts.Token);

      //
      // Unfortunately, this doesn't throw for CurrentThread scheduling. The
      // subscription completes prior to assignment of the disposable, so we
      // succeed calling the TrySetResult method for the OnCompleted handler
      // prior to observing the exception of the Dispose operation, which is
      // surfacing upon assignment to the SingleAssignmentDisposable. As a
      // result, the exception evaporates.
      //
      // It'd be a breaking change at this point to rethrow the exception in
      // that case, so we're merely asserting regressions here.
      //
      // Wait() is deliberately not wrapped in a catch: if the behavior regresses,
      // the original exception fails the test with its full details instead of
      // being replaced by a generic assertion failure.
      //
      t.Wait();
  }
  /// <summary>
  /// Starts <c>ForEachAsync</c> from inside a <c>Scheduler.CurrentThread</c> work item over a
  /// source whose subscription throws on disposal. If waiting on the task raises an
  /// <see cref="AggregateException"/>, it must wrap exactly that disposal exception.
  /// </summary>
  [Fact]
  public void ForEachAsync_DisposeThrows2()
  {
      var cancellation = new CancellationTokenSource();
      var disposeFailure = new Exception();

      var source = Observable.Create<int>(observer =>
      {
          return new CompositeDisposable(
              Observable.Range(0, 10, Scheduler.CurrentThread).Subscribe(observer),
              Disposable.Create(() => { throw disposeFailure; })
          );
      });

      var sink = new List<int>();
      Task pending = null;

      Scheduler.CurrentThread.Schedule(() =>
      {
          pending = source.ForEachAsync(sink.Add, cancellation.Token);
      });

      //
      // With the CurrentThread trampoline already installed further up the
      // stack, the subscription's disposable can be assigned to the
      // SingleAssignmentDisposable before the OnCompleted handler calls
      // Dispose. The handler's Dispose call then hits the exception, which
      // is reported through TrySetException.
      //
      try
      {
          pending.Wait();
      }
      catch (AggregateException aggregate)
      {
          Assert.Equal(1, aggregate.InnerExceptions.Count);
          Assert.Same(disposeFailure, aggregate.InnerExceptions[0]);
      }
  }
  396. #if !NO_THREAD
  /// <summary>
  /// Runs <c>ForEachAsync</c> over a source scheduled on a catching wrapper around
  /// <c>Scheduler.Default</c>, where disposing the subscription throws. The scenario is
  /// repeated until the wrapper's handler has been invoked; any
  /// <see cref="AggregateException"/> from waiting must wrap exactly the disposal exception.
  /// </summary>
  [Fact]
  public void ForEachAsync_DisposeThrows()
  {
      //
      // This test is inherently non-deterministic because of the races
      // described in the CurrentThread-based tests above. The exception may
      // surface through the OnCompleted handler, or it may be swallowed when
      // the main thread has not yet assigned the disposable and the
      // OnCompleted handler wins the race. Users can cope by attaching an
      // exception handler to the scheduler, which is what is asserted here.
      //
      // Changing the rethrow behavior now would be a breaking change, so
      // this only guards against regressions.
      //
      var escapedExceptionSeen = 0;

      var cancellation = new CancellationTokenSource();
      var disposeFailure = new Exception();

      var catchingScheduler = Scheduler.Default.Catch<Exception>(caught =>
      {
          Volatile.Write(ref escapedExceptionSeen, 1);
          return disposeFailure == caught;
      });

      while (Volatile.Read(ref escapedExceptionSeen) == 0)
      {
          var source = Observable.Create<int>(observer =>
          {
              return new CompositeDisposable(
                  Observable.Range(0, 10, catchingScheduler).Subscribe(observer),
                  Disposable.Create(() => { throw disposeFailure; })
              );
          });

          var sink = new List<int>();

          var pending = source.ForEachAsync(sink.Add, cancellation.Token);

          try
          {
              pending.Wait();
          }
          catch (AggregateException aggregate)
          {
              Assert.Equal(1, aggregate.InnerExceptions.Count);
              Assert.Same(disposeFailure, aggregate.InnerExceptions[0]);
          }
      }
  }
  443. #endif
  /// <summary>
  /// When the source's subscribe function throws, waiting on the <c>ForEachAsync</c> task
  /// must raise an <see cref="AggregateException"/> wrapping exactly that exception.
  /// </summary>
  [Fact]
  public void ForEachAsync_SubscribeThrows()
  {
      var ex = new Exception();

      // The always-true runtime condition keeps the return statement reachable for the compiler.
      var x = 42;
      var xs = Observable.Create<int>(observer =>
      {
          if (x == 42)
          {
              throw ex;
          }

          return Disposable.Empty;
      });

      var t = xs.ForEachAsync(_ => { });

      // Assert.Throws fails with a descriptive message if Wait() returns normally or
      // throws a different exception type.
      var err = Assert.Throws<AggregateException>(() => t.Wait());

      Assert.Equal(1, err.InnerExceptions.Count);
      Assert.Same(ex, err.InnerExceptions[0]);
  }
  467. #endif
  468. #endregion
  469. #region + Case +
  /// <summary>
  /// Checks that each <c>Observable.Case</c> overload (default source, scheduler, or neither)
  /// throws <see cref="ArgumentNullException"/> when any of its arguments is null.
  /// </summary>
  [Fact]
  public void Case_ArgumentChecking()
  {
      var selector = DummyFunc<int>.Instance;
      var fallback = DummyObservable<int>.Instance;
      var dummyScheduler = DummyScheduler.Instance;
      IObservable<int> missingFallback = null;
      IScheduler missingScheduler = null;

      // Overload taking a default source.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(null, new Dictionary<int, IObservable<int>>(), fallback));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(selector, null, fallback));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(selector, new Dictionary<int, IObservable<int>>(), missingFallback));

      // Overload taking a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(null, new Dictionary<int, IObservable<int>>(), dummyScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case<int, int>(selector, null, dummyScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(selector, new Dictionary<int, IObservable<int>>(), missingScheduler));

      // Overload taking only selector and sources.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case(null, new Dictionary<int, IObservable<int>>()));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Case<int, int>(selector, null));
  }
  /// <summary>
  /// With a selector returning 1, expects <c>Case</c> to relay the sequence mapped to key 1
  /// and to subscribe neither to the key-2 sequence nor to the default source.
  /// </summary>
  [Fact]
  public void Case_One()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var fallback = testScheduler.CreateHotObservable(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      var branches = new Dictionary<int, IObservable<int>>();
      branches.Add(1, first);
      branches.Add(2, second);

      Func<int> selector = () => 1;

      var recorded = testScheduler.Start(() => Observable.Case(selector, branches, fallback));

      recorded.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      first.Subscriptions.AssertEqual(Subscribe(200, 300));
      second.Subscriptions.AssertEqual();
      fallback.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// With a selector returning 2, expects <c>Case</c> to relay the sequence mapped to key 2
  /// and to subscribe neither to the key-1 sequence nor to the default source.
  /// </summary>
  [Fact]
  public void Case_Two()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var fallback = testScheduler.CreateHotObservable(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      var branches = new Dictionary<int, IObservable<int>>();
      branches.Add(1, first);
      branches.Add(2, second);

      Func<int> selector = () => 2;

      var recorded = testScheduler.Start(() => Observable.Case(selector, branches, fallback));

      recorded.Messages.AssertEqual(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual(Subscribe(200, 310));
      fallback.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// With a selector returning 3 (a key absent from the map), expects <c>Case</c> to relay
  /// the default source and to leave both mapped sequences unsubscribed.
  /// </summary>
  [Fact]
  public void Case_Three()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var fallback = testScheduler.CreateHotObservable(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      var branches = new Dictionary<int, IObservable<int>>();
      branches.Add(1, first);
      branches.Add(2, second);

      Func<int> selector = () => 3;

      var recorded = testScheduler.Start(() => Observable.Case(selector, branches, fallback));

      recorded.Messages.AssertEqual(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual();
      fallback.Subscriptions.AssertEqual(Subscribe(200, 320));
  }
  /// <summary>
  /// With a selector that throws, expects <c>Case</c> to emit that exception as an error at
  /// tick 200 and to subscribe to none of the candidate sequences.
  /// </summary>
  [Fact]
  public void Case_Throw()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var fallback = testScheduler.CreateHotObservable(
          OnNext(230, 21),
          OnNext(240, 22),
          OnNext(290, 23),
          OnCompleted<int>(320)
      );

      var branches = new Dictionary<int, IObservable<int>>();
      branches.Add(1, first);
      branches.Add(2, second);

      var failure = new Exception();
      Func<int> selector = () => Throw<int>(failure);

      var recorded = testScheduler.Start(() => Observable.Case(selector, branches, fallback));

      recorded.Messages.AssertEqual(
          OnError<int>(200, failure)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual();
      fallback.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Scheduler overload of <c>Case</c>: with a selector returning 1, expects the key-1
  /// sequence to be relayed and the key-2 sequence to stay unsubscribed.
  /// </summary>
  [Fact]
  public void CaseWithDefault_One()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var branches = new Dictionary<int, IObservable<int>>();
      branches.Add(1, first);
      branches.Add(2, second);

      Func<int> selector = () => 1;

      var recorded = testScheduler.Start(() => Observable.Case(selector, branches, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      first.Subscriptions.AssertEqual(Subscribe(200, 300));
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Scheduler overload of <c>Case</c>: with a selector returning 2, expects the key-2
  /// sequence to be relayed and the key-1 sequence to stay unsubscribed.
  /// </summary>
  [Fact]
  public void CaseWithDefault_Two()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var branches = new Dictionary<int, IObservable<int>>();
      branches.Add(1, first);
      branches.Add(2, second);

      Func<int> selector = () => 2;

      var recorded = testScheduler.Start(() => Observable.Case(selector, branches, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual(Subscribe(200, 310));
  }
  /// <summary>
  /// Scheduler overload of <c>Case</c>: with a selector returning 3 (a key absent from the
  /// map), expects only a completion at tick 201 and no subscription to either mapped sequence.
  /// </summary>
  [Fact]
  public void CaseWithDefault_Three()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var branches = new Dictionary<int, IObservable<int>>();
      branches.Add(1, first);
      branches.Add(2, second);

      Func<int> selector = () => 3;

      var recorded = testScheduler.Start(() => Observable.Case(selector, branches, testScheduler));

      recorded.Messages.AssertEqual(
          OnCompleted<int>(201)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Scheduler overload of <c>Case</c>: with a selector that throws, expects that exception
  /// as an error at tick 200 and no subscription to either mapped sequence.
  /// </summary>
  [Fact]
  public void CaseWithDefault_Throw()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnCompleted<int>(300)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var branches = new Dictionary<int, IObservable<int>>();
      branches.Add(1, first);
      branches.Add(2, second);

      var failure = new Exception();
      Func<int> selector = () => Throw<int>(failure);

      var recorded = testScheduler.Start(() => Observable.Case(selector, branches, testScheduler));

      recorded.Messages.AssertEqual(
          OnError<int>(200, failure)
      );

      first.Subscriptions.AssertEqual();
      second.Subscriptions.AssertEqual();
  }
  /// <summary>
  /// Expects <c>Case</c> called with <c>DefaultScheduler.Instance</c> and <c>Case</c> called
  /// with no scheduler to produce equal sequences for the same selector and an empty map.
  /// </summary>
  [Fact]
  public void CaseWithDefault_CheckDefault()
  {
      var withExplicitScheduler = Observable.Case(() => 1, new Dictionary<int, IObservable<int>>(), DefaultScheduler.Instance);
      var withoutScheduler = Observable.Case(() => 1, new Dictionary<int, IObservable<int>>());

      withExplicitScheduler.AssertEqual(withoutScheduler);
  }
  /// <summary>
  /// With a selector returning 1 and a key-1 sequence that fails at tick 300, expects
  /// <c>Case</c> to relay its values and then the error, leaving the key-2 sequence unsubscribed.
  /// </summary>
  [Fact]
  public void Case_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var first = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnError<int>(300, failure)
      );

      var second = testScheduler.CreateHotObservable(
          OnNext(220, 11),
          OnNext(250, 12),
          OnNext(280, 13),
          OnCompleted<int>(310)
      );

      var branches = new Dictionary<int, IObservable<int>>();
      branches.Add(1, first);
      branches.Add(2, second);

      Func<int> selector = () => 1;

      var recorded = testScheduler.Start(() => Observable.Case(selector, branches, testScheduler));

      recorded.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(240, 2),
          OnNext(270, 3),
          OnError<int>(300, failure)
      );

      first.Subscriptions.AssertEqual(Subscribe(200, 300));
      second.Subscriptions.AssertEqual();
  }
  817. #endregion
  818. #region + DoWhile +
  /// <summary>
  /// Checks that <c>Observable.DoWhile</c> throws <see cref="ArgumentNullException"/> for a
  /// null condition and for a null source.
  /// </summary>
  [Fact]
  public void DoWhile_ArgumentChecking()
  {
      IObservable<int> missingSource = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DoWhile(DummyObservable<int>.Instance, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DoWhile(missingSource, DummyFunc<bool>.Instance));
  }
  /// <summary>
  /// With a condition that is always false, expects <c>DoWhile</c> to play the cold source
  /// exactly once (subscription 200-450) and then complete.
  /// </summary>
  [Fact]
  public void DoWhile_AlwaysFalse()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      Func<bool> condition = () => false;

      var recorded = testScheduler.Start(() => Observable.DoWhile(source, condition));

      recorded.Messages.AssertEqual(
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          OnCompleted<int>(450)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 450));
  }
  /// <summary>
  /// With a condition that is always true, expects <c>DoWhile</c> to keep resubscribing to
  /// the cold source: three full passes of values and a fourth subscription spanning 950-1000,
  /// with no completion recorded.
  /// </summary>
  [Fact]
  public void DoWhile_AlwaysTrue()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      Func<bool> condition = () => true;

      var recorded = testScheduler.Start(() => Observable.DoWhile(source, condition));

      recorded.Messages.AssertEqual(
          // First pass.
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // Second pass.
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          // Third pass.
          OnNext(750, 1),
          OnNext(800, 2),
          OnNext(850, 3),
          OnNext(900, 4)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950),
          Subscribe(950, 1000)
      );
  }
  /// <summary>
  /// With an always-true condition and a cold source that only errors, expects
  /// <c>DoWhile</c> to forward the error at tick 250 with a single subscription 200-250.
  /// </summary>
  [Fact]
  public void DoWhile_AlwaysTrue_Throw()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateColdObservable(
          OnError<int>(50, failure)
      );

      Func<bool> condition = () => true;

      var recorded = testScheduler.Start(() => Observable.DoWhile(source, condition));

      recorded.Messages.AssertEqual(
          OnError<int>(250, failure)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 250));
  }
  /// <summary>
  /// With an always-true condition and a cold source that never terminates, expects the
  /// single value at tick 250 and one subscription spanning 200-1000.
  /// </summary>
  [Fact]
  public void DoWhile_AlwaysTrue_Infinite()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1)
      );

      Func<bool> condition = () => true;

      var recorded = testScheduler.Start(() => Observable.DoWhile(source, condition));

      recorded.Messages.AssertEqual(
          OnNext(250, 1)
      );

      source.Subscriptions.AssertEqual(Subscribe(200, 1000));
  }
  /// <summary>
  /// With a condition that is true on its first two evaluations and false on the third,
  /// expects three passes over the cold source followed by completion at tick 950.
  /// </summary>
  [Fact]
  public void DoWhile_SometimesTrue()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      // Counts condition evaluations; only the first two report "continue".
      var evaluations = 0;
      Func<bool> condition = () =>
      {
          evaluations = evaluations + 1;
          return evaluations < 3;
      };

      var recorded = testScheduler.Start(() => Observable.DoWhile(source, condition));

      recorded.Messages.AssertEqual(
          // First pass.
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // Second pass.
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          // Third pass.
          OnNext(750, 1),
          OnNext(800, 2),
          OnNext(850, 3),
          OnNext(900, 4),
          OnCompleted<int>(950)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950)
      );
  }
  /// <summary>
  /// With a condition that is true on its first two evaluations and throws on the third,
  /// expects three passes over the cold source followed by that exception as an error at
  /// tick 950.
  /// </summary>
  [Fact]
  public void DoWhile_SometimesThrows()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      var evaluations = 0;
      var failure = new Exception();

      // Counts condition evaluations; the third and later ones throw instead of answering.
      Func<bool> condition = () =>
      {
          evaluations = evaluations + 1;

          if (evaluations < 3)
          {
              return true;
          }

          return Throw<bool>(failure);
      };

      var recorded = testScheduler.Start(() => Observable.DoWhile(source, condition));

      recorded.Messages.AssertEqual(
          // First pass.
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // Second pass.
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          // Third pass.
          OnNext(750, 1),
          OnNext(800, 2),
          OnNext(850, 3),
          OnNext(900, 4),
          OnError<int>(950, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950)
      );
  }
  981. #endregion
  982. #region + For +
  /// <summary>
  /// Checks that <c>Observable.For</c> throws <see cref="ArgumentNullException"/> for a null
  /// result selector and for a null source enumerable.
  /// </summary>
  [Fact]
  public void For_ArgumentChecking()
  {
      Func<int, IObservable<int>> missingSelector = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.For(DummyEnumerable<int>.Instance, missingSelector));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.For(null, DummyFunc<int, IObservable<int>>.Instance));
  }
  /// <summary>
  /// Runs <c>Observable.For</c> over the array {1, 2, 3}, mapping each element to a cold
  /// sequence of three values, and expects the three inner sequences to be played one after
  /// another, ending with completion at tick 920.
  /// </summary>
  [Fact]
  public void For_Basic()
  {
      var testScheduler = new TestScheduler();

      // Inner sequence for element x: values x*10+1..x*10+3 at relative ticks
      // x*100+10..x*100+30, completing at relative tick x*100+40.
      Func<int, IObservable<int>> createInner = x => testScheduler.CreateColdObservable(
          OnNext<int>((ushort)(x * 100 + 10), x * 10 + 1),
          OnNext<int>((ushort)(x * 100 + 20), x * 10 + 2),
          OnNext<int>((ushort)(x * 100 + 30), x * 10 + 3),
          OnCompleted<int>((ushort)(x * 100 + 40)));

      var recorded = testScheduler.Start(() => Observable.For(new[] { 1, 2, 3 }, createInner));

      recorded.Messages.AssertEqual(
          // Inner sequence for 1.
          OnNext(310, 11),
          OnNext(320, 12),
          OnNext(330, 13),
          // Inner sequence for 2.
          OnNext(550, 21),
          OnNext(560, 22),
          OnNext(570, 23),
          // Inner sequence for 3.
          OnNext(890, 31),
          OnNext(900, 32),
          OnNext(910, 33),
          OnCompleted<int>(920)
      );
  }
  /// <summary>
  /// Lazily yields 1, 2 and 3, then throws <paramref name="ex"/> when the enumeration is
  /// advanced past the third element.
  /// </summary>
  /// <param name="ex">Exception thrown after the three values have been yielded.</param>
  /// <returns>An iterator over 1, 2, 3 that fails afterwards.</returns>
  IEnumerable<int> For_Error_Core(Exception ex)
  {
      for (var value = 1; value <= 3; value++)
      {
          yield return value;
      }

      throw ex;
  }
  /// <summary>
  /// Runs <c>Observable.For</c> over <c>For_Error_Core</c>, an iterator that yields three
  /// elements and then throws. Expects the three inner sequences to be played in order,
  /// followed by the iterator's exception as an error at tick 920.
  /// </summary>
  [Fact]
  public void For_Error_Iterator()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      // Inner sequence for element x: values x*10+1..x*10+3 at relative ticks
      // x*100+10..x*100+30, completing at relative tick x*100+40.
      Func<int, IObservable<int>> createInner = x => testScheduler.CreateColdObservable(
          OnNext<int>((ushort)(x * 100 + 10), x * 10 + 1),
          OnNext<int>((ushort)(x * 100 + 20), x * 10 + 2),
          OnNext<int>((ushort)(x * 100 + 30), x * 10 + 3),
          OnCompleted<int>((ushort)(x * 100 + 40)));

      var recorded = testScheduler.Start(() => Observable.For(For_Error_Core(failure), createInner));

      recorded.Messages.AssertEqual(
          // Inner sequence for 1.
          OnNext(310, 11),
          OnNext(320, 12),
          OnNext(330, 13),
          // Inner sequence for 2.
          OnNext(550, 21),
          OnNext(560, 22),
          OnNext(570, 23),
          // Inner sequence for 3.
          OnNext(890, 31),
          OnNext(900, 32),
          OnNext(910, 33),
          OnError<int>(920, failure)
      );
  }
  /// <summary>
  /// When the selector returns a sequence that fails, Observable.For is expected to
  /// report that error at time 200.
  /// </summary>
  [Fact]
  public void For_Error_Source()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      var inputs = new[] { 1, 2, 3 };

      Func<int, IObservable<int>> selector = item => Observable.Throw<int>(failure);
      Func<IObservable<int>> create = () => Observable.For(inputs, selector);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnError<int>(200, failure)
      );
  }
  /// <summary>
  /// When the selector itself throws, Observable.For is expected to surface the
  /// exception as an OnError at time 200.
  /// </summary>
  [Fact]
  public void For_SelectorThrows()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();
      var inputs = new[] { 1, 2, 3 };

      Func<int, IObservable<int>> selector = item => Throw<IObservable<int>>(failure);
      Func<IObservable<int>> create = () => Observable.For(inputs, selector);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnError<int>(200, failure)
      );
  }
  1061. #endregion
  1062. #region + If +
  /// <summary>
  /// Observable.If must throw <see cref="ArgumentNullException"/> for a null condition,
  /// a null branch sequence, or a null scheduler, across both three-argument overloads.
  /// </summary>
  [Fact]
  public void If_ArgumentChecking()
  {
      var condition = DummyFunc<bool>.Instance;
      var source = DummyObservable<int>.Instance;
      Func<bool> nullCondition = null;
      IObservable<int> nullSource = null;
      IScheduler nullScheduler = null;

      // Overload taking a "then" and an "else" sequence.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.If(nullCondition, source, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.If(condition, nullSource, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.If(condition, source, nullSource));

      // Overload taking a "then" sequence and a scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.If(nullCondition, source, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.If(condition, nullSource, Scheduler.Default));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.If(condition, source, nullScheduler));
  }
  /// <summary>
  /// With a condition that returns true, Observable.If must relay the first sequence
  /// and never subscribe to the second.
  /// </summary>
  [Fact]
  public void If_True()
  {
      var testScheduler = new TestScheduler();

      var thenSource = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(250, 2),
          OnCompleted<int>(300)
      );

      var elseSource = testScheduler.CreateHotObservable(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      Func<bool> condition = () => true;
      Func<IObservable<int>> create = () => Observable.If(condition, thenSource, elseSource);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(250, 2),
          OnCompleted<int>(300)
      );

      thenSource.Subscriptions.AssertEqual(
          Subscribe(200, 300)
      );

      // The "else" branch must stay untouched.
      elseSource.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// With a condition that returns false, Observable.If must relay the second sequence
  /// and never subscribe to the first.
  /// </summary>
  [Fact]
  public void If_False()
  {
      var testScheduler = new TestScheduler();

      var thenSource = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(250, 2),
          OnCompleted<int>(300)
      );

      var elseSource = testScheduler.CreateHotObservable(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      Func<bool> condition = () => false;
      Func<IObservable<int>> create = () => Observable.If(condition, thenSource, elseSource);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      // The "then" branch must stay untouched.
      thenSource.Subscriptions.AssertEqual(
      );

      elseSource.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// When the condition throws, Observable.If must report the exception as an OnError
  /// at time 200 and subscribe to neither branch.
  /// </summary>
  [Fact]
  public void If_Throw()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var thenSource = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(250, 2),
          OnCompleted<int>(300)
      );

      var elseSource = testScheduler.CreateHotObservable(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      Func<bool> condition = () => Throw<bool>(failure);
      Func<IObservable<int>> create = () => Observable.If(condition, thenSource, elseSource);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnError<int>(200, failure)
      );

      thenSource.Subscriptions.AssertEqual(
      );

      elseSource.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// With a true condition and a first sequence that never terminates, the subscription
  /// to that sequence is expected to last from 200 until 1000.
  /// </summary>
  [Fact]
  public void If_Dispose()
  {
      var testScheduler = new TestScheduler();

      // No terminal notification: this source stays open.
      var thenSource = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(250, 2)
      );

      var elseSource = testScheduler.CreateHotObservable(
          OnNext(310, 3),
          OnNext(350, 4),
          OnCompleted<int>(400)
      );

      Func<bool> condition = () => true;
      Func<IObservable<int>> create = () => Observable.If(condition, thenSource, elseSource);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(250, 2)
      );

      thenSource.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );

      elseSource.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// The two-argument Observable.If overload must throw <see cref="ArgumentNullException"/>
  /// for a null condition and for a null source.
  /// </summary>
  [Fact]
  public void If_Default_ArgumentChecking()
  {
      var condition = DummyFunc<bool>.Instance;
      var source = DummyObservable<int>.Instance;
      Func<bool> nullCondition = null;
      IObservable<int> nullSource = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.If<int>(nullCondition, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.If<int>(condition, nullSource));
  }
  /// <summary>
  /// Two-argument Observable.If with a condition that becomes true at time 150: the
  /// source's notifications from 220 on, including completion, must be relayed.
  /// </summary>
  [Fact]
  public void If_Default_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3),
          OnCompleted<int>(440)
      );

      // The condition flips to true at virtual time 150.
      var enabled = false;
      testScheduler.ScheduleAbsolute(150, () => { enabled = true; });

      Func<bool> condition = () => enabled;
      Func<IObservable<int>> create = () => Observable.If(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(330, 3),
          OnCompleted<int>(440)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 440)
      );
  }
  /// <summary>
  /// Two-argument Observable.If with a condition that becomes true at time 150: the
  /// source's error at 440 must be relayed after its values.
  /// </summary>
  [Fact]
  public void If_Default_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3),
          OnError<int>(440, failure)
      );

      // The condition flips to true at virtual time 150.
      var enabled = false;
      testScheduler.ScheduleAbsolute(150, () => { enabled = true; });

      Func<bool> condition = () => enabled;
      Func<IObservable<int>> create = () => Observable.If(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(330, 3),
          OnError<int>(440, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 440)
      );
  }
  /// <summary>
  /// Two-argument Observable.If with a condition that becomes true at time 150 and a
  /// source that never terminates: the subscription is expected to last until 1000.
  /// </summary>
  [Fact]
  public void If_Default_Never()
  {
      var testScheduler = new TestScheduler();

      // No terminal notification: this source stays open.
      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3)
      );

      // The condition flips to true at virtual time 150.
      var enabled = false;
      testScheduler.ScheduleAbsolute(150, () => { enabled = true; });

      Func<bool> condition = () => enabled;
      Func<IObservable<int>> create = () => Observable.If(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(330, 3)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Two-argument Observable.If with a condition that becomes false at time 150: the
  /// result is expected to complete at 200 without subscribing to the source.
  /// </summary>
  [Fact]
  public void If_Default_Other()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3),
          OnError<int>(440, new Exception())
      );

      // The condition flips to false at virtual time 150.
      var enabled = true;
      testScheduler.ScheduleAbsolute(150, () => { enabled = false; });

      Func<bool> condition = () => enabled;
      Func<IObservable<int>> create = () => Observable.If(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnCompleted<int>(200)
      );

      source.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// Observable.If with a false condition and an explicit scheduler: the result is
  /// expected to complete at 201 without subscribing to the source.
  /// </summary>
  [Fact]
  public void If_Default_Scheduler()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 1),
          OnNext(220, 2),
          OnNext(330, 3),
          OnError<int>(440, new Exception())
      );

      Func<bool> condition = () => false;
      Func<IObservable<int>> create = () => Observable.If(condition, source, testScheduler);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnCompleted<int>(201)
      );

      source.Subscriptions.AssertEqual(
      );
  }
  1280. #endregion
  1281. #region + While +
  /// <summary>
  /// Observable.While must throw <see cref="ArgumentNullException"/> for a null condition
  /// and for a null source.
  /// </summary>
  [Fact]
  public void While_ArgumentChecking()
  {
      var condition = DummyFunc<bool>.Instance;
      var source = DummyObservable<int>.Instance;
      Func<bool> nullCondition = null;
      IObservable<int> nullSource = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.While(nullCondition, source));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.While(condition, nullSource));
  }
  /// <summary>
  /// With a condition that is always false, Observable.While is expected to complete at
  /// 200 without subscribing to the source.
  /// </summary>
  [Fact]
  public void While_AlwaysFalse()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      Func<bool> condition = () => false;
      Func<IObservable<int>> create = () => Observable.While(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnCompleted<int>(200)
      );

      source.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// With a condition that is always true, Observable.While is expected to resubscribe
  /// to the cold source each time it completes, until the run ends at 1000.
  /// </summary>
  [Fact]
  public void While_AlwaysTrue()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      Func<bool> condition = () => true;
      Func<IObservable<int>> create = () => Observable.While(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          // First pass.
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // Second pass.
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          // Third pass.
          OnNext(750, 1),
          OnNext(800, 2),
          OnNext(850, 3),
          OnNext(900, 4)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700),
          Subscribe(700, 950),
          Subscribe(950, 1000)
      );
  }
  /// <summary>
  /// With a condition that is always true and a source that fails, Observable.While is
  /// expected to relay the error at 250 and not resubscribe.
  /// </summary>
  [Fact]
  public void While_AlwaysTrue_Throw()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateColdObservable(
          OnError<int>(50, failure)
      );

      Func<bool> condition = () => true;
      Func<IObservable<int>> create = () => Observable.While(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnError<int>(250, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// With a condition that is always true and a source that never terminates, a single
  /// subscription from 200 to 1000 is expected.
  /// </summary>
  [Fact]
  public void While_AlwaysTrue_Infinite()
  {
      var testScheduler = new TestScheduler();

      // No terminal notification: this source stays open.
      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1)
      );

      Func<bool> condition = () => true;
      Func<IObservable<int>> create = () => Observable.While(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          OnNext(250, 1)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// With a condition that is true for its first two evaluations only, Observable.While
  /// is expected to run the source twice and then complete at 700.
  /// </summary>
  [Fact]
  public void While_SometimesTrue()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      // Counts condition evaluations; the third evaluation returns false.
      var evaluations = 0;
      Func<bool> condition = () => ++evaluations < 3;
      Func<IObservable<int>> create = () => Observable.While(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          // First pass.
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // Second pass.
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          OnCompleted<int>(700)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700)
      );
  }
  /// <summary>
  /// Always throws <paramref name="ex"/>. The generic return type lets the call be used
  /// where an expression of type <typeparamref name="T"/> is required.
  /// </summary>
  /// <typeparam name="T">Nominal return type; no value is ever returned.</typeparam>
  /// <param name="ex">The exception instance to throw.</param>
  private static T Throw<T>(Exception ex)
  {
      throw ex;
  }
  /// <summary>
  /// With a condition that is true for its first two evaluations and throws on the third,
  /// Observable.While is expected to run the source twice and then report the error at 700.
  /// </summary>
  [Fact]
  public void While_SometimesThrows()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateColdObservable(
          OnNext(50, 1),
          OnNext(100, 2),
          OnNext(150, 3),
          OnNext(200, 4),
          OnCompleted<int>(250)
      );

      // Counts condition evaluations; the third evaluation throws.
      var evaluations = 0;
      Func<bool> condition = () =>
      {
          evaluations++;
          if (evaluations < 3)
          {
              return true;
          }

          return Throw<bool>(failure);
      };
      Func<IObservable<int>> create = () => Observable.While(condition, source);

      var observer = testScheduler.Start(create);

      observer.Messages.AssertEqual(
          // First pass.
          OnNext(250, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(400, 4),
          // Second pass.
          OnNext(500, 1),
          OnNext(550, 2),
          OnNext(600, 3),
          OnNext(650, 4),
          OnError<int>(700, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 450),
          Subscribe(450, 700)
      );
  }
  1433. #endregion
  1434. #region General tests for loops
  1435. #if HAS_STACKTRACE
  /// <summary>
  /// Nests Observable.For inside Observable.While and checks both the emitted values and
  /// that every element callback observed the same stack frame count.
  /// </summary>
  [Fact]
  public void LoopTest1()
  {
      var loop = Observable.Defer(() =>
      {
          var iteration = 0;

          // The counter is incremented by the condition, so the body sees 1..5.
          Func<bool> shouldContinue = () => iteration++ < 5;

          var body = Observable.Defer(() => Observable.For(
              Enumerable.Range(0, iteration),
              x => Observable.Return(x)
          ));

          return Observable.While(shouldContinue, body);
      });

      var values = new List<int>();
      var stackDepths = new List<int>();

      loop.ForEach(x =>
      {
          values.Add(x);
          stackDepths.Add(new System.Diagnostics.StackTrace().FrameCount);
      });

      var expected = new[] { 0, 0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3, 4 };

      Assert.True(values.SequenceEqual(expected));

      // Exactly one distinct frame count means the stack depth never varied.
      Assert.True(stackDepths.Distinct().Count() == 1);
  }
  /// <summary>
  /// Builds a self-referencing Observable.While loop and checks that it yields 42 ten
  /// times and that every element callback observed the same stack frame count.
  /// </summary>
  [Fact]
  public void LoopTest2()
  {
      var iterations = 0;
      IObservable<int> loop = null;

      Func<bool> shouldContinue = () => iterations++ < 10;

      // The deferred lambda captures the variable, so it reads the loop assigned below.
      var body = Observable.Concat(
          Observable.Return(42),
          Observable.Defer(() => loop)
      );

      loop = Observable.While(shouldContinue, body);

      var values = new List<int>();
      var stackDepths = new List<int>();

      loop.ForEach(x =>
      {
          values.Add(x);
          stackDepths.Add(new System.Diagnostics.StackTrace().FrameCount);
      });

      Assert.True(values.SequenceEqual(Enumerable.Repeat(42, 10)));

      // Exactly one distinct frame count means the stack depth never varied.
      Assert.True(stackDepths.Distinct().Count() == 1);
  }
  1485. #endif
  1486. #endregion
  1487. }
  1488. }