ObservableConversionTests.cs 24 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Reactive.Subjects;
  10. using System.Reflection;
  11. using System.Threading;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. namespace ReactiveTests.Tests
  16. {
  17. public class ObservableConversionTests : ReactiveTest
  18. {
  19. #region + Subscribe +
  20. [Fact]
  21. public void SubscribeToEnumerable_ArgumentChecking()
  22. {
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>((IEnumerable<int>)null, DummyObserver<int>.Instance));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>(DummyEnumerable<int>.Instance, (IObserver<int>)null));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>((IEnumerable<int>)null, DummyObserver<int>.Instance, DummyScheduler.Instance));
  26. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>(DummyEnumerable<int>.Instance, DummyObserver<int>.Instance, null));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>(DummyEnumerable<int>.Instance, (IObserver<int>)null, DummyScheduler.Instance));
  28. ReactiveAssert.Throws<NullReferenceException>(() => NullEnumeratorEnumerable<int>.Instance.Subscribe(Observer.Create<int>(x => { }), Scheduler.CurrentThread));
  29. }
  30. [Fact]
  31. public void SubscribeToEnumerable_Finite()
  32. {
  33. var scheduler = new TestScheduler();
  34. var results = scheduler.CreateObserver<int>();
  35. var d = default(IDisposable);
  36. var xs = default(MockEnumerable<int>);
  37. scheduler.ScheduleAbsolute(Created, () => xs = new MockEnumerable<int>(scheduler, Enumerable_Finite()));
  38. scheduler.ScheduleAbsolute(Subscribed, () => d = xs.Subscribe(results, scheduler));
  39. scheduler.ScheduleAbsolute(Disposed, () => d.Dispose());
  40. scheduler.Start();
  41. results.Messages.AssertEqual(
  42. OnNext(201, 1),
  43. OnNext(202, 2),
  44. OnNext(203, 3),
  45. OnNext(204, 4),
  46. OnNext(205, 5),
  47. OnCompleted<int>(206)
  48. );
  49. xs.Subscriptions.AssertEqual(
  50. Subscribe(200, 206)
  51. );
  52. }
  53. [Fact]
  54. public void SubscribeToEnumerable_Infinite()
  55. {
  56. var scheduler = new TestScheduler();
  57. var results = scheduler.CreateObserver<int>();
  58. var d = default(IDisposable);
  59. var xs = default(MockEnumerable<int>);
  60. scheduler.ScheduleAbsolute(Created, () => xs = new MockEnumerable<int>(scheduler, Enumerable_Infinite()));
  61. scheduler.ScheduleAbsolute(Subscribed, () => d = xs.Subscribe(results, scheduler));
  62. scheduler.ScheduleAbsolute(210, () => d.Dispose());
  63. scheduler.Start();
  64. results.Messages.AssertEqual(
  65. OnNext(201, 1),
  66. OnNext(202, 1),
  67. OnNext(203, 1),
  68. OnNext(204, 1),
  69. OnNext(205, 1),
  70. OnNext(206, 1),
  71. OnNext(207, 1),
  72. OnNext(208, 1),
  73. OnNext(209, 1)
  74. );
  75. xs.Subscriptions.AssertEqual(
  76. Subscribe(200, 210)
  77. );
  78. }
  79. [Fact]
  80. public void SubscribeToEnumerable_Error()
  81. {
  82. var scheduler = new TestScheduler();
  83. var results = scheduler.CreateObserver<int>();
  84. var d = default(IDisposable);
  85. var xs = default(MockEnumerable<int>);
  86. var ex = new Exception();
  87. scheduler.ScheduleAbsolute(Created, () => xs = new MockEnumerable<int>(scheduler, Enumerable_Error(ex)));
  88. scheduler.ScheduleAbsolute(Subscribed, () => d = xs.Subscribe(results, scheduler));
  89. scheduler.ScheduleAbsolute(Disposed, () => d.Dispose());
  90. scheduler.Start();
  91. results.Messages.AssertEqual(
  92. OnNext(201, 1),
  93. OnNext(202, 2),
  94. OnNext(203, 3),
  95. OnError<int>(204, ex)
  96. );
  97. xs.Subscriptions.AssertEqual(
  98. Subscribe(200, 204)
  99. );
  100. }
  101. #if !SILVERLIGHTM7
  102. [Fact]
  103. public void SubscribeToEnumerable_DefaultScheduler()
  104. {
  105. for (int i = 0; i < 100; i++)
  106. {
  107. var scheduler = new TestScheduler();
  108. var results1 = new List<int>();
  109. var results2 = new List<int>();
  110. var s1 = new Semaphore(0, 1);
  111. var s2 = new Semaphore(0, 1);
  112. Observable.Subscribe(Enumerable_Finite(),
  113. Observer.Create<int>(x => results1.Add(x), ex => { throw ex; }, () => s1.Release()));
  114. Observable.Subscribe(Enumerable_Finite(),
  115. Observer.Create<int>(x => results2.Add(x), ex => { throw ex; }, () => s2.Release()),
  116. DefaultScheduler.Instance);
  117. s1.WaitOne();
  118. s2.WaitOne();
  119. results1.AssertEqual(results2);
  120. }
  121. }
  122. #endif
  123. #endregion
  124. #region ToEnumerable
  125. [Fact]
  126. public void ToEnumerable_ArgumentChecking()
  127. {
  128. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEnumerable(default(IObservable<int>)));
  129. }
  130. [Fact]
  131. public void ToEnumerable_Generic()
  132. {
  133. Assert.True(Observable.Range(0, 10).ToEnumerable().SequenceEqual(Enumerable.Range(0, 10)));
  134. }
  135. [Fact]
  136. public void ToEnumerable_NonGeneric()
  137. {
  138. Assert.True(((IEnumerable)Observable.Range(0, 10).ToEnumerable()).Cast<int>().SequenceEqual(Enumerable.Range(0, 10)));
  139. }
  140. [Fact]
  141. public void ToEnumerable_ManualGeneric()
  142. {
  143. var res = Observable.Range(0, 10).ToEnumerable();
  144. var ieg = res.GetEnumerator();
  145. for (int i = 0; i < 10; i++)
  146. {
  147. Assert.True(ieg.MoveNext());
  148. Assert.Equal(i, ieg.Current);
  149. }
  150. Assert.False(ieg.MoveNext());
  151. }
  152. [Fact]
  153. public void ToEnumerable_ManualNonGeneric()
  154. {
  155. var res = (IEnumerable)Observable.Range(0, 10).ToEnumerable();
  156. var ien = res.GetEnumerator();
  157. for (int i = 0; i < 10; i++)
  158. {
  159. Assert.True(ien.MoveNext());
  160. Assert.Equal(i, ien.Current);
  161. }
  162. Assert.False(ien.MoveNext());
  163. }
  164. [Fact]
  165. public void ToEnumerable_ResetNotSupported()
  166. {
  167. ReactiveAssert.Throws<NotSupportedException>(() => Observable.Range(0, 10).ToEnumerable().GetEnumerator().Reset());
  168. }
  169. #endregion
  170. #region ToEvent
  171. [Fact]
  172. public void ToEvent_ArgumentChecks()
  173. {
  174. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEvent(default(IObservable<Unit>)));
  175. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEvent(default(IObservable<int>)));
  176. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEvent(default(IObservable<EventPattern<EventArgs>>)));
  177. }
  178. [Fact]
  179. public void ToEvent_Unit()
  180. {
  181. var src = new Subject<Unit>();
  182. var evt = src.ToEvent();
  183. var num = 0;
  184. var hnd = new Action<Unit>(_ =>
  185. {
  186. num++;
  187. });
  188. evt.OnNext += hnd;
  189. Assert.Equal(0, num);
  190. src.OnNext(new Unit());
  191. Assert.Equal(1, num);
  192. src.OnNext(new Unit());
  193. Assert.Equal(2, num);
  194. evt.OnNext -= hnd;
  195. src.OnNext(new Unit());
  196. Assert.Equal(2, num);
  197. }
  198. [Fact]
  199. public void ToEvent_NonUnit()
  200. {
  201. var src = new Subject<int>();
  202. var evt = src.ToEvent();
  203. var lst = new List<int>();
  204. var hnd = new Action<int>(e =>
  205. {
  206. lst.Add(e);
  207. });
  208. evt.OnNext += hnd;
  209. src.OnNext(1);
  210. src.OnNext(2);
  211. evt.OnNext -= hnd;
  212. src.OnNext(3);
  213. Assert.True(lst.SequenceEqual(new[] { 1, 2 }));
  214. }
  215. [Fact]
  216. public void ToEvent_FromEvent()
  217. {
  218. var src = new Subject<int>();
  219. var evt = src.ToEvent();
  220. var res = Observable.FromEvent<int>(h => evt.OnNext += h, h => evt.OnNext -= h);
  221. var lst = new List<int>();
  222. using (res.Subscribe(e => lst.Add(e), () => Assert.True(false)))
  223. {
  224. src.OnNext(1);
  225. src.OnNext(2);
  226. }
  227. src.OnNext(3);
  228. Assert.True(lst.SequenceEqual(new[] { 1, 2 }));
  229. }
  230. #endregion
  231. #region ToEventPattern
  232. [Fact]
  233. public void ToEventPattern_ArgumentChecking()
  234. {
  235. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEventPattern<EventArgs>(null));
  236. }
  237. [Fact]
  238. public void ToEventPattern_IEvent()
  239. {
  240. var src = new Subject<EventPattern<EventArgs<int>>>();
  241. var evt = src.ToEventPattern();
  242. var snd = new object();
  243. var lst = new List<int>();
  244. var hnd = new EventHandler<EventArgs<int>>((s, e) =>
  245. {
  246. Assert.Same(snd, s);
  247. lst.Add(e.Value);
  248. });
  249. evt.OnNext += hnd;
  250. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(42)));
  251. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(43)));
  252. evt.OnNext -= hnd;
  253. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(44)));
  254. Assert.True(lst.SequenceEqual(new[] { 42, 43 }));
  255. }
  256. [Fact]
  257. public void ToEventPattern_IEvent_Fails()
  258. {
  259. var src = new Subject<EventPattern<EventArgs<int>>>();
  260. var evt = src.ToEventPattern();
  261. var snd = new object();
  262. var lst = new List<int>();
  263. var hnd = new EventHandler<EventArgs<int>>((s, e) =>
  264. {
  265. Assert.Same(snd, s);
  266. lst.Add(e.Value);
  267. });
  268. evt.OnNext += hnd;
  269. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(42)));
  270. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(43)));
  271. var ex = new Exception();
  272. ReactiveAssert.Throws(ex, () => src.OnError(ex));
  273. Assert.True(lst.SequenceEqual(new[] { 42, 43 }));
  274. }
  275. [Fact]
  276. public void ToEventPattern_IEvent_Completes()
  277. {
  278. var src = new Subject<EventPattern<EventArgs<int>>>();
  279. var evt = src.ToEventPattern();
  280. var snd = new object();
  281. var lst = new List<int>();
  282. var hnd = new EventHandler<EventArgs<int>>((s, e) =>
  283. {
  284. Assert.Same(snd, s);
  285. lst.Add(e.Value);
  286. });
  287. evt.OnNext += hnd;
  288. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(42)));
  289. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(43)));
  290. src.OnCompleted();
  291. Assert.True(lst.SequenceEqual(new[] { 42, 43 }));
  292. }
  293. class EventSrc
  294. {
  295. public event EventHandler<EventArgs<string>> E;
  296. public void On(string s)
  297. {
  298. var e = E;
  299. if (e != null)
  300. e(this, new EventArgs<string>(s));
  301. }
  302. }
  303. class EventArgs<T> : EventArgs
  304. {
  305. public T Value { get; private set; }
  306. public EventArgs(T value)
  307. {
  308. Value = value;
  309. }
  310. }
  311. [Fact]
  312. public void FromEventPattern_ToEventPattern()
  313. {
  314. var src = new EventSrc();
  315. var evt = Observable.FromEventPattern<EventHandler<EventArgs<string>>, EventArgs<string>>(h => new EventHandler<EventArgs<string>>(h), h => src.E += h, h => src.E -= h);
  316. var res = evt.ToEventPattern();
  317. var lst = new List<string>();
  318. var hnd = new EventHandler<EventArgs<string>>((s, e) =>
  319. {
  320. Assert.Same(src, s);
  321. lst.Add(e.Value);
  322. });
  323. src.On("bar");
  324. res.OnNext += hnd;
  325. src.On("foo");
  326. src.On("baz");
  327. res.OnNext -= hnd;
  328. src.On("qux");
  329. Assert.True(lst.SequenceEqual(new[] { "foo", "baz" }));
  330. }
  331. [Fact]
  332. public void ToEvent_DuplicateHandlers()
  333. {
  334. var src = new Subject<Unit>();
  335. var evt = src.ToEvent();
  336. var num = 0;
  337. var hnd = new Action<Unit>(e => num++);
  338. evt.OnNext += hnd;
  339. Assert.Equal(0, num);
  340. src.OnNext(new Unit());
  341. Assert.Equal(1, num);
  342. evt.OnNext += hnd;
  343. src.OnNext(new Unit());
  344. Assert.Equal(3, num);
  345. evt.OnNext -= hnd;
  346. src.OnNext(new Unit());
  347. Assert.Equal(4, num);
  348. evt.OnNext -= hnd;
  349. src.OnNext(new Unit());
  350. Assert.Equal(4, num);
  351. }
  352. [Fact]
  353. public void ToEvent_SourceCompletes()
  354. {
  355. var src = new Subject<Unit>();
  356. var evt = src.ToEvent();
  357. var num = 0;
  358. var hnd = new Action<Unit>(e => num++);
  359. evt.OnNext += hnd;
  360. Assert.Equal(0, num);
  361. src.OnNext(new Unit());
  362. Assert.Equal(1, num);
  363. src.OnNext(new Unit());
  364. Assert.Equal(2, num);
  365. src.OnCompleted();
  366. Assert.Equal(2, num);
  367. #if !SILVERLIGHT // FieldAccessException
  368. var tbl = GetSubscriptionTable(evt);
  369. Assert.True(tbl.Count == 0);
  370. #endif
  371. }
  372. [Fact]
  373. public void ToEvent_SourceFails()
  374. {
  375. var src = new Subject<Unit>();
  376. var evt = src.ToEvent();
  377. var num = 0;
  378. var hnd = new Action<Unit>(e => num++);
  379. evt.OnNext += hnd;
  380. Assert.Equal(0, num);
  381. src.OnNext(new Unit());
  382. Assert.Equal(1, num);
  383. src.OnNext(new Unit());
  384. Assert.Equal(2, num);
  385. var ex = new Exception();
  386. ReactiveAssert.Throws(ex, () => src.OnError(ex));
  387. #if !SILVERLIGHT // FieldAccessException
  388. var tbl = GetSubscriptionTable(evt);
  389. Assert.True(tbl.Count == 0);
  390. #endif
  391. }
  392. [Fact]
  393. public void ToEvent_DoneImmediately()
  394. {
  395. var src = Observable.Empty<Unit>();
  396. var evt = src.ToEvent();
  397. var num = 0;
  398. var hnd = new Action<Unit>(e => num++);
  399. for (int i = 0; i < 2; i++)
  400. {
  401. evt.OnNext += hnd;
  402. Assert.Equal(0, num);
  403. #if !SILVERLIGHT // FieldAccessException
  404. var tbl = GetSubscriptionTable(evt);
  405. Assert.True(tbl.Count == 0);
  406. #endif
  407. }
  408. }
  409. [Fact]
  410. public void ToEvent_UnbalancedHandlers()
  411. {
  412. var src = new Subject<Unit>();
  413. var evt = src.ToEvent();
  414. var num = 0;
  415. var hnd = new Action<Unit>(e => num++);
  416. evt.OnNext += hnd;
  417. Assert.Equal(0, num);
  418. evt.OnNext -= hnd;
  419. Assert.Equal(0, num);
  420. evt.OnNext -= hnd;
  421. Assert.Equal(0, num);
  422. evt.OnNext += hnd;
  423. Assert.Equal(0, num);
  424. src.OnNext(new Unit());
  425. Assert.Equal(1, num);
  426. src.OnNext(new Unit());
  427. Assert.Equal(2, num);
  428. evt.OnNext -= hnd;
  429. Assert.Equal(2, num);
  430. src.OnNext(new Unit());
  431. Assert.Equal(2, num);
  432. }
  433. private static Dictionary<Delegate, Stack<IDisposable>> GetSubscriptionTable(object evt)
  434. {
  435. return (Dictionary<Delegate, Stack<IDisposable>>)evt.GetType().GetField("_subscriptions", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(evt);
  436. }
  437. [Fact]
  438. public void EventPattern_Equality()
  439. {
  440. var e1 = new EventPattern<string, EventArgs>("Bart", EventArgs.Empty);
  441. var e2 = new EventPattern<string, EventArgs>("Bart", EventArgs.Empty);
  442. Assert.True(e1.Equals(e1));
  443. Assert.True(e1.Equals(e2));
  444. Assert.True(e2.Equals(e1));
  445. Assert.True(e1 == e2);
  446. Assert.True(!(e1 != e2));
  447. Assert.True(e1.GetHashCode() == e2.GetHashCode());
  448. Assert.False(e1.Equals(null));
  449. Assert.False(e1.Equals("xy"));
  450. Assert.False(e1 == null);
  451. }
  452. [Fact]
  453. public void EventPattern_Inequality()
  454. {
  455. var a1 = new MyEventArgs();
  456. var a2 = new MyEventArgs();
  457. var e1 = new EventPattern<string, MyEventArgs>("Bart", a1);
  458. var e2 = new EventPattern<string, MyEventArgs>("John", a1);
  459. var e3 = new EventPattern<string, MyEventArgs>("Bart", a2);
  460. Assert.True(!e1.Equals(e2));
  461. Assert.True(!e2.Equals(e1));
  462. Assert.True(!(e1 == e2));
  463. Assert.True(e1 != e2);
  464. Assert.True(e1.GetHashCode() != e2.GetHashCode());
  465. Assert.True(!e1.Equals(e3));
  466. Assert.True(!e3.Equals(e1));
  467. Assert.True(!(e1 == e3));
  468. Assert.True(e1 != e3);
  469. Assert.True(e1.GetHashCode() != e3.GetHashCode());
  470. }
  471. class MyEventArgs : EventArgs
  472. {
  473. }
  474. #endregion
  475. #region + ToObservable +
  476. [Fact]
  477. public void EnumerableToObservable_ArgumentChecking()
  478. {
  479. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable((IEnumerable<int>)null, DummyScheduler.Instance));
  480. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable(DummyEnumerable<int>.Instance, (IScheduler)null));
  481. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable(DummyEnumerable<int>.Instance, DummyScheduler.Instance).Subscribe(null));
  482. ReactiveAssert.Throws<NullReferenceException>(() => Observable.ToObservable(NullEnumeratorEnumerable<int>.Instance, Scheduler.CurrentThread).Subscribe());
  483. }
  484. [Fact]
  485. public void EnumerableToObservable_Complete()
  486. {
  487. var scheduler = new TestScheduler();
  488. var e = new MockEnumerable<int>(scheduler,
  489. new[] { 3, 1, 2, 4 }
  490. );
  491. var results = scheduler.Start(() =>
  492. e.ToObservable(scheduler)
  493. );
  494. results.Messages.AssertEqual(
  495. OnNext(201, 3),
  496. OnNext(202, 1),
  497. OnNext(203, 2),
  498. OnNext(204, 4),
  499. OnCompleted<int>(205)
  500. );
  501. e.Subscriptions.AssertEqual(
  502. Subscribe(200, 205)
  503. );
  504. }
  505. [Fact]
  506. public void EnumerableToObservable_Dispose()
  507. {
  508. var scheduler = new TestScheduler();
  509. var e = new MockEnumerable<int>(scheduler,
  510. new[] { 3, 1, 2, 4 }
  511. );
  512. var results = scheduler.Start(() =>
  513. e.ToObservable(scheduler),
  514. 203
  515. );
  516. results.Messages.AssertEqual(
  517. OnNext(201, 3),
  518. OnNext(202, 1)
  519. );
  520. e.Subscriptions.AssertEqual(
  521. Subscribe(200, 203)
  522. );
  523. }
  524. [Fact]
  525. public void EnumerableToObservable_Error()
  526. {
  527. var scheduler = new TestScheduler();
  528. var ex = new Exception();
  529. var e = new MockEnumerable<int>(scheduler,
  530. EnumerableToObservable_Error_Core(ex)
  531. );
  532. var results = scheduler.Start(() =>
  533. e.ToObservable(scheduler)
  534. );
  535. results.Messages.AssertEqual(
  536. OnNext(201, 1),
  537. OnNext(202, 2),
  538. OnError<int>(203, ex)
  539. );
  540. e.Subscriptions.AssertEqual(
  541. Subscribe(200, 203)
  542. );
  543. }
  544. [Fact]
  545. public void EnumerableToObservable_Default_ArgumentChecking()
  546. {
  547. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable((IEnumerable<int>)null));
  548. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable(DummyEnumerable<int>.Instance).Subscribe(null));
  549. }
  550. [Fact]
  551. public void EnumerableToObservable_Default()
  552. {
  553. var xs = new[] { 4, 3, 1, 5, 9, 2 };
  554. xs.ToObservable().AssertEqual(xs.ToObservable(DefaultScheduler.Instance));
  555. }
  556. #if !NO_PERF
  557. [Fact]
  558. public void EnumerableToObservable_LongRunning_Complete()
  559. {
  560. var start = default(ManualResetEvent);
  561. var end = default(ManualResetEvent);
  562. var scheduler = new TestLongRunningScheduler(x => start = x, x => end = x);
  563. var e = new[] { 3, 1, 2, 4 };
  564. var results = e.ToObservable(scheduler);
  565. var lst = new List<int>();
  566. results.Subscribe(lst.Add);
  567. start.WaitOne();
  568. end.WaitOne();
  569. Assert.True(e.SequenceEqual(lst));
  570. }
  571. [Fact]
  572. public void EnumerableToObservable_LongRunning_Dispose()
  573. {
  574. var start = default(ManualResetEvent);
  575. var end = default(ManualResetEvent);
  576. var scheduler = new TestLongRunningScheduler(x => start = x, x => end = x);
  577. var e = Enumerable.Range(0, int.MaxValue);
  578. var results = e.ToObservable(scheduler);
  579. var lst = new List<int>();
  580. var d = results.Subscribe(lst.Add);
  581. start.WaitOne();
  582. while (lst.Count < 100)
  583. ;
  584. d.Dispose();
  585. end.WaitOne();
  586. Assert.True(e.Take(100).SequenceEqual(lst.Take(100)));
  587. }
  588. [Fact]
  589. public void EnumerableToObservable_LongRunning_Error()
  590. {
  591. var start = default(ManualResetEvent);
  592. var end = default(ManualResetEvent);
  593. var scheduler = new TestLongRunningScheduler(x => start = x, x => end = x);
  594. var ex = new Exception();
  595. var e = EnumerableToObservable_Error_Core(ex);
  596. var results = e.ToObservable(scheduler);
  597. var lst = new List<int>();
  598. var err = default(Exception);
  599. results.Subscribe(lst.Add, ex_ => err = ex_);
  600. start.WaitOne();
  601. end.WaitOne();
  602. Assert.True(new[] { 1, 2 }.SequenceEqual(lst));
  603. Assert.Same(ex, err);
  604. }
  605. #endif
  606. static IEnumerable<int> EnumerableToObservable_Error_Core(Exception ex)
  607. {
  608. yield return 1;
  609. yield return 2;
  610. throw ex;
  611. }
  612. [Fact]
  613. public void EnumerableToObservable_GetEnumeratorThrows()
  614. {
  615. var ex = new Exception();
  616. var scheduler = new TestScheduler();
  617. var xs = new RogueEnumerable<int>(ex);
  618. var res = scheduler.Start(() =>
  619. xs.ToObservable(scheduler)
  620. );
  621. res.Messages.AssertEqual(
  622. OnError<int>(200, ex)
  623. );
  624. }
  625. #endregion
  626. #region |> Helpers <|
  627. IEnumerable<int> Enumerable_Finite()
  628. {
  629. yield return 1;
  630. yield return 2;
  631. yield return 3;
  632. yield return 4;
  633. yield return 5;
  634. yield break;
  635. }
  636. IEnumerable<int> Enumerable_Infinite()
  637. {
  638. while (true)
  639. yield return 1;
  640. }
  641. IEnumerable<int> Enumerable_Error(Exception exception)
  642. {
  643. yield return 1;
  644. yield return 2;
  645. yield return 3;
  646. throw exception;
  647. }
  648. #endregion
  649. }
  650. }