ObservableConversionTests.cs 24 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections;
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Reactive;
  9. using System.Reactive.Concurrency;
  10. using System.Reactive.Linq;
  11. using System.Reactive.Subjects;
  12. using System.Reflection;
  13. using System.Runtime.CompilerServices;
  14. using System.Threading;
  15. using Microsoft.Reactive.Testing;
  16. using Xunit;
  17. using ReactiveTests.Dummies;
  18. namespace ReactiveTests.Tests
  19. {
  20. public class ObservableConversionTests : ReactiveTest
  21. {
  22. #region + Subscribe +
  23. [Fact]
  24. public void SubscribeToEnumerable_ArgumentChecking()
  25. {
  26. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>((IEnumerable<int>)null, DummyObserver<int>.Instance));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>(DummyEnumerable<int>.Instance, (IObserver<int>)null));
  28. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>((IEnumerable<int>)null, DummyObserver<int>.Instance, DummyScheduler.Instance));
  29. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>(DummyEnumerable<int>.Instance, DummyObserver<int>.Instance, null));
  30. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Subscribe<int>(DummyEnumerable<int>.Instance, (IObserver<int>)null, DummyScheduler.Instance));
  31. ReactiveAssert.Throws<NullReferenceException>(() => NullEnumeratorEnumerable<int>.Instance.Subscribe(Observer.Create<int>(x => { }), Scheduler.CurrentThread));
  32. }
  33. [Fact]
  34. public void SubscribeToEnumerable_Finite()
  35. {
  36. var scheduler = new TestScheduler();
  37. var results = scheduler.CreateObserver<int>();
  38. var d = default(IDisposable);
  39. var xs = default(MockEnumerable<int>);
  40. scheduler.ScheduleAbsolute(Created, () => xs = new MockEnumerable<int>(scheduler, Enumerable_Finite()));
  41. scheduler.ScheduleAbsolute(Subscribed, () => d = xs.Subscribe(results, scheduler));
  42. scheduler.ScheduleAbsolute(Disposed, () => d.Dispose());
  43. scheduler.Start();
  44. results.Messages.AssertEqual(
  45. OnNext(201, 1),
  46. OnNext(202, 2),
  47. OnNext(203, 3),
  48. OnNext(204, 4),
  49. OnNext(205, 5),
  50. OnCompleted<int>(206)
  51. );
  52. xs.Subscriptions.AssertEqual(
  53. Subscribe(200, 206)
  54. );
  55. }
  56. [Fact]
  57. public void SubscribeToEnumerable_Infinite()
  58. {
  59. var scheduler = new TestScheduler();
  60. var results = scheduler.CreateObserver<int>();
  61. var d = default(IDisposable);
  62. var xs = default(MockEnumerable<int>);
  63. scheduler.ScheduleAbsolute(Created, () => xs = new MockEnumerable<int>(scheduler, Enumerable_Infinite()));
  64. scheduler.ScheduleAbsolute(Subscribed, () => d = xs.Subscribe(results, scheduler));
  65. scheduler.ScheduleAbsolute(210, () => d.Dispose());
  66. scheduler.Start();
  67. results.Messages.AssertEqual(
  68. OnNext(201, 1),
  69. OnNext(202, 1),
  70. OnNext(203, 1),
  71. OnNext(204, 1),
  72. OnNext(205, 1),
  73. OnNext(206, 1),
  74. OnNext(207, 1),
  75. OnNext(208, 1),
  76. OnNext(209, 1)
  77. );
  78. xs.Subscriptions.AssertEqual(
  79. Subscribe(200, 210)
  80. );
  81. }
  82. [Fact]
  83. public void SubscribeToEnumerable_Error()
  84. {
  85. var scheduler = new TestScheduler();
  86. var results = scheduler.CreateObserver<int>();
  87. var d = default(IDisposable);
  88. var xs = default(MockEnumerable<int>);
  89. var ex = new Exception();
  90. scheduler.ScheduleAbsolute(Created, () => xs = new MockEnumerable<int>(scheduler, Enumerable_Error(ex)));
  91. scheduler.ScheduleAbsolute(Subscribed, () => d = xs.Subscribe(results, scheduler));
  92. scheduler.ScheduleAbsolute(Disposed, () => d.Dispose());
  93. scheduler.Start();
  94. results.Messages.AssertEqual(
  95. OnNext(201, 1),
  96. OnNext(202, 2),
  97. OnNext(203, 3),
  98. OnError<int>(204, ex)
  99. );
  100. xs.Subscriptions.AssertEqual(
  101. Subscribe(200, 204)
  102. );
  103. }
  104. #if !SILVERLIGHTM7
  105. [Fact]
  106. public void SubscribeToEnumerable_DefaultScheduler()
  107. {
  108. for (int i = 0; i < 100; i++)
  109. {
  110. var scheduler = new TestScheduler();
  111. var results1 = new List<int>();
  112. var results2 = new List<int>();
  113. var s1 = new Semaphore(0, 1);
  114. var s2 = new Semaphore(0, 1);
  115. Observable.Subscribe(Enumerable_Finite(),
  116. Observer.Create<int>(x => results1.Add(x), ex => { throw ex; }, () => s1.Release()));
  117. Observable.Subscribe(Enumerable_Finite(),
  118. Observer.Create<int>(x => results2.Add(x), ex => { throw ex; }, () => s2.Release()),
  119. DefaultScheduler.Instance);
  120. s1.WaitOne();
  121. s2.WaitOne();
  122. results1.AssertEqual(results2);
  123. }
  124. }
  125. #endif
  126. #endregion
  127. #region ToEnumerable
  128. [Fact]
  129. public void ToEnumerable_ArgumentChecking()
  130. {
  131. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEnumerable(default(IObservable<int>)));
  132. }
  133. [Fact]
  134. public void ToEnumerable_Generic()
  135. {
  136. Assert.True(Observable.Range(0, 10).ToEnumerable().SequenceEqual(Enumerable.Range(0, 10)));
  137. }
  138. [Fact]
  139. public void ToEnumerable_NonGeneric()
  140. {
  141. Assert.True(((IEnumerable)Observable.Range(0, 10).ToEnumerable()).Cast<int>().SequenceEqual(Enumerable.Range(0, 10)));
  142. }
  143. [Fact]
  144. public void ToEnumerable_ManualGeneric()
  145. {
  146. var res = Observable.Range(0, 10).ToEnumerable();
  147. var ieg = res.GetEnumerator();
  148. for (int i = 0; i < 10; i++)
  149. {
  150. Assert.True(ieg.MoveNext());
  151. Assert.Equal(i, ieg.Current);
  152. }
  153. Assert.False(ieg.MoveNext());
  154. }
  155. [Fact]
  156. public void ToEnumerable_ManualNonGeneric()
  157. {
  158. var res = (IEnumerable)Observable.Range(0, 10).ToEnumerable();
  159. var ien = res.GetEnumerator();
  160. for (int i = 0; i < 10; i++)
  161. {
  162. Assert.True(ien.MoveNext());
  163. Assert.Equal(i, ien.Current);
  164. }
  165. Assert.False(ien.MoveNext());
  166. }
  167. [Fact]
  168. public void ToEnumerable_ResetNotSupported()
  169. {
  170. ReactiveAssert.Throws<NotSupportedException>(() => Observable.Range(0, 10).ToEnumerable().GetEnumerator().Reset());
  171. }
  172. #endregion
  173. #region ToEvent
  174. [Fact]
  175. public void ToEvent_ArgumentChecks()
  176. {
  177. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEvent(default(IObservable<Unit>)));
  178. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEvent(default(IObservable<int>)));
  179. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEvent(default(IObservable<EventPattern<EventArgs>>)));
  180. }
  181. [Fact]
  182. public void ToEvent_Unit()
  183. {
  184. var src = new Subject<Unit>();
  185. var evt = src.ToEvent();
  186. var num = 0;
  187. var hnd = new Action<Unit>(_ =>
  188. {
  189. num++;
  190. });
  191. evt.OnNext += hnd;
  192. Assert.Equal(0, num);
  193. src.OnNext(new Unit());
  194. Assert.Equal(1, num);
  195. src.OnNext(new Unit());
  196. Assert.Equal(2, num);
  197. evt.OnNext -= hnd;
  198. src.OnNext(new Unit());
  199. Assert.Equal(2, num);
  200. }
  201. [Fact]
  202. public void ToEvent_NonUnit()
  203. {
  204. var src = new Subject<int>();
  205. var evt = src.ToEvent();
  206. var lst = new List<int>();
  207. var hnd = new Action<int>(e =>
  208. {
  209. lst.Add(e);
  210. });
  211. evt.OnNext += hnd;
  212. src.OnNext(1);
  213. src.OnNext(2);
  214. evt.OnNext -= hnd;
  215. src.OnNext(3);
  216. Assert.True(lst.SequenceEqual(new[] { 1, 2 }));
  217. }
  218. [Fact]
  219. public void ToEvent_FromEvent()
  220. {
  221. var src = new Subject<int>();
  222. var evt = src.ToEvent();
  223. var res = Observable.FromEvent<int>(h => evt.OnNext += h, h => evt.OnNext -= h);
  224. var lst = new List<int>();
  225. using (res.Subscribe(e => lst.Add(e), () => Assert.True(false)))
  226. {
  227. src.OnNext(1);
  228. src.OnNext(2);
  229. }
  230. src.OnNext(3);
  231. Assert.True(lst.SequenceEqual(new[] { 1, 2 }));
  232. }
  233. #endregion
  234. #region ToEventPattern
  235. [Fact]
  236. public void ToEventPattern_ArgumentChecking()
  237. {
  238. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToEventPattern<EventArgs>(null));
  239. }
  240. [Fact]
  241. public void ToEventPattern_IEvent()
  242. {
  243. var src = new Subject<EventPattern<EventArgs<int>>>();
  244. var evt = src.ToEventPattern();
  245. var snd = new object();
  246. var lst = new List<int>();
  247. var hnd = new EventHandler<EventArgs<int>>((s, e) =>
  248. {
  249. Assert.Same(snd, s);
  250. lst.Add(e.Value);
  251. });
  252. evt.OnNext += hnd;
  253. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(42)));
  254. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(43)));
  255. evt.OnNext -= hnd;
  256. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(44)));
  257. Assert.True(lst.SequenceEqual(new[] { 42, 43 }));
  258. }
  259. [Fact]
  260. public void ToEventPattern_IEvent_Fails()
  261. {
  262. var src = new Subject<EventPattern<EventArgs<int>>>();
  263. var evt = src.ToEventPattern();
  264. var snd = new object();
  265. var lst = new List<int>();
  266. var hnd = new EventHandler<EventArgs<int>>((s, e) =>
  267. {
  268. Assert.Same(snd, s);
  269. lst.Add(e.Value);
  270. });
  271. evt.OnNext += hnd;
  272. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(42)));
  273. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(43)));
  274. var ex = new Exception();
  275. ReactiveAssert.Throws(ex, () => src.OnError(ex));
  276. Assert.True(lst.SequenceEqual(new[] { 42, 43 }));
  277. }
  278. [Fact]
  279. public void ToEventPattern_IEvent_Completes()
  280. {
  281. var src = new Subject<EventPattern<EventArgs<int>>>();
  282. var evt = src.ToEventPattern();
  283. var snd = new object();
  284. var lst = new List<int>();
  285. var hnd = new EventHandler<EventArgs<int>>((s, e) =>
  286. {
  287. Assert.Same(snd, s);
  288. lst.Add(e.Value);
  289. });
  290. evt.OnNext += hnd;
  291. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(42)));
  292. src.OnNext(new EventPattern<EventArgs<int>>(snd, new EventArgs<int>(43)));
  293. src.OnCompleted();
  294. Assert.True(lst.SequenceEqual(new[] { 42, 43 }));
  295. }
  296. class EventSrc
  297. {
  298. public event EventHandler<EventArgs<string>> E;
  299. public void On(string s)
  300. {
  301. var e = E;
  302. if (e != null)
  303. e(this, new EventArgs<string>(s));
  304. }
  305. }
  306. class EventArgs<T> : EventArgs
  307. {
  308. public T Value { get; private set; }
  309. public EventArgs(T value)
  310. {
  311. Value = value;
  312. }
  313. }
  314. [Fact]
  315. public void FromEventPattern_ToEventPattern()
  316. {
  317. var src = new EventSrc();
  318. var evt = Observable.FromEventPattern<EventHandler<EventArgs<string>>, EventArgs<string>>(h => new EventHandler<EventArgs<string>>(h), h => src.E += h, h => src.E -= h);
  319. var res = evt.ToEventPattern();
  320. var lst = new List<string>();
  321. var hnd = new EventHandler<EventArgs<string>>((s, e) =>
  322. {
  323. Assert.Same(src, s);
  324. lst.Add(e.Value);
  325. });
  326. src.On("bar");
  327. res.OnNext += hnd;
  328. src.On("foo");
  329. src.On("baz");
  330. res.OnNext -= hnd;
  331. src.On("qux");
  332. Assert.True(lst.SequenceEqual(new[] { "foo", "baz" }));
  333. }
  334. [Fact]
  335. public void ToEvent_DuplicateHandlers()
  336. {
  337. var src = new Subject<Unit>();
  338. var evt = src.ToEvent();
  339. var num = 0;
  340. var hnd = new Action<Unit>(e => num++);
  341. evt.OnNext += hnd;
  342. Assert.Equal(0, num);
  343. src.OnNext(new Unit());
  344. Assert.Equal(1, num);
  345. evt.OnNext += hnd;
  346. src.OnNext(new Unit());
  347. Assert.Equal(3, num);
  348. evt.OnNext -= hnd;
  349. src.OnNext(new Unit());
  350. Assert.Equal(4, num);
  351. evt.OnNext -= hnd;
  352. src.OnNext(new Unit());
  353. Assert.Equal(4, num);
  354. }
  355. [Fact]
  356. public void ToEvent_SourceCompletes()
  357. {
  358. var src = new Subject<Unit>();
  359. var evt = src.ToEvent();
  360. var num = 0;
  361. var hnd = new Action<Unit>(e => num++);
  362. evt.OnNext += hnd;
  363. Assert.Equal(0, num);
  364. src.OnNext(new Unit());
  365. Assert.Equal(1, num);
  366. src.OnNext(new Unit());
  367. Assert.Equal(2, num);
  368. src.OnCompleted();
  369. Assert.Equal(2, num);
  370. #if !SILVERLIGHT // FieldAccessException
  371. var tbl = GetSubscriptionTable(evt);
  372. Assert.True(tbl.Count == 0);
  373. #endif
  374. }
  375. [Fact]
  376. public void ToEvent_SourceFails()
  377. {
  378. var src = new Subject<Unit>();
  379. var evt = src.ToEvent();
  380. var num = 0;
  381. var hnd = new Action<Unit>(e => num++);
  382. evt.OnNext += hnd;
  383. Assert.Equal(0, num);
  384. src.OnNext(new Unit());
  385. Assert.Equal(1, num);
  386. src.OnNext(new Unit());
  387. Assert.Equal(2, num);
  388. var ex = new Exception();
  389. ReactiveAssert.Throws(ex, () => src.OnError(ex));
  390. #if !SILVERLIGHT // FieldAccessException
  391. var tbl = GetSubscriptionTable(evt);
  392. Assert.True(tbl.Count == 0);
  393. #endif
  394. }
  395. [Fact]
  396. public void ToEvent_DoneImmediately()
  397. {
  398. var src = Observable.Empty<Unit>();
  399. var evt = src.ToEvent();
  400. var num = 0;
  401. var hnd = new Action<Unit>(e => num++);
  402. for (int i = 0; i < 2; i++)
  403. {
  404. evt.OnNext += hnd;
  405. Assert.Equal(0, num);
  406. #if !SILVERLIGHT // FieldAccessException
  407. var tbl = GetSubscriptionTable(evt);
  408. Assert.True(tbl.Count == 0);
  409. #endif
  410. }
  411. }
  412. [Fact]
  413. public void ToEvent_UnbalancedHandlers()
  414. {
  415. var src = new Subject<Unit>();
  416. var evt = src.ToEvent();
  417. var num = 0;
  418. var hnd = new Action<Unit>(e => num++);
  419. evt.OnNext += hnd;
  420. Assert.Equal(0, num);
  421. evt.OnNext -= hnd;
  422. Assert.Equal(0, num);
  423. evt.OnNext -= hnd;
  424. Assert.Equal(0, num);
  425. evt.OnNext += hnd;
  426. Assert.Equal(0, num);
  427. src.OnNext(new Unit());
  428. Assert.Equal(1, num);
  429. src.OnNext(new Unit());
  430. Assert.Equal(2, num);
  431. evt.OnNext -= hnd;
  432. Assert.Equal(2, num);
  433. src.OnNext(new Unit());
  434. Assert.Equal(2, num);
  435. }
  436. private static Dictionary<Delegate, Stack<IDisposable>> GetSubscriptionTable(object evt)
  437. {
  438. return (Dictionary<Delegate, Stack<IDisposable>>)evt.GetType().GetField("_subscriptions", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(evt);
  439. }
  440. [Fact]
  441. public void EventPattern_Equality()
  442. {
  443. var e1 = new EventPattern<string, EventArgs>("Bart", EventArgs.Empty);
  444. var e2 = new EventPattern<string, EventArgs>("Bart", EventArgs.Empty);
  445. Assert.True(e1.Equals(e1));
  446. Assert.True(e1.Equals(e2));
  447. Assert.True(e2.Equals(e1));
  448. Assert.True(e1 == e2);
  449. Assert.True(!(e1 != e2));
  450. Assert.True(e1.GetHashCode() == e2.GetHashCode());
  451. Assert.False(e1.Equals(null));
  452. Assert.False(e1.Equals("xy"));
  453. Assert.False(e1 == null);
  454. }
  455. [Fact]
  456. public void EventPattern_Inequality()
  457. {
  458. var a1 = new MyEventArgs();
  459. var a2 = new MyEventArgs();
  460. var e1 = new EventPattern<string, MyEventArgs>("Bart", a1);
  461. var e2 = new EventPattern<string, MyEventArgs>("John", a1);
  462. var e3 = new EventPattern<string, MyEventArgs>("Bart", a2);
  463. Assert.True(!e1.Equals(e2));
  464. Assert.True(!e2.Equals(e1));
  465. Assert.True(!(e1 == e2));
  466. Assert.True(e1 != e2);
  467. Assert.True(e1.GetHashCode() != e2.GetHashCode());
  468. Assert.True(!e1.Equals(e3));
  469. Assert.True(!e3.Equals(e1));
  470. Assert.True(!(e1 == e3));
  471. Assert.True(e1 != e3);
  472. Assert.True(e1.GetHashCode() != e3.GetHashCode());
  473. }
  474. class MyEventArgs : EventArgs
  475. {
  476. }
  477. #endregion
  478. #region + ToObservable +
  479. [Fact]
  480. public void EnumerableToObservable_ArgumentChecking()
  481. {
  482. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable((IEnumerable<int>)null, DummyScheduler.Instance));
  483. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable(DummyEnumerable<int>.Instance, (IScheduler)null));
  484. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable(DummyEnumerable<int>.Instance, DummyScheduler.Instance).Subscribe(null));
  485. ReactiveAssert.Throws<NullReferenceException>(() => Observable.ToObservable(NullEnumeratorEnumerable<int>.Instance, Scheduler.CurrentThread).Subscribe());
  486. }
  487. [Fact]
  488. public void EnumerableToObservable_Complete()
  489. {
  490. var scheduler = new TestScheduler();
  491. var e = new MockEnumerable<int>(scheduler,
  492. new[] { 3, 1, 2, 4 }
  493. );
  494. var results = scheduler.Start(() =>
  495. e.ToObservable(scheduler)
  496. );
  497. results.Messages.AssertEqual(
  498. OnNext(201, 3),
  499. OnNext(202, 1),
  500. OnNext(203, 2),
  501. OnNext(204, 4),
  502. OnCompleted<int>(205)
  503. );
  504. e.Subscriptions.AssertEqual(
  505. Subscribe(200, 205)
  506. );
  507. }
  508. [Fact]
  509. public void EnumerableToObservable_Dispose()
  510. {
  511. var scheduler = new TestScheduler();
  512. var e = new MockEnumerable<int>(scheduler,
  513. new[] { 3, 1, 2, 4 }
  514. );
  515. var results = scheduler.Start(() =>
  516. e.ToObservable(scheduler),
  517. 203
  518. );
  519. results.Messages.AssertEqual(
  520. OnNext(201, 3),
  521. OnNext(202, 1)
  522. );
  523. e.Subscriptions.AssertEqual(
  524. Subscribe(200, 203)
  525. );
  526. }
  527. [Fact]
  528. public void EnumerableToObservable_Error()
  529. {
  530. var scheduler = new TestScheduler();
  531. var ex = new Exception();
  532. var e = new MockEnumerable<int>(scheduler,
  533. EnumerableToObservable_Error_Core(ex)
  534. );
  535. var results = scheduler.Start(() =>
  536. e.ToObservable(scheduler)
  537. );
  538. results.Messages.AssertEqual(
  539. OnNext(201, 1),
  540. OnNext(202, 2),
  541. OnError<int>(203, ex)
  542. );
  543. e.Subscriptions.AssertEqual(
  544. Subscribe(200, 203)
  545. );
  546. }
  547. [Fact]
  548. public void EnumerableToObservable_Default_ArgumentChecking()
  549. {
  550. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable((IEnumerable<int>)null));
  551. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.ToObservable(DummyEnumerable<int>.Instance).Subscribe(null));
  552. }
  553. [Fact]
  554. public void EnumerableToObservable_Default()
  555. {
  556. var xs = new[] { 4, 3, 1, 5, 9, 2 };
  557. xs.ToObservable().AssertEqual(xs.ToObservable(DefaultScheduler.Instance));
  558. }
  559. #if !NO_PERF
  560. [Fact]
  561. public void EnumerableToObservable_LongRunning_Complete()
  562. {
  563. var start = default(ManualResetEvent);
  564. var end = default(ManualResetEvent);
  565. var scheduler = new TestLongRunningScheduler(x => start = x, x => end = x);
  566. var e = new[] { 3, 1, 2, 4 };
  567. var results = e.ToObservable(scheduler);
  568. var lst = new List<int>();
  569. results.Subscribe(lst.Add);
  570. start.WaitOne();
  571. end.WaitOne();
  572. Assert.True(e.SequenceEqual(lst));
  573. }
  574. [Fact]
  575. [MethodImpl(MethodImplOptions.NoOptimization)]
  576. public void EnumerableToObservable_LongRunning_Dispose()
  577. {
  578. var start = default(ManualResetEvent);
  579. var end = default(ManualResetEvent);
  580. var scheduler = new TestLongRunningScheduler(x => start = x, x => end = x);
  581. var e = Enumerable.Range(0, int.MaxValue);
  582. var results = e.ToObservable(scheduler);
  583. var lst = new List<int>();
  584. var d = results.Subscribe(lst.Add);
  585. start.WaitOne();
  586. while (lst.Count < 100)
  587. ;
  588. d.Dispose();
  589. end.WaitOne();
  590. Assert.True(e.Take(100).SequenceEqual(lst.Take(100)));
  591. }
  592. [Fact]
  593. public void EnumerableToObservable_LongRunning_Error()
  594. {
  595. var start = default(ManualResetEvent);
  596. var end = default(ManualResetEvent);
  597. var scheduler = new TestLongRunningScheduler(x => start = x, x => end = x);
  598. var ex = new Exception();
  599. var e = EnumerableToObservable_Error_Core(ex);
  600. var results = e.ToObservable(scheduler);
  601. var lst = new List<int>();
  602. var err = default(Exception);
  603. results.Subscribe(lst.Add, ex_ => err = ex_);
  604. start.WaitOne();
  605. end.WaitOne();
  606. Assert.True(new[] { 1, 2 }.SequenceEqual(lst));
  607. Assert.Same(ex, err);
  608. }
  609. #endif
  610. static IEnumerable<int> EnumerableToObservable_Error_Core(Exception ex)
  611. {
  612. yield return 1;
  613. yield return 2;
  614. throw ex;
  615. }
  616. [Fact]
  617. public void EnumerableToObservable_GetEnumeratorThrows()
  618. {
  619. var ex = new Exception();
  620. var scheduler = new TestScheduler();
  621. var xs = new RogueEnumerable<int>(ex);
  622. var res = scheduler.Start(() =>
  623. xs.ToObservable(scheduler)
  624. );
  625. res.Messages.AssertEqual(
  626. OnError<int>(200, ex)
  627. );
  628. }
  629. #endregion
  630. #region |> Helpers <|
  631. IEnumerable<int> Enumerable_Finite()
  632. {
  633. yield return 1;
  634. yield return 2;
  635. yield return 3;
  636. yield return 4;
  637. yield return 5;
  638. yield break;
  639. }
  640. IEnumerable<int> Enumerable_Infinite()
  641. {
  642. while (true)
  643. yield return 1;
  644. }
  645. IEnumerable<int> Enumerable_Error(Exception exception)
  646. {
  647. yield return 1;
  648. yield return 2;
  649. yield return 3;
  650. throw exception;
  651. }
  652. #endregion
  653. }
  654. }