ObservableCreationTest.cs 75 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.ComponentModel;
  5. using System.Diagnostics;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Disposables;
  10. using System.Reactive.Linq;
  11. using System.Reflection;
  12. using System.Threading;
  13. using Microsoft.Reactive.Testing;
  14. using Xunit;
  15. using ReactiveTests.Dummies;
  16. #if !NO_TPL
  17. using System.Threading.Tasks;
  18. #endif
  19. namespace ReactiveTests.Tests
  20. {
  21. public partial class ObservableTest : ReactiveTest
  22. {
  23. #region - Create -
  24. [Fact]
  25. public void Create_ArgumentChecking()
  26. {
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, Action>)));
  28. //
  29. // BREAKING CHANGE v2.0 > v1.x - Returning null from Subscribe means "nothing to do upon unsubscription"
  30. // all null-coalesces to Disposable.Empty.
  31. //
  32. //ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => default(Action)).Subscribe(DummyObserver<int>.Instance));
  33. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => () => { }).Subscribe(null));
  34. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o =>
  35. {
  36. o.OnError(null);
  37. return () => { };
  38. }).Subscribe(null));
  39. }
  40. [Fact]
  41. public void Create_NullCoalescingAction()
  42. {
  43. var xs = Observable.Create<int>(o =>
  44. {
  45. o.OnNext(42);
  46. return default(Action);
  47. });
  48. var lst = new List<int>();
  49. var d = xs.Subscribe(lst.Add);
  50. d.Dispose();
  51. Assert.True(lst.SequenceEqual(new[] { 42 }));
  52. }
  53. [Fact]
  54. public void Create_Next()
  55. {
  56. var scheduler = new TestScheduler();
  57. var res = scheduler.Start(() =>
  58. Observable.Create<int>(o =>
  59. {
  60. o.OnNext(1);
  61. o.OnNext(2);
  62. return () => { };
  63. })
  64. );
  65. res.Messages.AssertEqual(
  66. OnNext(200, 1),
  67. OnNext(200, 2)
  68. );
  69. }
  70. [Fact]
  71. public void Create_Completed()
  72. {
  73. var scheduler = new TestScheduler();
  74. var res = scheduler.Start(() =>
  75. Observable.Create<int>(o =>
  76. {
  77. o.OnCompleted();
  78. o.OnNext(100);
  79. o.OnError(new Exception());
  80. o.OnCompleted();
  81. return () => { };
  82. })
  83. );
  84. res.Messages.AssertEqual(
  85. OnCompleted<int>(200)
  86. );
  87. }
  88. [Fact]
  89. public void Create_Error()
  90. {
  91. var scheduler = new TestScheduler();
  92. var ex = new Exception();
  93. var res = scheduler.Start(() =>
  94. Observable.Create<int>(o =>
  95. {
  96. o.OnError(ex);
  97. o.OnNext(100);
  98. o.OnError(new Exception());
  99. o.OnCompleted();
  100. return () => { };
  101. })
  102. );
  103. res.Messages.AssertEqual(
  104. OnError<int>(200, ex)
  105. );
  106. }
  107. [Fact]
  108. public void Create_Exception()
  109. {
  110. ReactiveAssert.Throws<InvalidOperationException>(() =>
  111. Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).Subscribe());
  112. }
  113. [Fact]
  114. public void Create_Dispose()
  115. {
  116. var scheduler = new TestScheduler();
  117. var res = scheduler.Start(() =>
  118. Observable.Create<int>(o =>
  119. {
  120. var stopped = false;
  121. o.OnNext(1);
  122. o.OnNext(2);
  123. scheduler.Schedule(TimeSpan.FromTicks(600), () =>
  124. {
  125. if (!stopped)
  126. o.OnNext(3);
  127. });
  128. scheduler.Schedule(TimeSpan.FromTicks(700), () =>
  129. {
  130. if (!stopped)
  131. o.OnNext(4);
  132. });
  133. scheduler.Schedule(TimeSpan.FromTicks(900), () =>
  134. {
  135. if (!stopped)
  136. o.OnNext(5);
  137. });
  138. scheduler.Schedule(TimeSpan.FromTicks(1100), () =>
  139. {
  140. if (!stopped)
  141. o.OnNext(6);
  142. });
  143. return () => { stopped = true; };
  144. })
  145. );
  146. res.Messages.AssertEqual(
  147. OnNext(200, 1),
  148. OnNext(200, 2),
  149. OnNext(800, 3),
  150. OnNext(900, 4)
  151. );
  152. }
  153. [Fact]
  154. public void Create_ObserverThrows()
  155. {
  156. ReactiveAssert.Throws<InvalidOperationException>(() =>
  157. Observable.Create<int>(o =>
  158. {
  159. o.OnNext(1);
  160. return () => { };
  161. }).Subscribe(x => { throw new InvalidOperationException(); }));
  162. ReactiveAssert.Throws<InvalidOperationException>(() =>
  163. Observable.Create<int>(o =>
  164. {
  165. o.OnError(new Exception());
  166. return () => { };
  167. }).Subscribe(x => { }, ex => { throw new InvalidOperationException(); }));
  168. ReactiveAssert.Throws<InvalidOperationException>(() =>
  169. Observable.Create<int>(o =>
  170. {
  171. o.OnCompleted();
  172. return () => { };
  173. }).Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }));
  174. }
  175. [Fact]
  176. public void CreateWithDisposable_ArgumentChecking()
  177. {
  178. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, IDisposable>)));
  179. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o => DummyDisposable.Instance).Subscribe(null));
  180. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(o =>
  181. {
  182. o.OnError(null);
  183. return DummyDisposable.Instance;
  184. }).Subscribe(null));
  185. }
  186. [Fact]
  187. public void CreateWithDisposable_NullCoalescingAction()
  188. {
  189. var xs = Observable.Create<int>(o =>
  190. {
  191. o.OnNext(42);
  192. return default(IDisposable);
  193. });
  194. var lst = new List<int>();
  195. var d = xs.Subscribe(lst.Add);
  196. d.Dispose();
  197. Assert.True(lst.SequenceEqual(new[] { 42 }));
  198. }
  199. [Fact]
  200. public void CreateWithDisposable_Next()
  201. {
  202. var scheduler = new TestScheduler();
  203. var res = scheduler.Start(() =>
  204. Observable.Create<int>(o =>
  205. {
  206. o.OnNext(1);
  207. o.OnNext(2);
  208. return Disposable.Empty;
  209. })
  210. );
  211. res.Messages.AssertEqual(
  212. OnNext(200, 1),
  213. OnNext(200, 2)
  214. );
  215. }
  216. [Fact]
  217. public void CreateWithDisposable_Completed()
  218. {
  219. var scheduler = new TestScheduler();
  220. var res = scheduler.Start(() =>
  221. Observable.Create<int>(o =>
  222. {
  223. o.OnCompleted();
  224. o.OnNext(100);
  225. o.OnError(new Exception());
  226. o.OnCompleted();
  227. return Disposable.Empty;
  228. })
  229. );
  230. res.Messages.AssertEqual(
  231. OnCompleted<int>(200)
  232. );
  233. }
  234. [Fact]
  235. public void CreateWithDisposable_Error()
  236. {
  237. var scheduler = new TestScheduler();
  238. var ex = new Exception();
  239. var res = scheduler.Start(() =>
  240. Observable.Create<int>(o =>
  241. {
  242. o.OnError(ex);
  243. o.OnNext(100);
  244. o.OnError(new Exception());
  245. o.OnCompleted();
  246. return Disposable.Empty;
  247. })
  248. );
  249. res.Messages.AssertEqual(
  250. OnError<int>(200, ex)
  251. );
  252. }
  253. [Fact]
  254. public void CreateWithDisposable_Exception()
  255. {
  256. ReactiveAssert.Throws<InvalidOperationException>(() =>
  257. Observable.Create<int>(new Func<IObserver<int>, IDisposable>(o => { throw new InvalidOperationException(); })).Subscribe());
  258. }
  259. [Fact]
  260. public void CreateWithDisposable_Dispose()
  261. {
  262. var scheduler = new TestScheduler();
  263. var res = scheduler.Start(() =>
  264. Observable.Create<int>(o =>
  265. {
  266. var d = new BooleanDisposable();
  267. o.OnNext(1);
  268. o.OnNext(2);
  269. scheduler.Schedule(TimeSpan.FromTicks(600), () =>
  270. {
  271. if (!d.IsDisposed)
  272. o.OnNext(3);
  273. });
  274. scheduler.Schedule(TimeSpan.FromTicks(700), () =>
  275. {
  276. if (!d.IsDisposed)
  277. o.OnNext(4);
  278. });
  279. scheduler.Schedule(TimeSpan.FromTicks(900), () =>
  280. {
  281. if (!d.IsDisposed)
  282. o.OnNext(5);
  283. });
  284. scheduler.Schedule(TimeSpan.FromTicks(1100), () =>
  285. {
  286. if (!d.IsDisposed)
  287. o.OnNext(6);
  288. });
  289. return d;
  290. })
  291. );
  292. res.Messages.AssertEqual(
  293. OnNext(200, 1),
  294. OnNext(200, 2),
  295. OnNext(800, 3),
  296. OnNext(900, 4)
  297. );
  298. }
  299. [Fact]
  300. public void CreateWithDisposable_ObserverThrows()
  301. {
  302. ReactiveAssert.Throws<InvalidOperationException>(() =>
  303. Observable.Create<int>(o =>
  304. {
  305. o.OnNext(1);
  306. return Disposable.Empty;
  307. }).Subscribe(x => { throw new InvalidOperationException(); }));
  308. ReactiveAssert.Throws<InvalidOperationException>(() =>
  309. Observable.Create<int>(o =>
  310. {
  311. o.OnError(new Exception());
  312. return Disposable.Empty;
  313. }).Subscribe(x => { }, ex => { throw new InvalidOperationException(); }));
  314. ReactiveAssert.Throws<InvalidOperationException>(() =>
  315. Observable.Create<int>(o =>
  316. {
  317. o.OnCompleted();
  318. return Disposable.Empty;
  319. }).Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); }));
  320. }
  321. #endregion
  322. #region - CreateAsync -
  323. #if !NO_TPL
  324. [Fact]
  325. public void CreateAsync_ArgumentChecking()
  326. {
  327. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, Task>)));
  328. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, CancellationToken, Task>)));
  329. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, Task<IDisposable>>)));
  330. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, CancellationToken, Task<IDisposable>>)));
  331. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, Task<Action>>)));
  332. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(default(Func<IObserver<int>, CancellationToken, Task<Action>>)));
  333. }
  334. [Fact]
  335. public void CreateAsync_NullCoalescingAction1()
  336. {
  337. var xs = Observable.Create<int>(o =>
  338. {
  339. o.OnNext(42);
  340. return Task.Factory.StartNew(() => default(Action));
  341. });
  342. var lst = new List<int>();
  343. var d = xs.Subscribe(lst.Add);
  344. d.Dispose();
  345. Assert.True(lst.SequenceEqual(new[] { 42 }));
  346. }
  347. [Fact]
  348. public void CreateAsync_NullCoalescingAction2()
  349. {
  350. var xs = Observable.Create<int>((o, ct) =>
  351. {
  352. o.OnNext(42);
  353. return Task.Factory.StartNew(() => default(Action));
  354. });
  355. var lst = new List<int>();
  356. var d = xs.Subscribe(lst.Add);
  357. d.Dispose();
  358. Assert.True(lst.SequenceEqual(new[] { 42 }));
  359. }
  360. [Fact]
  361. public void CreateAsync_NullCoalescingDisposable1()
  362. {
  363. var xs = Observable.Create<int>(o =>
  364. {
  365. o.OnNext(42);
  366. return Task.Factory.StartNew(() => default(IDisposable));
  367. });
  368. var lst = new List<int>();
  369. var d = xs.Subscribe(lst.Add);
  370. d.Dispose();
  371. Assert.True(lst.SequenceEqual(new[] { 42 }));
  372. }
  373. [Fact]
  374. public void CreateAsync_NullCoalescingDisposable2()
  375. {
  376. var xs = Observable.Create<int>((o, ct) =>
  377. {
  378. o.OnNext(42);
  379. return Task.Factory.StartNew(() => default(IDisposable));
  380. });
  381. var lst = new List<int>();
  382. var d = xs.Subscribe(lst.Add);
  383. d.Dispose();
  384. Assert.True(lst.SequenceEqual(new[] { 42 }));
  385. }
  386. Task Producer1(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  387. {
  388. var tcs = new TaskCompletionSource<object>();
  389. var x = 0;
  390. var d = scheduler.Schedule(TimeSpan.FromTicks(100), self =>
  391. {
  392. results.OnNext(++x);
  393. self(TimeSpan.FromTicks(100));
  394. });
  395. token.Register(d.Dispose);
  396. return tcs.Task;
  397. }
  398. [Fact]
  399. public void CreateAsync_Never()
  400. {
  401. RunSynchronously(() =>
  402. {
  403. var scheduler = new TestScheduler();
  404. var res = scheduler.Start(() =>
  405. Observable.Create<int>((observer, token) => Producer1(observer, token, scheduler))
  406. );
  407. res.Messages.AssertEqual(
  408. OnNext(300, 1),
  409. OnNext(400, 2),
  410. OnNext(500, 3),
  411. OnNext(600, 4),
  412. OnNext(700, 5),
  413. OnNext(800, 6),
  414. OnNext(900, 7)
  415. );
  416. });
  417. }
  418. Task Producer2(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  419. {
  420. var tcs = new TaskCompletionSource<object>();
  421. var x = 0;
  422. var d = scheduler.Schedule(TimeSpan.FromTicks(100), self =>
  423. {
  424. if (x == 4)
  425. {
  426. tcs.SetResult(null);
  427. }
  428. results.OnNext(++x);
  429. self(TimeSpan.FromTicks(100));
  430. });
  431. token.Register(d.Dispose);
  432. return tcs.Task;
  433. }
  434. [Fact]
  435. public void CreateAsync_Completed1()
  436. {
  437. RunSynchronously(() =>
  438. {
  439. var scheduler = new TestScheduler();
  440. var res = scheduler.Start(() =>
  441. Observable.Create<int>((observer, token) => Producer2(observer, token, scheduler))
  442. );
  443. res.Messages.AssertEqual(
  444. OnNext(300, 1),
  445. OnNext(400, 2),
  446. OnNext(500, 3),
  447. OnNext(600, 4),
  448. OnCompleted<int>(700)
  449. );
  450. });
  451. }
  452. Task Producer3(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  453. {
  454. var tcs = new TaskCompletionSource<object>();
  455. var x = 0;
  456. var d = scheduler.Schedule(TimeSpan.FromTicks(100), self =>
  457. {
  458. if (x == 4)
  459. {
  460. results.OnCompleted();
  461. }
  462. results.OnNext(++x);
  463. self(TimeSpan.FromTicks(100));
  464. });
  465. token.Register(d.Dispose);
  466. return tcs.Task;
  467. }
  468. [Fact]
  469. public void CreateAsync_Completed2()
  470. {
  471. RunSynchronously(() =>
  472. {
  473. var scheduler = new TestScheduler();
  474. var res = scheduler.Start(() =>
  475. Observable.Create<int>((observer, token) => Producer3(observer, token, scheduler))
  476. );
  477. res.Messages.AssertEqual(
  478. OnNext(300, 1),
  479. OnNext(400, 2),
  480. OnNext(500, 3),
  481. OnNext(600, 4),
  482. OnCompleted<int>(700)
  483. );
  484. });
  485. }
  486. Task Producer4(IObserver<int> results, CancellationToken token, IScheduler scheduler, Exception exception)
  487. {
  488. var tcs = new TaskCompletionSource<object>();
  489. var x = 0;
  490. var d = scheduler.Schedule(TimeSpan.FromTicks(100), self =>
  491. {
  492. if (x == 4)
  493. {
  494. results.OnError(exception);
  495. }
  496. results.OnNext(++x);
  497. self(TimeSpan.FromTicks(100));
  498. });
  499. token.Register(d.Dispose);
  500. return tcs.Task;
  501. }
  502. [Fact]
  503. public void CreateAsync_Error1()
  504. {
  505. RunSynchronously(() =>
  506. {
  507. var scheduler = new TestScheduler();
  508. var exception = new Exception();
  509. var res = scheduler.Start(() =>
  510. Observable.Create<int>((observer, token) => Producer4(observer, token, scheduler, exception))
  511. );
  512. res.Messages.AssertEqual(
  513. OnNext(300, 1),
  514. OnNext(400, 2),
  515. OnNext(500, 3),
  516. OnNext(600, 4),
  517. OnError<int>(700, exception)
  518. );
  519. });
  520. }
  521. Task Producer5(IObserver<int> results, CancellationToken token, IScheduler scheduler, Exception exception)
  522. {
  523. var tcs = new TaskCompletionSource<object>();
  524. var x = 0;
  525. var d = scheduler.Schedule(TimeSpan.FromTicks(100), self =>
  526. {
  527. if (x == 4)
  528. {
  529. tcs.SetException(exception);
  530. }
  531. results.OnNext(++x);
  532. self(TimeSpan.FromTicks(100));
  533. });
  534. token.Register(d.Dispose);
  535. return tcs.Task;
  536. }
  537. [Fact]
  538. public void CreateAsync_Error2()
  539. {
  540. RunSynchronously(() =>
  541. {
  542. var scheduler = new TestScheduler();
  543. var exception = new Exception();
  544. var res = scheduler.Start(() =>
  545. Observable.Create<int>((observer, token) => Producer5(observer, token, scheduler, exception))
  546. );
  547. res.Messages.AssertEqual(
  548. OnNext(300, 1),
  549. OnNext(400, 2),
  550. OnNext(500, 3),
  551. OnNext(600, 4),
  552. OnError<int>(700, exception)
  553. );
  554. });
  555. }
  556. Task Producer6(IObserver<int> results, CancellationToken token, Exception exception)
  557. {
  558. throw exception;
  559. }
  560. [Fact]
  561. public void CreateAsync_Error3()
  562. {
  563. RunSynchronously(() =>
  564. {
  565. var scheduler = new TestScheduler();
  566. var exception = new InvalidOperationException();
  567. var res = scheduler.Start(() =>
  568. Observable.Create<int>((observer, token) => Producer6(observer, token, exception))
  569. );
  570. res.Messages.AssertEqual(
  571. OnError<int>(200, exception)
  572. );
  573. });
  574. }
  575. Task Producer7(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  576. {
  577. var tcs = new TaskCompletionSource<object>();
  578. var x = 0;
  579. var d = scheduler.Schedule(TimeSpan.FromTicks(100), self =>
  580. {
  581. if (x == 4)
  582. {
  583. tcs.SetResult(null);
  584. }
  585. results.OnNext(++x);
  586. self(TimeSpan.FromTicks(100));
  587. });
  588. token.Register(d.Dispose);
  589. return tcs.Task;
  590. }
  591. [Fact]
  592. public void CreateAsync_Cancel1()
  593. {
  594. RunSynchronously(() =>
  595. {
  596. var scheduler = new TestScheduler();
  597. var res = scheduler.Start(() =>
  598. Observable.Create<int>((observer, token) => Producer7(observer, token, scheduler)),
  599. 650
  600. );
  601. res.Messages.AssertEqual(
  602. OnNext(300, 1),
  603. OnNext(400, 2),
  604. OnNext(500, 3),
  605. OnNext(600, 4)
  606. );
  607. });
  608. }
  609. Task Producer8(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  610. {
  611. var tcs = new TaskCompletionSource<object>();
  612. var x = 0;
  613. var d = scheduler.Schedule(TimeSpan.FromTicks(100), self =>
  614. {
  615. if (x == 4)
  616. {
  617. results.OnCompleted();
  618. }
  619. results.OnNext(++x);
  620. self(TimeSpan.FromTicks(100));
  621. });
  622. token.Register(d.Dispose);
  623. return tcs.Task;
  624. }
  625. [Fact]
  626. public void CreateAsync_Cancel2()
  627. {
  628. RunSynchronously(() =>
  629. {
  630. var scheduler = new TestScheduler();
  631. var res = scheduler.Start(() =>
  632. Observable.Create<int>((observer, token) => Producer8(observer, token, scheduler)),
  633. 650
  634. );
  635. res.Messages.AssertEqual(
  636. OnNext(300, 1),
  637. OnNext(400, 2),
  638. OnNext(500, 3),
  639. OnNext(600, 4)
  640. );
  641. });
  642. }
  643. Task Producer9(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  644. {
  645. var tcs = new TaskCompletionSource<object>();
  646. var x = 0;
  647. var d = scheduler.Schedule(TimeSpan.FromTicks(100), self =>
  648. {
  649. if (x == 4)
  650. {
  651. results.OnCompleted();
  652. }
  653. results.OnNext(++x);
  654. self(TimeSpan.FromTicks(100));
  655. });
  656. token.Register(d.Dispose);
  657. return tcs.Task;
  658. }
  659. [Fact]
  660. public void CreateAsync_Cancel3()
  661. {
  662. RunSynchronously(() =>
  663. {
  664. var scheduler = new TestScheduler();
  665. var res = scheduler.Start(() =>
  666. Observable.Create<int>((observer, token) => Producer9(observer, token, scheduler)),
  667. 750
  668. );
  669. res.Messages.AssertEqual(
  670. OnNext(300, 1),
  671. OnNext(400, 2),
  672. OnNext(500, 3),
  673. OnNext(600, 4),
  674. OnCompleted<int>(700)
  675. );
  676. });
  677. }
  678. Task Producer10(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  679. {
  680. var tcs = new TaskCompletionSource<object>();
  681. var x = 0;
  682. var d = scheduler.Schedule(TimeSpan.FromTicks(100), self =>
  683. {
  684. if (x == 4)
  685. {
  686. tcs.SetCanceled();
  687. }
  688. results.OnNext(++x);
  689. self(TimeSpan.FromTicks(100));
  690. });
  691. token.Register(d.Dispose);
  692. return tcs.Task;
  693. }
  694. [Fact]
  695. public void CreateAsync_Cancel4()
  696. {
  697. RunSynchronously(() =>
  698. {
  699. var scheduler = new TestScheduler();
  700. var res = scheduler.Start(() =>
  701. Observable.Create<int>((observer, token) => Producer10(observer, token, scheduler))
  702. );
  703. res.Messages.Take(4).AssertEqual(
  704. OnNext(300, 1),
  705. OnNext(400, 2),
  706. OnNext(500, 3),
  707. OnNext(600, 4)
  708. );
  709. Assert.Equal(5, res.Messages.Count);
  710. Assert.Equal(700, res.Messages[4].Time);
  711. Assert.Equal(NotificationKind.OnError, res.Messages[4].Value.Kind);
  712. Assert.True(res.Messages[4].Value.Exception is OperationCanceledException);
  713. });
  714. }
  715. void RunSynchronously(Action action)
  716. {
  717. var t = new Task(action);
  718. t.RunSynchronously(new SynchronousScheduler());
  719. t.Wait();
  720. }
  721. class SynchronousScheduler : TaskScheduler
  722. {
  723. protected override IEnumerable<Task> GetScheduledTasks()
  724. {
  725. throw new NotImplementedException();
  726. }
  727. protected override void QueueTask(Task task)
  728. {
  729. TryExecuteTask(task);
  730. }
  731. protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  732. {
  733. return TryExecuteTask(task);
  734. }
  735. }
  736. [Fact]
  737. public void CreateAsync_Task_Simple()
  738. {
  739. var xs = Observable.Create<int>(observer =>
  740. {
  741. return Task.Factory.StartNew(() =>
  742. {
  743. observer.OnNext(42);
  744. observer.OnCompleted();
  745. });
  746. });
  747. var lst = new List<int>();
  748. xs.ForEach(lst.Add);
  749. Assert.True(new[] { 42 }.SequenceEqual(lst));
  750. }
  751. [Fact]
  752. public void CreateAsync_Task_Token()
  753. {
  754. var e = new ManualResetEvent(false);
  755. var xs = Observable.Create<int>((observer, ct) =>
  756. {
  757. return Task.Factory.StartNew(() =>
  758. {
  759. var i = 0;
  760. while (!ct.IsCancellationRequested)
  761. {
  762. if (i++ == 10)
  763. e.Set();
  764. observer.OnNext(42);
  765. }
  766. });
  767. });
  768. var lst = new List<int>();
  769. var d = xs.Subscribe(lst.Add);
  770. e.WaitOne();
  771. d.Dispose();
  772. Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
  773. }
  774. [Fact]
  775. public void CreateAsync_IDisposable_Simple()
  776. {
  777. var stopped = new ManualResetEvent(false);
  778. var s = Disposable.Create(() => stopped.Set());
  779. var xs = Observable.Create<int>(observer =>
  780. {
  781. return Task.Factory.StartNew(() =>
  782. {
  783. observer.OnNext(42);
  784. observer.OnCompleted();
  785. return s;
  786. });
  787. });
  788. var lst = new List<int>();
  789. xs.ForEach(lst.Add);
  790. stopped.WaitOne();
  791. Assert.True(new[] { 42 }.SequenceEqual(lst));
  792. }
  793. [Fact]
  794. public void CreateAsync_IDisposable_Token()
  795. {
  796. var stopped = new ManualResetEvent(false);
  797. var s = Disposable.Create(() => stopped.Set());
  798. var e = new ManualResetEvent(false);
  799. var xs = Observable.Create<int>((observer, ct) =>
  800. {
  801. return Task.Factory.StartNew(() =>
  802. {
  803. var i = 0;
  804. while (!ct.IsCancellationRequested)
  805. {
  806. if (i++ == 10)
  807. e.Set();
  808. observer.OnNext(42);
  809. }
  810. return s;
  811. });
  812. });
  813. var lst = new List<int>();
  814. var d = xs.Subscribe(lst.Add);
  815. e.WaitOne();
  816. d.Dispose();
  817. stopped.WaitOne();
  818. Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
  819. }
  820. [Fact]
  821. public void CreateAsync_Action_Simple()
  822. {
  823. var stopped = new ManualResetEvent(false);
  824. var s = new Action(() => stopped.Set());
  825. var xs = Observable.Create<int>(observer =>
  826. {
  827. return Task.Factory.StartNew(() =>
  828. {
  829. observer.OnNext(42);
  830. observer.OnCompleted();
  831. return s;
  832. });
  833. });
  834. var lst = new List<int>();
  835. xs.ForEach(lst.Add);
  836. stopped.WaitOne();
  837. Assert.True(new[] { 42 }.SequenceEqual(lst));
  838. }
  839. [Fact]
  840. public void CreateAsync_Action_Token()
  841. {
  842. var stopped = new ManualResetEvent(false);
  843. var s = new Action(() => stopped.Set());
  844. var e = new ManualResetEvent(false);
  845. var xs = Observable.Create<int>((observer, ct) =>
  846. {
  847. return Task.Factory.StartNew(() =>
  848. {
  849. var i = 0;
  850. while (!ct.IsCancellationRequested)
  851. {
  852. if (i++ == 10)
  853. e.Set();
  854. observer.OnNext(42);
  855. }
  856. return s;
  857. });
  858. });
  859. var lst = new List<int>();
  860. var d = xs.Subscribe(lst.Add);
  861. e.WaitOne();
  862. d.Dispose();
  863. stopped.WaitOne();
  864. Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
  865. }
  866. #endif
  867. #endregion
  868. #region + Defer +
  869. [Fact]
  870. public void Defer_ArgumentChecking()
  871. {
  872. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Defer<int>(default(Func<IObservable<int>>)));
  873. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Defer(() => DummyObservable<int>.Instance).Subscribe(null));
  874. ReactiveAssert.Throws</*some*/Exception>(() => Observable.Defer<int>(() => default(IObservable<int>)).Subscribe());
  875. }
  876. [Fact]
  877. public void Defer_Complete()
  878. {
  879. var scheduler = new TestScheduler();
  880. var invoked = 0;
  881. var xs = default(ITestableObservable<long>);
  882. var res = scheduler.Start(() =>
  883. Observable.Defer(() =>
  884. {
  885. invoked++;
  886. xs = scheduler.CreateColdObservable(
  887. OnNext<long>(100, scheduler.Clock),
  888. OnCompleted<long>(200));
  889. return xs;
  890. })
  891. );
  892. res.Messages.AssertEqual(
  893. OnNext(300, 200L),
  894. OnCompleted<long>(400)
  895. );
  896. Assert.Equal(1, invoked);
  897. xs.Subscriptions.AssertEqual(
  898. Subscribe(200, 400)
  899. );
  900. }
  901. [Fact]
  902. public void Defer_Error()
  903. {
  904. var scheduler = new TestScheduler();
  905. var invoked = 0;
  906. var xs = default(ITestableObservable<long>);
  907. var ex = new Exception();
  908. var res = scheduler.Start(() =>
  909. Observable.Defer(() =>
  910. {
  911. invoked++;
  912. xs = scheduler.CreateColdObservable(
  913. OnNext<long>(100, scheduler.Clock),
  914. OnError<long>(200, ex));
  915. return xs;
  916. })
  917. );
  918. res.Messages.AssertEqual(
  919. OnNext(300, 200L),
  920. OnError<long>(400, ex)
  921. );
  922. Assert.Equal(1, invoked);
  923. xs.Subscriptions.AssertEqual(
  924. Subscribe(200, 400)
  925. );
  926. }
  927. [Fact]
  928. public void Defer_Dispose()
  929. {
  930. var scheduler = new TestScheduler();
  931. var invoked = 0;
  932. var xs = default(ITestableObservable<long>);
  933. var res = scheduler.Start(() =>
  934. Observable.Defer(() =>
  935. {
  936. invoked++;
  937. xs = scheduler.CreateColdObservable(
  938. OnNext<long>(100, scheduler.Clock),
  939. OnNext<long>(200, invoked),
  940. OnNext<long>(1100, 1000));
  941. return xs;
  942. })
  943. );
  944. res.Messages.AssertEqual(
  945. OnNext(300, 200L),
  946. OnNext(400, 1L)
  947. );
  948. Assert.Equal(1, invoked);
  949. xs.Subscriptions.AssertEqual(
  950. Subscribe(200, 1000)
  951. );
  952. }
  953. [Fact]
  954. public void Defer_Throw()
  955. {
  956. var scheduler = new TestScheduler();
  957. var invoked = 0;
  958. var ex = new Exception();
  959. var res = scheduler.Start(() =>
  960. Observable.Defer<int>(new Func<IObservable<int>>(() =>
  961. {
  962. invoked++;
  963. throw ex;
  964. }))
  965. );
  966. res.Messages.AssertEqual(
  967. OnError<int>(200, ex)
  968. );
  969. Assert.Equal(1, invoked);
  970. }
  971. #endregion
  972. #region - DeferAsync -
  973. #if !NO_TPL
  974. [Fact]
  975. public void DeferAsync_ArgumentChecking()
  976. {
  977. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Defer(default(Func<Task<IObservable<int>>>)));
  978. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.DeferAsync(default(Func<CancellationToken, Task<IObservable<int>>>)));
  979. }
  980. [Fact]
  981. public void DeferAsync_Simple()
  982. {
  983. var xs = Observable.Defer<int>(() => Task.Factory.StartNew(() => Observable.Return(42)));
  984. var res = xs.ToEnumerable().ToList();
  985. Assert.True(new[] { 42 }.SequenceEqual(res));
  986. }
  987. [Fact]
  988. public void DeferAsync_WithCancel_Simple()
  989. {
  990. var xs = Observable.DeferAsync<int>(ct => Task.Factory.StartNew(() => Observable.Return(42)));
  991. var res = xs.ToEnumerable().ToList();
  992. Assert.True(new[] { 42 }.SequenceEqual(res));
  993. }
  994. [Fact]
  995. public void DeferAsync_WithCancel_Cancel()
  996. {
  997. var N = 10;// 0000;
  998. for (int i = 0; i < N; i++)
  999. {
  1000. var e = new ManualResetEvent(false);
  1001. var called = false;
  1002. var xs = Observable.DeferAsync<int>(ct => Task.Factory.StartNew(() =>
  1003. {
  1004. e.Set();
  1005. while (!ct.IsCancellationRequested)
  1006. ;
  1007. return Observable.Defer(() => { called = true; return Observable.Return(42); });
  1008. }));
  1009. var d = xs.Subscribe(_ => { });
  1010. e.WaitOne();
  1011. d.Dispose();
  1012. Assert.False(called);
  1013. }
  1014. }
  1015. #endif
  1016. #endregion
  1017. #region + Empty +
  1018. [Fact]
  1019. public void Empty_ArgumentChecking()
  1020. {
  1021. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Empty<int>(null));
  1022. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Empty<int>(null, 42));
  1023. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Empty<int>(DummyScheduler.Instance).Subscribe(null));
  1024. }
  1025. [Fact]
  1026. public void Empty_Basic()
  1027. {
  1028. var scheduler = new TestScheduler();
  1029. var res = scheduler.Start(() =>
  1030. Observable.Empty<int>(scheduler)
  1031. );
  1032. res.Messages.AssertEqual(
  1033. OnCompleted<int>(201)
  1034. );
  1035. }
  1036. [Fact]
  1037. public void Empty_Disposed()
  1038. {
  1039. var scheduler = new TestScheduler();
  1040. var res = scheduler.Start(() =>
  1041. Observable.Empty<int>(scheduler),
  1042. 200
  1043. );
  1044. res.Messages.AssertEqual(
  1045. );
  1046. }
  1047. [Fact]
  1048. public void Empty_ObserverThrows()
  1049. {
  1050. var scheduler1 = new TestScheduler();
  1051. var xs = Observable.Empty<int>(scheduler1);
  1052. xs.Subscribe(x => { }, exception => { }, () => { throw new InvalidOperationException(); });
  1053. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  1054. }
  1055. [Fact]
  1056. public void Empty_DefaultScheduler()
  1057. {
  1058. Observable.Empty<int>().AssertEqual(Observable.Empty<int>(DefaultScheduler.Instance));
  1059. }
  1060. [Fact]
  1061. public void Empty_Basic_Witness1()
  1062. {
  1063. var scheduler = new TestScheduler();
  1064. var res = scheduler.Start(() =>
  1065. Observable.Empty<int>(scheduler, 42)
  1066. );
  1067. res.Messages.AssertEqual(
  1068. OnCompleted<int>(201)
  1069. );
  1070. }
  1071. [Fact]
  1072. public void Empty_Basic_Witness2()
  1073. {
  1074. var e = new ManualResetEvent(false);
  1075. Observable.Empty<int>(42).Subscribe(
  1076. _ => { Assert.True(false); },
  1077. _ => { Assert.True(false); },
  1078. () => e.Set()
  1079. );
  1080. e.WaitOne();
  1081. }
  1082. #endregion
  1083. #region + Generate +
  1084. [Fact]
  1085. public void Generate_ArgumentChecking()
  1086. {
  1087. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, DummyFunc<int, bool>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, (IScheduler)null));
  1088. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, (Func<int, bool>)null, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyScheduler.Instance));
  1089. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, DummyFunc<int, bool>.Instance, DummyFunc<int, int>.Instance, (Func<int, int>)null, DummyScheduler.Instance));
  1090. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, DummyFunc<int, bool>.Instance, (Func<int, int>)null, DummyFunc<int, int>.Instance, DummyScheduler.Instance));
  1091. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, DummyFunc<int, bool>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance, DummyScheduler.Instance).Subscribe(null));
  1092. }
  1093. [Fact]
  1094. public void Generate_Finite()
  1095. {
  1096. var scheduler = new TestScheduler();
  1097. var res = scheduler.Start(() =>
  1098. Observable.Generate(0, x => x <= 3, x => x + 1, x => x, scheduler)
  1099. );
  1100. res.Messages.AssertEqual(
  1101. OnNext(201, 0),
  1102. OnNext(202, 1),
  1103. OnNext(203, 2),
  1104. OnNext(204, 3),
  1105. OnCompleted<int>(205)
  1106. );
  1107. }
  1108. [Fact]
  1109. public void Generate_Throw_Condition()
  1110. {
  1111. var scheduler = new TestScheduler();
  1112. var ex = new Exception();
  1113. var res = scheduler.Start(() =>
  1114. Observable.Generate(0, new Func<int, bool>(x => { throw ex; }), x => x + 1, x => x, scheduler)
  1115. );
  1116. res.Messages.AssertEqual(
  1117. OnError<int>(201, ex)
  1118. );
  1119. }
  1120. [Fact]
  1121. public void Generate_Throw_ResultSelector()
  1122. {
  1123. var scheduler = new TestScheduler();
  1124. var ex = new Exception();
  1125. var res = scheduler.Start(() =>
  1126. Observable.Generate(0, x => true, x => x + 1, new Func<int, int>(x => { throw ex; }), scheduler)
  1127. );
  1128. res.Messages.AssertEqual(
  1129. OnError<int>(201, ex)
  1130. );
  1131. }
  1132. [Fact]
  1133. public void Generate_Throw_Iterate()
  1134. {
  1135. var scheduler = new TestScheduler();
  1136. var ex = new Exception();
  1137. var res = scheduler.Start(() =>
  1138. Observable.Generate(0, x => true, new Func<int, int>(x => { throw ex; }), x => x, scheduler)
  1139. );
  1140. res.Messages.AssertEqual(
  1141. OnNext(201, 0),
  1142. OnError<int>(202, ex)
  1143. );
  1144. }
  1145. [Fact]
  1146. public void Generate_Dispose()
  1147. {
  1148. var scheduler = new TestScheduler();
  1149. var res = scheduler.Start(() =>
  1150. Observable.Generate(0, x => true, x => x + 1, x => x, scheduler),
  1151. 203
  1152. );
  1153. res.Messages.AssertEqual(
  1154. OnNext(201, 0),
  1155. OnNext(202, 1)
  1156. );
  1157. }
  1158. [Fact]
  1159. public void Generate_DefaultScheduler_ArgumentChecking()
  1160. {
  1161. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, (Func<int, bool>)null, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance));
  1162. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, DummyFunc<int, bool>.Instance, DummyFunc<int, int>.Instance, (Func<int, int>)null));
  1163. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, DummyFunc<int, bool>.Instance, (Func<int, int>)null, DummyFunc<int, int>.Instance));
  1164. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Generate(0, DummyFunc<int, bool>.Instance, DummyFunc<int, int>.Instance, DummyFunc<int, int>.Instance).Subscribe(null));
  1165. }
  1166. [Fact]
  1167. public void Generate_DefaultScheduler()
  1168. {
  1169. Observable.Generate(0, x => x < 10, x => x + 1, x => x).AssertEqual(Observable.Generate(0, x => x < 10, x => x + 1, x => x, DefaultScheduler.Instance));
  1170. }
  1171. #if !NO_PERF
  1172. [Fact]
  1173. public void Generate_LongRunning1()
  1174. {
  1175. var start = default(ManualResetEvent);
  1176. var end = default(ManualResetEvent);
  1177. var s = new TestLongRunningScheduler(x => start = x, x => end = x);
  1178. var xs = Observable.Generate(0, x => x < 100, x => x + 1, x => x, s);
  1179. var lst = new List<int>();
  1180. var done = false;
  1181. xs.Subscribe(x => { lst.Add(x); }, () => done = true);
  1182. end.WaitOne();
  1183. Assert.True(lst.SequenceEqual(Enumerable.Range(0, 100)));
  1184. Assert.True(done);
  1185. }
  1186. [Fact]
  1187. public void Generate_LongRunning2()
  1188. {
  1189. var start = default(ManualResetEvent);
  1190. var end = default(ManualResetEvent);
  1191. var s = new TestLongRunningScheduler(x => start = x, x => end = x);
  1192. var xs = Observable.Generate(0, _ => true, x => x + 1, x => x, s);
  1193. var lst = new List<int>();
  1194. var d = xs.Subscribe(x => { lst.Add(x); });
  1195. start.WaitOne();
  1196. while (lst.Count < 100)
  1197. ;
  1198. d.Dispose();
  1199. end.WaitOne();
  1200. Assert.True(lst.Take(100).SequenceEqual(Enumerable.Range(0, 100)));
  1201. }
  1202. [Fact]
  1203. public void Generate_LongRunning_Throw()
  1204. {
  1205. var start = default(ManualResetEvent);
  1206. var end = default(ManualResetEvent);
  1207. var s = new TestLongRunningScheduler(x => start = x, x => end = x);
  1208. var ex = new Exception();
  1209. var xs = Observable.Generate(0, x => { if (x < 100) return true; throw ex; }, x => x + 1, x => x, s);
  1210. var lst = new List<int>();
  1211. var e = default(Exception);
  1212. var done = false;
  1213. xs.Subscribe(x => { lst.Add(x); }, e_ => e = e_, () => done = true);
  1214. end.WaitOne();
  1215. Assert.True(lst.SequenceEqual(Enumerable.Range(0, 100)));
  1216. Assert.Same(ex, e);
  1217. Assert.False(done);
  1218. }
  1219. #endif
  1220. #endregion
  1221. #region + Never +
  1222. [Fact]
  1223. public void Never_ArgumentChecking()
  1224. {
  1225. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Never<int>().Subscribe(null));
  1226. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Never<int>(42).Subscribe(null));
  1227. }
  1228. [Fact]
  1229. public void Never_Basic()
  1230. {
  1231. var scheduler = new TestScheduler();
  1232. var xs = Observable.Never<int>();
  1233. var res = scheduler.CreateObserver<int>();
  1234. xs.Subscribe(res);
  1235. scheduler.Start();
  1236. res.Messages.AssertEqual(
  1237. );
  1238. }
  1239. [Fact]
  1240. public void Never_Basic_Witness()
  1241. {
  1242. var scheduler = new TestScheduler();
  1243. var xs = Observable.Never<int>(42);
  1244. var res = scheduler.CreateObserver<int>();
  1245. xs.Subscribe(res);
  1246. scheduler.Start();
  1247. res.Messages.AssertEqual(
  1248. );
  1249. }
  1250. #endregion
  1251. #region + Range +
  1252. [Fact]
  1253. public void Range_ArgumentChecking()
  1254. {
  1255. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Range(0, 0, null));
  1256. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Range(0, -1, DummyScheduler.Instance));
  1257. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Range(int.MaxValue, 2, DummyScheduler.Instance));
  1258. }
  1259. [Fact]
  1260. public void Range_Zero()
  1261. {
  1262. var scheduler = new TestScheduler();
  1263. var res = scheduler.Start(() =>
  1264. Observable.Range(0, 0, scheduler)
  1265. );
  1266. res.Messages.AssertEqual(
  1267. OnCompleted<int>(201)
  1268. );
  1269. }
  1270. [Fact]
  1271. public void Range_One()
  1272. {
  1273. var scheduler = new TestScheduler();
  1274. var res = scheduler.Start(() =>
  1275. Observable.Range(0, 1, scheduler)
  1276. );
  1277. res.Messages.AssertEqual(
  1278. OnNext(201, 0),
  1279. OnCompleted<int>(202)
  1280. );
  1281. }
  1282. [Fact]
  1283. public void Range_Five()
  1284. {
  1285. var scheduler = new TestScheduler();
  1286. var res = scheduler.Start(() =>
  1287. Observable.Range(10, 5, scheduler)
  1288. );
  1289. res.Messages.AssertEqual(
  1290. OnNext(201, 10),
  1291. OnNext(202, 11),
  1292. OnNext(203, 12),
  1293. OnNext(204, 13),
  1294. OnNext(205, 14),
  1295. OnCompleted<int>(206)
  1296. );
  1297. }
  1298. [Fact]
  1299. public void Range_Boundaries()
  1300. {
  1301. var scheduler = new TestScheduler();
  1302. var res = scheduler.Start(() =>
  1303. Observable.Range(int.MaxValue, 1, scheduler)
  1304. );
  1305. res.Messages.AssertEqual(
  1306. OnNext(201, int.MaxValue),
  1307. OnCompleted<int>(202)
  1308. );
  1309. }
  1310. [Fact]
  1311. public void Range_Dispose()
  1312. {
  1313. var scheduler = new TestScheduler();
  1314. var res = scheduler.Start(() =>
  1315. Observable.Range(-10, 5, scheduler),
  1316. 204
  1317. );
  1318. res.Messages.AssertEqual(
  1319. OnNext(201, -10),
  1320. OnNext(202, -9),
  1321. OnNext(203, -8)
  1322. );
  1323. }
  1324. [Fact]
  1325. public void Range_Default_ArgumentChecking()
  1326. {
  1327. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Range(0, -1));
  1328. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Range(int.MaxValue, 2));
  1329. }
  1330. [Fact]
  1331. public void Range_Default()
  1332. {
  1333. for (int i = 0; i < 100; i++)
  1334. Observable.Range(100, 100).AssertEqual(Observable.Range(100, 100, DefaultScheduler.Instance));
  1335. }
  1336. #if !NO_PERF
  1337. [Fact]
  1338. public void Range_LongRunning1()
  1339. {
  1340. var start = default(ManualResetEvent);
  1341. var end = default(ManualResetEvent);
  1342. var s = new TestLongRunningScheduler(x => start = x, x => end = x);
  1343. var xs = Observable.Range(0, 100, s);
  1344. var lst = new List<int>();
  1345. var done = false;
  1346. xs.Subscribe(x => { lst.Add(x); }, () => done = true);
  1347. end.WaitOne();
  1348. Assert.True(lst.SequenceEqual(Enumerable.Range(0, 100)));
  1349. Assert.True(done);
  1350. }
  1351. [Fact]
  1352. public void Range_LongRunning2()
  1353. {
  1354. var start = default(ManualResetEvent);
  1355. var end = default(ManualResetEvent);
  1356. var s = new TestLongRunningScheduler(x => start = x, x => end = x);
  1357. var xs = Observable.Range(0, int.MaxValue, s);
  1358. var lst = new List<int>();
  1359. var d = xs.Subscribe(x => { lst.Add(x); });
  1360. start.WaitOne();
  1361. while (lst.Count < 100)
  1362. ;
  1363. d.Dispose();
  1364. end.WaitOne();
  1365. Assert.True(true);
  1366. }
  1367. [Fact]
  1368. public void Range_LongRunning_Empty()
  1369. {
  1370. var start = default(ManualResetEvent);
  1371. var end = default(ManualResetEvent);
  1372. var scheduler = new TestLongRunningScheduler(x => start = x, x => end = x);
  1373. var xs = Observable.Range(5, 0, scheduler);
  1374. var lst = new List<int>();
  1375. xs.ForEach(lst.Add);
  1376. Assert.True(lst.SequenceEqual(Enumerable.Range(5, 0)));
  1377. }
  1378. [Fact]
  1379. public void Range_LongRunning_Regular()
  1380. {
  1381. var start = default(ManualResetEvent);
  1382. var end = default(ManualResetEvent);
  1383. var scheduler = new TestLongRunningScheduler(x => start = x, x => end = x);
  1384. var xs = Observable.Range(5, 17, scheduler);
  1385. var lst = new List<int>();
  1386. xs.ForEach(lst.Add);
  1387. Assert.True(lst.SequenceEqual(Enumerable.Range(5, 17)));
  1388. }
  1389. [Fact]
  1390. public void Range_LongRunning_Boundaries()
  1391. {
  1392. var start = default(ManualResetEvent);
  1393. var end = default(ManualResetEvent);
  1394. var scheduler = new TestLongRunningScheduler(x => start = x, x => end = x);
  1395. var xs = Observable.Range(int.MaxValue, 1, scheduler);
  1396. var lst = new List<int>();
  1397. xs.ForEach(lst.Add);
  1398. Assert.True(lst.SequenceEqual(Enumerable.Range(int.MaxValue, 1)));
  1399. }
  1400. #endif
  1401. #endregion
  1402. #region + Repeat +
  1403. [Fact]
  1404. public void Repeat_Value_Count_ArgumentChecking()
  1405. {
  1406. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat(1, 0, default(IScheduler)));
  1407. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Repeat(1, -1, DummyScheduler.Instance));
  1408. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat(1, 1, DummyScheduler.Instance).Subscribe(null));
  1409. }
  1410. [Fact]
  1411. public void Repeat_Value_Count_Zero()
  1412. {
  1413. var scheduler = new TestScheduler();
  1414. var res = scheduler.Start(() =>
  1415. Observable.Repeat(42, 0, scheduler)
  1416. );
  1417. #if !NO_PERF
  1418. res.Messages.AssertEqual(
  1419. OnCompleted<int>(201)
  1420. );
  1421. #else
  1422. res.Messages.AssertEqual(
  1423. OnCompleted<int>(200)
  1424. );
  1425. #endif
  1426. }
  1427. [Fact]
  1428. public void Repeat_Value_Count_One()
  1429. {
  1430. var scheduler = new TestScheduler();
  1431. var res = scheduler.Start(() =>
  1432. Observable.Repeat(42, 1, scheduler)
  1433. );
  1434. res.Messages.AssertEqual(
  1435. OnNext(201, 42),
  1436. OnCompleted<int>(201)
  1437. );
  1438. }
  1439. [Fact]
  1440. public void Repeat_Value_Count_Ten()
  1441. {
  1442. var scheduler = new TestScheduler();
  1443. var res = scheduler.Start(() =>
  1444. Observable.Repeat(42, 10, scheduler)
  1445. );
  1446. res.Messages.AssertEqual(
  1447. OnNext(201, 42),
  1448. OnNext(202, 42),
  1449. OnNext(203, 42),
  1450. OnNext(204, 42),
  1451. OnNext(205, 42),
  1452. OnNext(206, 42),
  1453. OnNext(207, 42),
  1454. OnNext(208, 42),
  1455. OnNext(209, 42),
  1456. OnNext(210, 42),
  1457. OnCompleted<int>(210)
  1458. );
  1459. }
  1460. [Fact]
  1461. public void Repeat_Value_Count_Dispose()
  1462. {
  1463. var scheduler = new TestScheduler();
  1464. var res = scheduler.Start(() =>
  1465. Observable.Repeat(42, 10, scheduler),
  1466. 207
  1467. );
  1468. res.Messages.AssertEqual(
  1469. OnNext(201, 42),
  1470. OnNext(202, 42),
  1471. OnNext(203, 42),
  1472. OnNext(204, 42),
  1473. OnNext(205, 42),
  1474. OnNext(206, 42)
  1475. );
  1476. }
  1477. [Fact]
  1478. public void Repeat_Value_Count_Default_ArgumentChecking()
  1479. {
  1480. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Repeat(1, -1));
  1481. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat(1, 1).Subscribe(null));
  1482. }
  1483. [Fact]
  1484. public void Repeat_Value_Count_Default()
  1485. {
  1486. Observable.Repeat(42, 10).AssertEqual(Observable.Repeat(42, 10, DefaultScheduler.Instance));
  1487. }
  1488. [Fact]
  1489. public void Repeat_Value_ArgumentChecking()
  1490. {
  1491. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat(1, (IScheduler)null));
  1492. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat(DummyScheduler.Instance, 1).Subscribe(null));
  1493. }
  1494. [Fact]
  1495. public void Repeat_Value()
  1496. {
  1497. var scheduler = new TestScheduler();
  1498. var res = scheduler.Start(() =>
  1499. Observable.Repeat(42, scheduler),
  1500. 207
  1501. );
  1502. res.Messages.AssertEqual(
  1503. OnNext(201, 42),
  1504. OnNext(202, 42),
  1505. OnNext(203, 42),
  1506. OnNext(204, 42),
  1507. OnNext(205, 42),
  1508. OnNext(206, 42)
  1509. );
  1510. }
  1511. [Fact]
  1512. public void Repeat_Value_Default_ArgumentChecking()
  1513. {
  1514. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Repeat(1).Subscribe(null));
  1515. }
  1516. [Fact]
  1517. public void Repeat_Value_Default()
  1518. {
  1519. Observable.Repeat(42).Take(100).AssertEqual(Observable.Repeat(42, DefaultScheduler.Instance).Take(100));
  1520. }
  1521. #if !NO_PERF
  1522. [Fact]
  1523. public void Repeat_Count_LongRunning1()
  1524. {
  1525. var start = default(ManualResetEvent);
  1526. var end = default(ManualResetEvent);
  1527. var s = new TestLongRunningScheduler(x => start = x, x => end = x);
  1528. var xs = Observable.Repeat(42, 100, s);
  1529. var lst = new List<int>();
  1530. var done = false;
  1531. xs.Subscribe(x => { lst.Add(x); }, () => done = true);
  1532. end.WaitOne();
  1533. Assert.True(lst.SequenceEqual(Enumerable.Repeat(42, 100)));
  1534. Assert.True(done);
  1535. }
  1536. [Fact]
  1537. public void Repeat_Count_LongRunning2()
  1538. {
  1539. var start = default(ManualResetEvent);
  1540. var end = default(ManualResetEvent);
  1541. var s = new TestLongRunningScheduler(x => start = x, x => end = x);
  1542. var xs = Observable.Repeat(42, int.MaxValue, s);
  1543. var lst = new List<int>();
  1544. var d = xs.Subscribe(x => { lst.Add(x); });
  1545. start.WaitOne();
  1546. while (lst.Count < 100)
  1547. ;
  1548. d.Dispose();
  1549. end.WaitOne();
  1550. Assert.True(true);
  1551. }
  1552. [Fact]
  1553. public void Repeat_Inf_LongRunning()
  1554. {
  1555. var start = default(ManualResetEvent);
  1556. var end = default(ManualResetEvent);
  1557. var s = new TestLongRunningScheduler(x => start = x, x => end = x);
  1558. var xs = Observable.Repeat(42, s);
  1559. var lst = new List<int>();
  1560. var d = xs.Subscribe(x => { lst.Add(x); });
  1561. start.WaitOne();
  1562. while (lst.Count < 100)
  1563. ;
  1564. d.Dispose();
  1565. end.WaitOne();
  1566. Assert.True(true);
  1567. }
  1568. #endif
  1569. #endregion
  1570. #region + Return +
  1571. [Fact]
  1572. public void Return_ArgumentChecking()
  1573. {
  1574. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Return(0, null));
  1575. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Return(0, DummyScheduler.Instance).Subscribe(null));
  1576. }
  1577. [Fact]
  1578. public void Return_Basic()
  1579. {
  1580. var scheduler = new TestScheduler();
  1581. var res = scheduler.Start(() =>
  1582. Observable.Return(42, scheduler)
  1583. );
  1584. res.Messages.AssertEqual(
  1585. OnNext(201, 42),
  1586. OnCompleted<int>(201)
  1587. );
  1588. }
  1589. [Fact]
  1590. public void Return_Disposed()
  1591. {
  1592. var scheduler = new TestScheduler();
  1593. var res = scheduler.Start(() =>
  1594. Observable.Return(42, scheduler),
  1595. 200
  1596. );
  1597. res.Messages.AssertEqual(
  1598. );
  1599. }
  1600. [Fact]
  1601. public void Return_DisposedAfterNext()
  1602. {
  1603. var scheduler = new TestScheduler();
  1604. var d = new SerialDisposable();
  1605. var xs = Observable.Return(42, scheduler);
  1606. var res = scheduler.CreateObserver<int>();
  1607. scheduler.ScheduleAbsolute(100, () =>
  1608. d.Disposable = xs.Subscribe(
  1609. x =>
  1610. {
  1611. d.Dispose();
  1612. res.OnNext(x);
  1613. },
  1614. res.OnError,
  1615. res.OnCompleted
  1616. )
  1617. );
  1618. scheduler.Start();
  1619. res.Messages.AssertEqual(
  1620. OnNext(101, 42)
  1621. );
  1622. }
  1623. [Fact]
  1624. public void Return_ObserverThrows()
  1625. {
  1626. var scheduler1 = new TestScheduler();
  1627. var xs = Observable.Return(1, scheduler1);
  1628. xs.Subscribe(x => { throw new InvalidOperationException(); });
  1629. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  1630. var scheduler2 = new TestScheduler();
  1631. var ys = Observable.Return(1, scheduler2);
  1632. ys.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
  1633. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
  1634. }
  1635. [Fact]
  1636. public void Return_DefaultScheduler()
  1637. {
  1638. Observable.Return(42).AssertEqual(Observable.Return(42, DefaultScheduler.Instance));
  1639. }
  1640. #endregion
  1641. #region + Throw +
  1642. [Fact]
  1643. public void Throw_ArgumentChecking()
  1644. {
  1645. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throw<int>(null));
  1646. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throw<int>(null, 42));
  1647. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throw<int>(new Exception(), null));
  1648. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throw<int>(new Exception(), null, 42));
  1649. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throw<int>(null, DummyScheduler.Instance));
  1650. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throw<int>(null, DummyScheduler.Instance, 42));
  1651. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throw<int>(new Exception(), DummyScheduler.Instance).Subscribe(null));
  1652. }
  1653. [Fact]
  1654. public void Throw_Basic()
  1655. {
  1656. var scheduler = new TestScheduler();
  1657. var ex = new Exception();
  1658. var res = scheduler.Start(() =>
  1659. Observable.Throw<int>(ex, scheduler)
  1660. );
  1661. res.Messages.AssertEqual(
  1662. OnError<int>(201, ex)
  1663. );
  1664. }
  1665. [Fact]
  1666. public void Throw_Disposed()
  1667. {
  1668. var scheduler = new TestScheduler();
  1669. var res = scheduler.Start(() =>
  1670. Observable.Throw<int>(new Exception(), scheduler),
  1671. 200
  1672. );
  1673. res.Messages.AssertEqual(
  1674. );
  1675. }
  1676. [Fact]
  1677. public void Throw_ObserverThrows()
  1678. {
  1679. var scheduler1 = new TestScheduler();
  1680. var xs = Observable.Throw<int>(new Exception(), scheduler1);
  1681. xs.Subscribe(x => { }, ex => { throw new InvalidOperationException(); }, () => { });
  1682. ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
  1683. }
  1684. [Fact]
  1685. public void Throw_DefaultScheduler()
  1686. {
  1687. var ex = new Exception();
  1688. Observable.Throw<int>(ex).AssertEqual(Observable.Throw<int>(ex, DefaultScheduler.Instance));
  1689. }
  1690. [Fact]
  1691. public void Throw_Witness_Basic1()
  1692. {
  1693. var scheduler = new TestScheduler();
  1694. var ex = new Exception();
  1695. var res = scheduler.Start(() =>
  1696. Observable.Throw<int>(ex, scheduler, 42)
  1697. );
  1698. res.Messages.AssertEqual(
  1699. OnError<int>(201, ex)
  1700. );
  1701. }
  1702. [Fact]
  1703. public void Throw_Witness_Basic2()
  1704. {
  1705. var e = new ManualResetEvent(false);
  1706. var ex = new Exception();
  1707. var res = default(Exception);
  1708. Observable.Throw<int>(ex, 42).Subscribe(
  1709. _ => { Assert.True(false); },
  1710. err => { res = err; e.Set(); },
  1711. () => { Assert.True(false); }
  1712. );
  1713. e.WaitOne();
  1714. Assert.Same(ex, res);
  1715. }
  1716. #endregion
  1717. #region + Using +
  1718. [Fact]
  1719. public void Using_ArgumentChecking()
  1720. {
  1721. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Using((Func<IDisposable>)null, DummyFunc<IDisposable, IObservable<int>>.Instance));
  1722. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Using(DummyFunc<IDisposable>.Instance, (Func<IDisposable, IObservable<int>>)null));
  1723. ReactiveAssert.Throws</*some*/Exception>(() => Observable.Using(() => DummyDisposable.Instance, d => default(IObservable<int>)).Subscribe());
  1724. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Using(() => DummyDisposable.Instance, d => DummyObservable<int>.Instance).Subscribe(null));
  1725. }
  1726. [Fact]
  1727. public void Using_Null()
  1728. {
  1729. var scheduler = new TestScheduler();
  1730. var disposeInvoked = 0L;
  1731. var createInvoked = 0L;
  1732. var xs = default(ITestableObservable<long>);
  1733. var disposable = default(MockDisposable);
  1734. var _d = default(MockDisposable);
  1735. var res = scheduler.Start(() =>
  1736. Observable.Using(
  1737. () =>
  1738. {
  1739. disposeInvoked++;
  1740. disposable = default(MockDisposable);
  1741. return disposable;
  1742. },
  1743. d =>
  1744. {
  1745. _d = d;
  1746. createInvoked++;
  1747. xs = scheduler.CreateColdObservable(
  1748. OnNext<long>(100, scheduler.Clock),
  1749. OnCompleted<long>(200));
  1750. return xs;
  1751. }
  1752. )
  1753. );
  1754. Assert.Same(disposable, _d);
  1755. res.Messages.AssertEqual(
  1756. OnNext(300, 200L),
  1757. OnCompleted<long>(400)
  1758. );
  1759. Assert.Equal(1, createInvoked);
  1760. Assert.Equal(1, disposeInvoked);
  1761. xs.Subscriptions.AssertEqual(
  1762. Subscribe(200, 400)
  1763. );
  1764. Assert.Null(disposable);
  1765. }
  1766. [Fact]
  1767. public void Using_Complete()
  1768. {
  1769. var scheduler = new TestScheduler();
  1770. var disposeInvoked = 0;
  1771. var createInvoked = 0;
  1772. var xs = default(ITestableObservable<long>);
  1773. var disposable = default(MockDisposable);
  1774. var _d = default(MockDisposable);
  1775. var res = scheduler.Start(() =>
  1776. Observable.Using(
  1777. () =>
  1778. {
  1779. disposeInvoked++;
  1780. disposable = new MockDisposable(scheduler);
  1781. return disposable;
  1782. },
  1783. d =>
  1784. {
  1785. _d = d;
  1786. createInvoked++;
  1787. xs = scheduler.CreateColdObservable(
  1788. OnNext<long>(100, scheduler.Clock),
  1789. OnCompleted<long>(200));
  1790. return xs;
  1791. }
  1792. )
  1793. );
  1794. Assert.Same(disposable, _d);
  1795. res.Messages.AssertEqual(
  1796. OnNext(300, 200L),
  1797. OnCompleted<long>(400)
  1798. );
  1799. Assert.Equal(1, createInvoked);
  1800. Assert.Equal(1, disposeInvoked);
  1801. xs.Subscriptions.AssertEqual(
  1802. Subscribe(200, 400)
  1803. );
  1804. disposable.AssertEqual(
  1805. 200,
  1806. 400
  1807. );
  1808. }
  1809. [Fact]
  1810. public void Using_Error()
  1811. {
  1812. var scheduler = new TestScheduler();
  1813. var disposeInvoked = 0;
  1814. var createInvoked = 0;
  1815. var xs = default(ITestableObservable<long>);
  1816. var disposable = default(MockDisposable);
  1817. var _d = default(MockDisposable);
  1818. var ex = new Exception();
  1819. var res = scheduler.Start(() =>
  1820. Observable.Using(
  1821. () =>
  1822. {
  1823. disposeInvoked++;
  1824. disposable = new MockDisposable(scheduler);
  1825. return disposable;
  1826. },
  1827. d =>
  1828. {
  1829. _d = d;
  1830. createInvoked++;
  1831. xs = scheduler.CreateColdObservable(
  1832. OnNext<long>(100, scheduler.Clock),
  1833. OnError<long>(200, ex));
  1834. return xs;
  1835. }
  1836. )
  1837. );
  1838. Assert.Same(disposable, _d);
  1839. res.Messages.AssertEqual(
  1840. OnNext(300, 200L),
  1841. OnError<long>(400, ex)
  1842. );
  1843. Assert.Equal(1, createInvoked);
  1844. Assert.Equal(1, disposeInvoked);
  1845. xs.Subscriptions.AssertEqual(
  1846. Subscribe(200, 400)
  1847. );
  1848. disposable.AssertEqual(
  1849. 200,
  1850. 400
  1851. );
  1852. }
  1853. [Fact]
  1854. public void Using_Dispose()
  1855. {
  1856. var scheduler = new TestScheduler();
  1857. var disposeInvoked = 0;
  1858. var createInvoked = 0;
  1859. var xs = default(ITestableObservable<long>);
  1860. var disposable = default(MockDisposable);
  1861. var _d = default(MockDisposable);
  1862. var res = scheduler.Start(() =>
  1863. Observable.Using(
  1864. () =>
  1865. {
  1866. disposeInvoked++;
  1867. disposable = new MockDisposable(scheduler);
  1868. return disposable;
  1869. },
  1870. d =>
  1871. {
  1872. _d = d;
  1873. createInvoked++;
  1874. xs = scheduler.CreateColdObservable(
  1875. OnNext<long>(100, scheduler.Clock),
  1876. OnNext<long>(1000, scheduler.Clock + 1));
  1877. return xs;
  1878. }
  1879. )
  1880. );
  1881. Assert.Same(disposable, _d);
  1882. res.Messages.AssertEqual(
  1883. OnNext(300, 200L)
  1884. );
  1885. Assert.Equal(1, createInvoked);
  1886. Assert.Equal(1, disposeInvoked);
  1887. xs.Subscriptions.AssertEqual(
  1888. Subscribe(200, 1000)
  1889. );
  1890. disposable.AssertEqual(
  1891. 200,
  1892. 1000
  1893. );
  1894. }
  1895. [Fact]
  1896. public void Using_ThrowResourceSelector()
  1897. {
  1898. var scheduler = new TestScheduler();
  1899. var disposeInvoked = 0;
  1900. var createInvoked = 0;
  1901. var ex = new Exception();
  1902. var res = scheduler.Start(() =>
  1903. Observable.Using<int, IDisposable>(
  1904. () =>
  1905. {
  1906. disposeInvoked++;
  1907. throw ex;
  1908. },
  1909. d =>
  1910. {
  1911. createInvoked++;
  1912. return Observable.Never<int>();
  1913. }
  1914. )
  1915. );
  1916. res.Messages.AssertEqual(
  1917. OnError<int>(200, ex)
  1918. );
  1919. Assert.Equal(0, createInvoked);
  1920. Assert.Equal(1, disposeInvoked);
  1921. }
  1922. [Fact]
  1923. public void Using_ThrowResourceUsage()
  1924. {
  1925. var scheduler = new TestScheduler();
  1926. var ex = new Exception();
  1927. var disposeInvoked = 0;
  1928. var createInvoked = 0;
  1929. var disposable = default(MockDisposable);
  1930. var res = scheduler.Start(() =>
  1931. Observable.Using<int, IDisposable>(
  1932. () =>
  1933. {
  1934. disposeInvoked++;
  1935. disposable = new MockDisposable(scheduler);
  1936. return disposable;
  1937. },
  1938. d =>
  1939. {
  1940. createInvoked++;
  1941. throw ex;
  1942. }
  1943. )
  1944. );
  1945. res.Messages.AssertEqual(
  1946. OnError<int>(200, ex)
  1947. );
  1948. Assert.Equal(1, createInvoked);
  1949. Assert.Equal(1, disposeInvoked);
  1950. disposable.AssertEqual(
  1951. 200,
  1952. 200
  1953. );
  1954. }
  1955. #endregion
  1956. #region - UsingAsync -
  1957. #if !NO_TPL
  1958. [Fact]
  1959. public void UsingAsync_ArgumentChecking()
  1960. {
  1961. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Using<int, IDisposable>(null, (res, ct) => null));
  1962. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Using<int, IDisposable>(ct => null, null));
  1963. }
  1964. [Fact]
  1965. public void UsingAsync_Simple()
  1966. {
  1967. var done = false;
  1968. var xs = Observable.Using<int, IDisposable>(
  1969. ct => Task.Factory.StartNew<IDisposable>(() => Disposable.Create(() => done = true)),
  1970. (_, ct) => Task.Factory.StartNew<IObservable<int>>(() => Observable.Return(42))
  1971. );
  1972. var res = xs.ToEnumerable().ToList();
  1973. Assert.True(new[] { 42 }.SequenceEqual(res));
  1974. Assert.True(done);
  1975. }
  1976. [Fact]
  1977. public void UsingAsync_CancelResource()
  1978. {
  1979. var N = 10;// 0000;
  1980. for (int i = 0; i < N; i++)
  1981. {
  1982. var called = false;
  1983. var s = new ManualResetEvent(false);
  1984. var e = new ManualResetEvent(false);
  1985. var x = new ManualResetEvent(false);
  1986. var xs = Observable.Using<int, IDisposable>(
  1987. ct => Task.Factory.StartNew<IDisposable>(() =>
  1988. {
  1989. s.Set();
  1990. e.WaitOne();
  1991. while (!ct.IsCancellationRequested)
  1992. ;
  1993. x.Set();
  1994. return Disposable.Empty;
  1995. }),
  1996. (_, ct) =>
  1997. {
  1998. called = true;
  1999. return Task.Factory.StartNew<IObservable<int>>(() =>
  2000. Observable.Return(42)
  2001. );
  2002. }
  2003. );
  2004. var d = xs.Subscribe(_ => { });
  2005. s.WaitOne();
  2006. d.Dispose();
  2007. e.Set();
  2008. x.WaitOne();
  2009. Assert.False(called);
  2010. }
  2011. }
  2012. [Fact]
  2013. public void UsingAsync_CancelFactory()
  2014. {
  2015. var N = 10;// 0000;
  2016. for (int i = 0; i < N; i++)
  2017. {
  2018. var gate = new object();
  2019. var disposed = false;
  2020. var called = false;
  2021. var s = new ManualResetEvent(false);
  2022. var e = new ManualResetEvent(false);
  2023. var x = new ManualResetEvent(false);
  2024. var xs = Observable.Using<int, IDisposable>(
  2025. ct => Task.Factory.StartNew<IDisposable>(() =>
  2026. Disposable.Create(() =>
  2027. {
  2028. lock (gate)
  2029. disposed = true;
  2030. })
  2031. ),
  2032. (_, ct) => Task.Factory.StartNew<IObservable<int>>(() =>
  2033. {
  2034. s.Set();
  2035. e.WaitOne();
  2036. while (!ct.IsCancellationRequested)
  2037. ;
  2038. x.Set();
  2039. return Observable.Defer<int>(() =>
  2040. {
  2041. called = true;
  2042. return Observable.Return(42);
  2043. });
  2044. })
  2045. );
  2046. var d = xs.Subscribe(_ => { });
  2047. s.WaitOne();
  2048. //
  2049. // This will *eventually* set the CancellationToken. There's a fundamental race between observing the CancellationToken
  2050. // and returning the IDisposable that will set the CancellationTokenSource. Notice this is reflected in the code above,
  2051. // by looping until the CancellationToken is set.
  2052. //
  2053. d.Dispose();
  2054. e.Set();
  2055. x.WaitOne();
  2056. while (true)
  2057. {
  2058. lock (gate)
  2059. if (disposed)
  2060. break;
  2061. }
  2062. Assert.False(called, i.ToString());
  2063. }
  2064. }
  2065. #endif
  2066. #endregion
  2067. }
  2068. }