ObservableConversionTests.cs 24 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Reactive.Subjects;
  10. using System.Reflection;
  11. using System.Runtime.CompilerServices;
  12. using System.Threading;
  13. using Microsoft.Reactive.Testing;
  14. using Xunit;
  15. using ReactiveTests.Dummies;
  16. namespace ReactiveTests.Tests
  17. {
  18. public class ObservableConversionTests : ReactiveTest
  19. {
  20. #region + Subscribe +
  /// <summary>
  /// Verifies argument validation of the <c>Observable.Subscribe</c> overloads that take an
  /// enumerable source: a null source, observer or scheduler must raise
  /// <see cref="ArgumentNullException"/>, and subscribing to <c>NullEnumeratorEnumerable</c>
  /// is expected to raise <see cref="NullReferenceException"/>.
  /// </summary>
  [Fact]
  public void SubscribeToEnumerable_ArgumentChecking()
  {
      IEnumerable<int> missingSource = null;
      IObserver<int> missingObserver = null;

      // Overload without an explicit scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Subscribe<int>(missingSource, DummyObserver<int>.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Subscribe<int>(DummyEnumerable<int>.Instance, missingObserver));

      // Overload with an explicit scheduler.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Subscribe<int>(missingSource, DummyObserver<int>.Instance, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Subscribe<int>(DummyEnumerable<int>.Instance, DummyObserver<int>.Instance, null));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Subscribe<int>(DummyEnumerable<int>.Instance, missingObserver, DummyScheduler.Instance));

      // Failure raised while the sequence is being enumerated on the current thread.
      ReactiveAssert.Throws<NullReferenceException>(
          () => NullEnumeratorEnumerable<int>.Instance.Subscribe(Observer.Create<int>(x => { }), Scheduler.CurrentThread));
  }
  /// <summary>
  /// Subscribes to a finite enumerable on a <c>TestScheduler</c> and checks the recorded
  /// notifications (five values followed by completion) and the recorded subscription window.
  /// </summary>
  [Fact]
  public void SubscribeToEnumerable_Finite()
  {
      var scheduler = new TestScheduler();
      var observer = scheduler.CreateObserver<int>();

      MockEnumerable<int> source = null;
      IDisposable subscription = null;

      // Creation, subscription and disposal are scheduled at the Created / Subscribed / Disposed times.
      scheduler.ScheduleAbsolute(Created, () => source = new MockEnumerable<int>(scheduler, Enumerable_Finite()));
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = source.Subscribe(observer, scheduler));
      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(201, 1),
          OnNext(202, 2),
          OnNext(203, 3),
          OnNext(204, 4),
          OnNext(205, 5),
          OnCompleted<int>(206)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 206)
      );
  }
  /// <summary>
  /// Subscribes to a never-ending enumerable and disposes the subscription at virtual time 210;
  /// only the values produced before disposal (201 through 209) may be observed.
  /// </summary>
  [Fact]
  public void SubscribeToEnumerable_Infinite()
  {
      var scheduler = new TestScheduler();
      var observer = scheduler.CreateObserver<int>();

      MockEnumerable<int> source = null;
      IDisposable subscription = null;

      scheduler.ScheduleAbsolute(Created, () => source = new MockEnumerable<int>(scheduler, Enumerable_Infinite()));
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = source.Subscribe(observer, scheduler));
      // Early disposal is the only thing that stops the infinite source.
      scheduler.ScheduleAbsolute(210, () => subscription.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(201, 1),
          OnNext(202, 1),
          OnNext(203, 1),
          OnNext(204, 1),
          OnNext(205, 1),
          OnNext(206, 1),
          OnNext(207, 1),
          OnNext(208, 1),
          OnNext(209, 1)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Subscribes to an enumerable that throws after three elements and checks that the
  /// exception is delivered through OnError at time 204, ending the subscription there.
  /// </summary>
  [Fact]
  public void SubscribeToEnumerable_Error()
  {
      var scheduler = new TestScheduler();
      var observer = scheduler.CreateObserver<int>();
      var failure = new Exception();

      MockEnumerable<int> source = null;
      IDisposable subscription = null;

      scheduler.ScheduleAbsolute(Created, () => source = new MockEnumerable<int>(scheduler, Enumerable_Error(failure)));
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = source.Subscribe(observer, scheduler));
      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(201, 1),
          OnNext(202, 2),
          OnNext(203, 3),
          OnError<int>(204, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 204)
      );
  }
  102. #if !SILVERLIGHTM7
  /// <summary>
  /// Runs 100 iterations comparing the scheduler-less <c>Observable.Subscribe</c> overload for
  /// enumerables with the overload that is handed <c>DefaultScheduler.Instance</c>; both must
  /// deliver the same elements.
  /// </summary>
  [Fact]
  public void SubscribeToEnumerable_DefaultScheduler()
  {
      for (int i = 0; i < 100; i++)
      {
          var results1 = new List<int>();
          var results2 = new List<int>();

          // Each semaphore is released by the completion callback of its observer. The using
          // blocks dispose the wait handles after both have been awaited, so the loop does not
          // leave 200 undisposed handles behind.
          using (var s1 = new Semaphore(0, 1))
          using (var s2 = new Semaphore(0, 1))
          {
              Observable.Subscribe(Enumerable_Finite(),
                  Observer.Create<int>(x => results1.Add(x), ex => { throw ex; }, () => s1.Release()));
              Observable.Subscribe(Enumerable_Finite(),
                  Observer.Create<int>(x => results2.Add(x), ex => { throw ex; }, () => s2.Release()),
                  DefaultScheduler.Instance);

              // Block until both sequences have signalled completion.
              s1.WaitOne();
              s2.WaitOne();
          }

          results1.AssertEqual(results2);
      }
  }
  123. #endif
  124. #endregion
  125. #region ToEnumerable
  /// <summary>
  /// <c>Observable.ToEnumerable</c> must reject a null source with <see cref="ArgumentNullException"/>.
  /// </summary>
  [Fact]
  public void ToEnumerable_ArgumentChecking()
  {
      IObservable<int> missingSource = null;

      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.ToEnumerable(missingSource));
  }
  /// <summary>
  /// Enumerating <c>Observable.Range(0, 10).ToEnumerable()</c> through the generic interface
  /// must yield the same values as <c>Enumerable.Range(0, 10)</c>.
  /// </summary>
  [Fact]
  public void ToEnumerable_Generic()
  {
      var actual = Observable.Range(0, 10).ToEnumerable();
      var expected = Enumerable.Range(0, 10);

      Assert.True(actual.SequenceEqual(expected));
  }
  /// <summary>
  /// Same check as the generic variant, but the sequence is consumed through the
  /// non-generic <see cref="IEnumerable"/> interface and cast back to <c>int</c>.
  /// </summary>
  [Fact]
  public void ToEnumerable_NonGeneric()
  {
      IEnumerable untyped = Observable.Range(0, 10).ToEnumerable();
      var expected = Enumerable.Range(0, 10);

      Assert.True(untyped.Cast<int>().SequenceEqual(expected));
  }
  /// <summary>
  /// Drives the generic enumerator returned by <c>ToEnumerable()</c> by hand: ten successful
  /// <c>MoveNext</c> calls yielding 0..9, followed by a <c>MoveNext</c> that returns false.
  /// </summary>
  [Fact]
  public void ToEnumerable_ManualGeneric()
  {
      var res = Observable.Range(0, 10).ToEnumerable();

      // IEnumerator<T> is IDisposable; dispose it deterministically rather than leaking it.
      using (var ieg = res.GetEnumerator())
      {
          for (int i = 0; i < 10; i++)
          {
              Assert.True(ieg.MoveNext());
              Assert.Equal(i, ieg.Current);
          }

          // The sequence is exhausted after ten elements.
          Assert.False(ieg.MoveNext());
      }
  }
  /// <summary>
  /// Drives the non-generic enumerator returned by <c>ToEnumerable()</c> by hand and checks
  /// that it yields 0..9 and then reports the end of the sequence.
  /// </summary>
  [Fact]
  public void ToEnumerable_ManualNonGeneric()
  {
      IEnumerable untyped = Observable.Range(0, 10).ToEnumerable();
      IEnumerator cursor = untyped.GetEnumerator();

      foreach (var expected in Enumerable.Range(0, 10))
      {
          Assert.True(cursor.MoveNext());
          // Current is typed as object here, so the expected value is compared boxed.
          Assert.Equal(expected, cursor.Current);
      }

      Assert.False(cursor.MoveNext());
  }
  /// <summary>
  /// Calling <c>Reset</c> on the enumerator obtained from <c>ToEnumerable()</c> must raise
  /// <see cref="NotSupportedException"/>.
  /// </summary>
  [Fact]
  public void ToEnumerable_ResetNotSupported()
  {
      ReactiveAssert.Throws<NotSupportedException>(
          () => Observable.Range(0, 10)
                          .ToEnumerable()
                          .GetEnumerator()
                          .Reset());
  }
  170. #endregion
  171. #region ToEvent
  /// <summary>
  /// <c>Observable.ToEvent</c> must reject null sources of <c>Unit</c>, of a plain element type,
  /// and of <c>EventPattern</c> values with <see cref="ArgumentNullException"/>.
  /// </summary>
  [Fact]
  public void ToEvent_ArgumentChecks()
  {
      IObservable<Unit> noUnits = null;
      IObservable<int> noInts = null;
      IObservable<EventPattern<EventArgs>> noPatterns = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEvent(noUnits));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEvent(noInts));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEvent(noPatterns));
  }
  /// <summary>
  /// Exposes a <c>Subject&lt;Unit&gt;</c> as an event and checks that an attached handler is
  /// invoked once per notification and no longer invoked after it has been detached.
  /// </summary>
  [Fact]
  public void ToEvent_Unit()
  {
      var subject = new Subject<Unit>();
      var evt = subject.ToEvent();

      var calls = 0;
      Action<Unit> handler = _ => calls++;

      evt.OnNext += handler;
      Assert.Equal(0, calls);

      subject.OnNext(new Unit());
      Assert.Equal(1, calls);

      subject.OnNext(new Unit());
      Assert.Equal(2, calls);

      // Once detached, further notifications must not reach the handler.
      evt.OnNext -= handler;
      subject.OnNext(new Unit());
      Assert.Equal(2, calls);
  }
  /// <summary>
  /// Exposes a <c>Subject&lt;int&gt;</c> as an event and checks that the handler receives
  /// exactly the values published while it was attached.
  /// </summary>
  [Fact]
  public void ToEvent_NonUnit()
  {
      var subject = new Subject<int>();
      var evt = subject.ToEvent();

      var received = new List<int>();
      Action<int> handler = value => received.Add(value);

      evt.OnNext += handler;
      subject.OnNext(1);
      subject.OnNext(2);
      evt.OnNext -= handler;

      // Published after the handler was removed; must not be recorded.
      subject.OnNext(3);

      Assert.True(received.SequenceEqual(new[] { 1, 2 }));
  }
  /// <summary>
  /// Round-trips a subject through <c>ToEvent</c> and back through <c>Observable.FromEvent</c>:
  /// values are seen only while the subscription is alive, and completion is never signalled.
  /// </summary>
  [Fact]
  public void ToEvent_FromEvent()
  {
      var subject = new Subject<int>();
      var evt = subject.ToEvent();
      var roundTripped = Observable.FromEvent<int>(h => evt.OnNext += h, h => evt.OnNext -= h);

      var received = new List<int>();

      // The completion callback fails the test if it is ever invoked.
      using (roundTripped.Subscribe(value => received.Add(value), () => Assert.True(false)))
      {
          subject.OnNext(1);
          subject.OnNext(2);
      }

      // The subscription has been disposed; this value must be ignored.
      subject.OnNext(3);

      Assert.True(received.SequenceEqual(new[] { 1, 2 }));
  }
  231. #endregion
  232. #region ToEventPattern
  /// <summary>
  /// <c>Observable.ToEventPattern</c> must reject a null source with
  /// <see cref="ArgumentNullException"/>.
  /// </summary>
  [Fact]
  public void ToEventPattern_ArgumentChecking()
  {
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.ToEventPattern<EventArgs>(null));
  }
  /// <summary>
  /// Exposes a subject of event patterns as a .NET-style event and checks that the handler
  /// sees the original sender and the argument values published while it was attached.
  /// </summary>
  [Fact]
  public void ToEventPattern_IEvent()
  {
      var subject = new Subject<EventPattern<EventArgs<int>>>();
      var evt = subject.ToEventPattern();

      var sender = new object();
      var received = new List<int>();

      // Builds a pattern carrying the shared sender and the given payload.
      Func<int, EventPattern<EventArgs<int>>> pattern =
          value => new EventPattern<EventArgs<int>>(sender, new EventArgs<int>(value));

      EventHandler<EventArgs<int>> handler = (s, e) =>
      {
          Assert.Same(sender, s);
          received.Add(e.Value);
      };

      evt.OnNext += handler;
      subject.OnNext(pattern(42));
      subject.OnNext(pattern(43));
      evt.OnNext -= handler;

      // Published after detaching; must not be recorded.
      subject.OnNext(pattern(44));

      Assert.True(received.SequenceEqual(new[] { 42, 43 }));
  }
  /// <summary>
  /// With a handler attached to the event produced by <c>ToEventPattern</c>, an error pushed
  /// into the source must be thrown out of <c>OnError</c>; values seen earlier stay recorded.
  /// </summary>
  [Fact]
  public void ToEventPattern_IEvent_Fails()
  {
      var subject = new Subject<EventPattern<EventArgs<int>>>();
      var evt = subject.ToEventPattern();

      var sender = new object();
      var received = new List<int>();

      Func<int, EventPattern<EventArgs<int>>> pattern =
          value => new EventPattern<EventArgs<int>>(sender, new EventArgs<int>(value));

      EventHandler<EventArgs<int>> handler = (s, e) =>
      {
          Assert.Same(sender, s);
          received.Add(e.Value);
      };

      evt.OnNext += handler;
      subject.OnNext(pattern(42));
      subject.OnNext(pattern(43));

      // The very same exception instance is expected to propagate to the OnError caller.
      var failure = new Exception();
      ReactiveAssert.Throws(failure, () => subject.OnError(failure));

      Assert.True(received.SequenceEqual(new[] { 42, 43 }));
  }
  /// <summary>
  /// With a handler attached to the event produced by <c>ToEventPattern</c>, completing the
  /// source must not throw and must leave the previously received values untouched.
  /// </summary>
  [Fact]
  public void ToEventPattern_IEvent_Completes()
  {
      var subject = new Subject<EventPattern<EventArgs<int>>>();
      var evt = subject.ToEventPattern();

      var sender = new object();
      var received = new List<int>();

      Func<int, EventPattern<EventArgs<int>>> pattern =
          value => new EventPattern<EventArgs<int>>(sender, new EventArgs<int>(value));

      EventHandler<EventArgs<int>> handler = (s, e) =>
      {
          Assert.Same(sender, s);
          received.Add(e.Value);
      };

      evt.OnNext += handler;
      subject.OnNext(pattern(42));
      subject.OnNext(pattern(43));
      subject.OnCompleted();

      Assert.True(received.SequenceEqual(new[] { 42, 43 }));
  }
  /// <summary>
  /// Minimal event source used by the tests: exposes event <c>E</c> and a method to raise it.
  /// </summary>
  class EventSrc
  {
      public event EventHandler<EventArgs<string>> E;

      /// <summary>Raises <c>E</c> with this instance as sender and <paramref name="s"/> as payload.</summary>
      public void On(string s)
      {
          // Copy the delegate first so the null check and the call use the same value.
          var handlers = E;
          if (handlers == null)
          {
              return;
          }

          handlers(this, new EventArgs<string>(s));
      }
  }
  /// <summary>
  /// Event arguments carrying a single value of type <typeparamref name="T"/>.
  /// </summary>
  class EventArgs<T> : EventArgs
  {
      /// <summary>Creates the arguments around <paramref name="value"/>.</summary>
      public EventArgs(T value)
      {
          this.Value = value;
      }

      /// <summary>The value supplied at construction; it cannot be set from outside the class.</summary>
      public T Value { get; private set; }
  }
  /// <summary>
  /// Converts a .NET event into an observable with <c>FromEventPattern</c> and back into an
  /// event with <c>ToEventPattern</c>; only payloads raised while the handler is attached to
  /// the resulting event may be recorded.
  /// </summary>
  [Fact]
  public void FromEventPattern_ToEventPattern()
  {
      var source = new EventSrc();
      var observable = Observable.FromEventPattern<EventHandler<EventArgs<string>>, EventArgs<string>>(
          h => new EventHandler<EventArgs<string>>(h),
          h => source.E += h,
          h => source.E -= h);
      var roundTripped = observable.ToEventPattern();

      var received = new List<string>();
      EventHandler<EventArgs<string>> handler = (s, e) =>
      {
          Assert.Same(source, s);
          received.Add(e.Value);
      };

      // Raised before any handler is attached; must go unnoticed.
      source.On("bar");

      roundTripped.OnNext += handler;
      source.On("foo");
      source.On("baz");
      roundTripped.OnNext -= handler;

      // Raised after the handler was detached; must go unnoticed as well.
      source.On("qux");

      Assert.True(received.SequenceEqual(new[] { "foo", "baz" }));
  }
  /// <summary>
  /// Attaches the same handler twice to an event produced by <c>ToEvent</c>: it must then be
  /// invoked twice per notification, and each removal must take away exactly one registration.
  /// </summary>
  [Fact]
  public void ToEvent_DuplicateHandlers()
  {
      var subject = new Subject<Unit>();
      var evt = subject.ToEvent();

      var calls = 0;
      Action<Unit> handler = e => calls++;

      // One registration: one call per notification.
      evt.OnNext += handler;
      Assert.Equal(0, calls);
      subject.OnNext(new Unit());
      Assert.Equal(1, calls);

      // Two registrations: two calls per notification (1 + 2 = 3).
      evt.OnNext += handler;
      subject.OnNext(new Unit());
      Assert.Equal(3, calls);

      // Back to one registration.
      evt.OnNext -= handler;
      subject.OnNext(new Unit());
      Assert.Equal(4, calls);

      // No registrations left.
      evt.OnNext -= handler;
      subject.OnNext(new Unit());
      Assert.Equal(4, calls);
  }
  /// <summary>
  /// After the source completes, the handler must not be called again and (outside Silverlight
  /// builds) the event's internal subscription table must be empty.
  /// </summary>
  [Fact]
  public void ToEvent_SourceCompletes()
  {
      var subject = new Subject<Unit>();
      var evt = subject.ToEvent();

      var calls = 0;
      Action<Unit> handler = e => calls++;

      evt.OnNext += handler;
      Assert.Equal(0, calls);

      subject.OnNext(new Unit());
      Assert.Equal(1, calls);

      subject.OnNext(new Unit());
      Assert.Equal(2, calls);

      subject.OnCompleted();
      Assert.Equal(2, calls);

#if !SILVERLIGHT // FieldAccessException
      // Inspect the private bookkeeping through reflection.
      var table = GetSubscriptionTable(evt);
      Assert.True(table.Count == 0);
#endif
  }
  /// <summary>
  /// When the source fails, the exception must be thrown out of <c>OnError</c>, the handler
  /// must not be called again and (outside Silverlight builds) the event's internal
  /// subscription table must be empty.
  /// </summary>
  [Fact]
  public void ToEvent_SourceFails()
  {
      var src = new Subject<Unit>();
      var evt = src.ToEvent();

      var num = 0;
      var hnd = new Action<Unit>(e => num++);

      evt.OnNext += hnd;
      Assert.Equal(0, num);

      src.OnNext(new Unit());
      Assert.Equal(1, num);

      src.OnNext(new Unit());
      Assert.Equal(2, num);

      // The same exception instance is expected to propagate to the OnError caller.
      var ex = new Exception();
      ReactiveAssert.Throws(ex, () => src.OnError(ex));

      // Mirrors ToEvent_SourceCompletes: termination must not invoke the handler.
      Assert.Equal(2, num);

#if !SILVERLIGHT // FieldAccessException
      // Inspect the private bookkeeping through reflection.
      var tbl = GetSubscriptionTable(evt);
      Assert.True(tbl.Count == 0);
#endif
  }
  /// <summary>
  /// Attaches a handler twice to an event built over <c>Observable.Empty</c>: the handler is
  /// never called and (outside Silverlight builds) no subscription stays in the internal table.
  /// </summary>
  [Fact]
  public void ToEvent_DoneImmediately()
  {
      var empty = Observable.Empty<Unit>();
      var evt = empty.ToEvent();

      var calls = 0;
      Action<Unit> handler = e => calls++;

      for (var attempt = 0; attempt < 2; attempt++)
      {
          evt.OnNext += handler;
          Assert.Equal(0, calls);

#if !SILVERLIGHT // FieldAccessException
          var table = GetSubscriptionTable(evt);
          Assert.True(table.Count == 0);
#endif
      }
  }
  /// <summary>
  /// Removes a handler more often than it was added and checks that the extra removal is
  /// harmless: a later registration still works and can be removed again.
  /// </summary>
  [Fact]
  public void ToEvent_UnbalancedHandlers()
  {
      var subject = new Subject<Unit>();
      var evt = subject.ToEvent();

      var calls = 0;
      Action<Unit> handler = e => calls++;

      evt.OnNext += handler;
      Assert.Equal(0, calls);

      evt.OnNext -= handler;
      Assert.Equal(0, calls);

      // Second removal without a matching add.
      evt.OnNext -= handler;
      Assert.Equal(0, calls);

      evt.OnNext += handler;
      Assert.Equal(0, calls);

      subject.OnNext(new Unit());
      Assert.Equal(1, calls);

      subject.OnNext(new Unit());
      Assert.Equal(2, calls);

      evt.OnNext -= handler;
      Assert.Equal(2, calls);

      // No handler attached any more.
      subject.OnNext(new Unit());
      Assert.Equal(2, calls);
  }
  /// <summary>
  /// Reads the private instance field <c>_subscriptions</c> of <paramref name="evt"/> through
  /// reflection and returns it as the handler-to-subscriptions table.
  /// </summary>
  /// <param name="evt">Object whose runtime type declares the <c>_subscriptions</c> field.</param>
  /// <returns>The current value of the field.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="evt"/> is null.</exception>
  /// <exception cref="InvalidOperationException">
  /// The runtime type of <paramref name="evt"/> has no non-public instance field named <c>_subscriptions</c>.
  /// </exception>
  private static Dictionary<Delegate, Stack<IDisposable>> GetSubscriptionTable(object evt)
  {
      if (evt == null)
      {
          throw new ArgumentNullException("evt");
      }

      var field = evt.GetType().GetField("_subscriptions", BindingFlags.NonPublic | BindingFlags.Instance);

      // GetField returns null when no matching field exists; fail with a message that names
      // the type instead of letting a bare NullReferenceException escape.
      if (field == null)
      {
          throw new InvalidOperationException(
              "Type '" + evt.GetType().FullName + "' has no non-public instance field named '_subscriptions'.");
      }

      return (Dictionary<Delegate, Stack<IDisposable>>)field.GetValue(evt);
  }
  /// <summary>
  /// Two <c>EventPattern</c> values built from the same sender and the same event args must be
  /// equal through <c>Equals</c>, the <c>==</c> / <c>!=</c> operators and <c>GetHashCode</c>,
  /// and must not be equal to null or to an unrelated object.
  /// </summary>
  [Fact]
  public void EventPattern_Equality()
  {
      var first = new EventPattern<string, EventArgs>("Bart", EventArgs.Empty);
      var second = new EventPattern<string, EventArgs>("Bart", EventArgs.Empty);

      // Reflexive and symmetric equality.
      Assert.True(first.Equals(first));
      Assert.True(first.Equals(second));
      Assert.True(second.Equals(first));

      // The operators and the hash code must agree with Equals.
      Assert.True(first == second);
      Assert.True(!(first != second));
      Assert.True(first.GetHashCode() == second.GetHashCode());

      // Null and values of other types never compare equal.
      Assert.False(first.Equals(null));
      Assert.False(first.Equals("xy"));
      Assert.False(first == null);
  }
  /// <summary>
  /// <c>EventPattern</c> values that differ in sender, or in event args instance, must be
  /// unequal through <c>Equals</c> and the operators; the test also expects different hash codes.
  /// </summary>
  [Fact]
  public void EventPattern_Inequality()
  {
      var argsA = new MyEventArgs();
      var argsB = new MyEventArgs();

      var baseline = new EventPattern<string, MyEventArgs>("Bart", argsA);
      var otherSender = new EventPattern<string, MyEventArgs>("John", argsA);
      var otherArgs = new EventPattern<string, MyEventArgs>("Bart", argsB);

      // Same args, different sender.
      Assert.True(!baseline.Equals(otherSender));
      Assert.True(!otherSender.Equals(baseline));
      Assert.True(!(baseline == otherSender));
      Assert.True(baseline != otherSender);
      Assert.True(baseline.GetHashCode() != otherSender.GetHashCode());

      // Same sender, different args instance.
      Assert.True(!baseline.Equals(otherArgs));
      Assert.True(!otherArgs.Equals(baseline));
      Assert.True(!(baseline == otherArgs));
      Assert.True(baseline != otherArgs);
      Assert.True(baseline.GetHashCode() != otherArgs.GetHashCode());
  }
  /// <summary>
  /// Empty <see cref="EventArgs"/> subclass; the tests create separate instances of it to get
  /// distinct event-args objects.
  /// </summary>
  class MyEventArgs : EventArgs { }
  475. #endregion
  476. #region + ToObservable +
  /// <summary>
  /// Verifies argument validation of <c>Observable.ToObservable</c> with an explicit scheduler:
  /// a null source, scheduler or observer must raise <see cref="ArgumentNullException"/>, and
  /// subscribing to <c>NullEnumeratorEnumerable</c> is expected to raise
  /// <see cref="NullReferenceException"/>.
  /// </summary>
  [Fact]
  public void EnumerableToObservable_ArgumentChecking()
  {
      IEnumerable<int> missingSource = null;
      IScheduler missingScheduler = null;

      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.ToObservable(missingSource, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.ToObservable(DummyEnumerable<int>.Instance, missingScheduler));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.ToObservable(DummyEnumerable<int>.Instance, DummyScheduler.Instance).Subscribe(null));

      // Failure raised while subscribing on the current thread.
      ReactiveAssert.Throws<NullReferenceException>(
          () => Observable.ToObservable(NullEnumeratorEnumerable<int>.Instance, Scheduler.CurrentThread).Subscribe());
  }
  /// <summary>
  /// Converts a four-element enumerable to an observable on a <c>TestScheduler</c> and checks
  /// that the elements arrive in source order, one per tick, followed by completion.
  /// </summary>
  [Fact]
  public void EnumerableToObservable_Complete()
  {
      var scheduler = new TestScheduler();
      var values = new[] { 3, 1, 2, 4 };
      var source = new MockEnumerable<int>(scheduler, values);

      var observer = scheduler.Start(() => source.ToObservable(scheduler));

      observer.Messages.AssertEqual(
          OnNext(201, 3),
          OnNext(202, 1),
          OnNext(203, 2),
          OnNext(204, 4),
          OnCompleted<int>(205)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 205)
      );
  }
  /// <summary>
  /// Converts a four-element enumerable to an observable and disposes the subscription at
  /// virtual time 203; only the first two elements may have been delivered by then.
  /// </summary>
  [Fact]
  public void EnumerableToObservable_Dispose()
  {
      var scheduler = new TestScheduler();
      var values = new[] { 3, 1, 2, 4 };
      var source = new MockEnumerable<int>(scheduler, values);

      // The second argument to Start is the disposal time.
      var observer = scheduler.Start(() => source.ToObservable(scheduler), 203);

      observer.Messages.AssertEqual(
          OnNext(201, 3),
          OnNext(202, 1)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 203)
      );
  }
  /// <summary>
  /// Converts an enumerable that throws after two elements and checks that the exception is
  /// delivered through OnError at time 203, ending the subscription there.
  /// </summary>
  [Fact]
  public void EnumerableToObservable_Error()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();
      var source = new MockEnumerable<int>(scheduler, EnumerableToObservable_Error_Core(failure));

      var observer = scheduler.Start(() => source.ToObservable(scheduler));

      observer.Messages.AssertEqual(
          OnNext(201, 1),
          OnNext(202, 2),
          OnError<int>(203, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 203)
      );
  }
  /// <summary>
  /// Verifies argument validation of the scheduler-less <c>Observable.ToObservable</c> overload:
  /// a null source or a null observer must raise <see cref="ArgumentNullException"/>.
  /// </summary>
  [Fact]
  public void EnumerableToObservable_Default_ArgumentChecking()
  {
      IEnumerable<int> missingSource = null;

      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.ToObservable(missingSource));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.ToObservable(DummyEnumerable<int>.Instance).Subscribe(null));
  }
  /// <summary>
  /// The scheduler-less <c>ToObservable()</c> overload must produce the same sequence as the
  /// overload given <c>DefaultScheduler.Instance</c>.
  /// </summary>
  [Fact]
  public void EnumerableToObservable_Default()
  {
      var values = new[] { 4, 3, 1, 5, 9, 2 };

      var withoutScheduler = values.ToObservable();
      var withDefaultScheduler = values.ToObservable(DefaultScheduler.Instance);

      withoutScheduler.AssertEqual(withDefaultScheduler);
  }
  557. #if !NO_PERF
  /// <summary>
  /// Converts an array to an observable on a <c>TestLongRunningScheduler</c> and, after waiting
  /// on both handles the scheduler hands out, checks that every element was observed in order.
  /// </summary>
  [Fact]
  public void EnumerableToObservable_LongRunning_Complete()
  {
      // The scheduler passes its two events back through these callbacks
      // (presumably signalled when the long-running work starts and ends; confirm against the scheduler).
      ManualResetEvent started = null;
      ManualResetEvent finished = null;
      var scheduler = new TestLongRunningScheduler(x => started = x, x => finished = x);

      var values = new[] { 3, 1, 2, 4 };
      var observable = values.ToObservable(scheduler);

      var received = new List<int>();
      observable.Subscribe(received.Add);

      started.WaitOne();
      finished.WaitOne();

      Assert.True(values.SequenceEqual(received));
  }
  /// <summary>
  /// Converts a very long range to an observable on a <c>TestLongRunningScheduler</c>, disposes
  /// the subscription once at least 100 elements were received, and checks the first 100.
  /// </summary>
  [Fact]
  [MethodImpl(MethodImplOptions.NoOptimization)]
  public void EnumerableToObservable_LongRunning_Dispose()
  {
      ManualResetEvent started = null;
      ManualResetEvent finished = null;
      var scheduler = new TestLongRunningScheduler(x => started = x, x => finished = x);

      var range = Enumerable.Range(0, int.MaxValue);
      var observable = range.ToObservable(scheduler);

      var received = new List<int>();
      var subscription = observable.Subscribe(received.Add);

      started.WaitOne();

      // Spin until enough elements have arrived; the list is filled by the subscription.
      while (received.Count < 100)
      {
      }

      subscription.Dispose();
      finished.WaitOne();

      Assert.True(range.Take(100).SequenceEqual(received.Take(100)));
  }
  /// <summary>
  /// Converts an enumerable that throws after two elements on a <c>TestLongRunningScheduler</c>
  /// and checks that both elements and then the very same exception instance are observed.
  /// </summary>
  [Fact]
  public void EnumerableToObservable_LongRunning_Error()
  {
      ManualResetEvent started = null;
      ManualResetEvent finished = null;
      var scheduler = new TestLongRunningScheduler(x => started = x, x => finished = x);

      var failure = new Exception();
      var observable = EnumerableToObservable_Error_Core(failure).ToObservable(scheduler);

      var received = new List<int>();
      Exception observedError = null;
      observable.Subscribe(received.Add, error => observedError = error);

      started.WaitOne();
      finished.WaitOne();

      Assert.True(new[] { 1, 2 }.SequenceEqual(received));
      Assert.Same(failure, observedError);
  }
  607. #endif
  /// <summary>
  /// Lazily yields 1 and 2, then throws <paramref name="ex"/> when a third element is requested.
  /// </summary>
  /// <param name="ex">Exception instance to throw after the two elements.</param>
  static IEnumerable<int> EnumerableToObservable_Error_Core(Exception ex)
  {
      foreach (var value in new[] { 1, 2 })
      {
          yield return value;
      }

      throw ex;
  }
  /// <summary>
  /// Converts a <c>RogueEnumerable</c> constructed with an exception and checks that the only
  /// notification is an OnError carrying that exception at the subscription time (200).
  /// </summary>
  [Fact]
  public void EnumerableToObservable_GetEnumeratorThrows()
  {
      var failure = new Exception();
      var scheduler = new TestScheduler();

      // Going by the test name, this source presumably fails as soon as its enumerator is requested.
      var source = new RogueEnumerable<int>(failure);

      var observer = scheduler.Start(() => source.ToObservable(scheduler));

      observer.Messages.AssertEqual(
          OnError<int>(200, failure)
      );
  }
  627. #endregion
  628. #region |> Helpers <|
  /// <summary>
  /// Lazily yields the integers 1 through 5 and then ends.
  /// </summary>
  /// <remarks>
  /// Static because it touches no instance state, in line with
  /// <c>EnumerableToObservable_Error_Core</c>; the iterator ends when the method body ends,
  /// so no trailing <c>yield break</c> is needed.
  /// </remarks>
  private static IEnumerable<int> Enumerable_Finite()
  {
      yield return 1;
      yield return 2;
      yield return 3;
      yield return 4;
      yield return 5;
  }
  /// <summary>
  /// Lazily yields the value 1 forever; consumers must stop enumerating on their own.
  /// </summary>
  /// <remarks>
  /// Static because it touches no instance state, in line with
  /// <c>EnumerableToObservable_Error_Core</c>.
  /// </remarks>
  private static IEnumerable<int> Enumerable_Infinite()
  {
      while (true)
      {
          yield return 1;
      }
  }
  /// <summary>
  /// Lazily yields 1, 2 and 3, then throws <paramref name="exception"/> when a fourth element
  /// is requested.
  /// </summary>
  /// <param name="exception">Exception instance to throw after the three elements.</param>
  /// <remarks>
  /// Static because it touches no instance state, in line with
  /// <c>EnumerableToObservable_Error_Core</c>.
  /// </remarks>
  private static IEnumerable<int> Enumerable_Error(Exception exception)
  {
      yield return 1;
      yield return 2;
      yield return 3;

      // Throws the caller's own instance so tests can compare it by reference.
      throw exception;
  }
  650. #endregion
  651. }
  652. }