GroupByTest.cs 119 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Text;
  10. using Microsoft.Reactive.Testing;
  11. using ReactiveTests.Dummies;
  12. using Xunit;
  13. namespace ReactiveTests.Tests
  14. {
  15. public class GroupByTest : ReactiveTest
  16. {
  17. #region + GroupBy +
  // Checks that GroupBy(keySelector, elementSelector, comparer) rejects each null
  // argument, and that subscribing with a null observer is rejected as well.
  [Fact]
  public void GroupBy_ArgumentChecking()
  {
      var nullSource = (IObservable<int>)null;
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;
      var comparer = EqualityComparer<int>.Default;

      ReactiveAssert.Throws<ArgumentNullException>(() => nullSource.GroupBy(selector, selector, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(null, selector, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, (Func<int, int>)null, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, comparer).Subscribe(null));
  }
  // Checks null-argument handling for GroupBy(keySelector, elementSelector)
  // and for subscribing to its result with a null observer.
  [Fact]
  public void GroupBy_KeyEle_ArgumentChecking()
  {
      var nullSource = (IObservable<int>)null;
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;

      ReactiveAssert.Throws<ArgumentNullException>(() => nullSource.GroupBy(selector, selector));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy((Func<int, int>)null, selector));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, (Func<int, int>)null));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector).Subscribe(null));
  }
  // Checks null-argument handling for the two-argument GroupBy call that is given
  // a key selector and an equality comparer, plus Subscribe(null) on its result.
  [Fact]
  public void GroupBy_KeyComparer_ArgumentChecking()
  {
      var nullSource = (IObservable<int>)null;
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;
      var comparer = EqualityComparer<int>.Default;

      ReactiveAssert.Throws<ArgumentNullException>(() => nullSource.GroupBy(selector, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(null, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, comparer).Subscribe(null));
  }
  // Checks null-argument handling for GroupBy(keySelector) and for
  // subscribing to its result with a null observer.
  [Fact]
  public void GroupBy_Key_ArgumentChecking()
  {
      var nullSource = (IObservable<int>)null;
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;

      ReactiveAssert.Throws<ArgumentNullException>(() => nullSource.GroupBy(selector));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy((Func<int, int>)null));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector).Subscribe(null));
  }
  // Groups a hot string sequence by its trimmed value using a GroupByComparer and
  // verifies the keys surfaced by the outer sequence, the source subscription
  // window and the number of key-selector invocations.
  [Fact]
  public void GroupBy_WithKeyComparer()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Counts every key-selector call so the total can be asserted at the end.
      Func<string, string> keySelector = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      var observed = testScheduler.Start(() =>
          source.GroupBy(keySelector, keyComparer).Select(group => group.Key)
      );

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keyCalls);
  }
  // Outer sequence, source completes at 570: expects the four group keys followed
  // by completion, and one key/element selector call per source item after 200.
  [Fact]
  public void GroupBy_Outer_Complete()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> keySelector = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
          source.GroupBy(keySelector, elementSelector, keyComparer).Select(group => group.Key)
      );

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keyCalls);
      Assert.Equal(12, elementCalls);
  }
  // Outer sequence, source fails at 570: expects the four group keys followed by
  // the same exception instance, and twelve calls to each selector.
  [Fact]
  public void GroupBy_Outer_Error()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, failure),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> keySelector = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
          source.GroupBy(keySelector, elementSelector, keyComparer).Select(group => group.Key)
      );

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(570, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keyCalls);
      Assert.Equal(12, elementCalls);
  }
  // Outer sequence, subscription disposed at 355: expects only the keys produced
  // before that time and five calls to each selector.
  [Fact]
  public void GroupBy_Outer_Dispose()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> keySelector = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      // The second argument to Start is the virtual time at which the subscription is disposed.
      var observed = testScheduler.Start(
          () => source.GroupBy(keySelector, elementSelector, keyComparer).Select(group => group.Key),
          355
      );

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 355)
      );

      Assert.Equal(5, keyCalls);
      Assert.Equal(5, elementCalls);
  }
  // Outer sequence, key selector throws on its tenth call: expects the error to
  // be delivered at 480 and the element selector to have run only nine times.
  [Fact]
  public void GroupBy_Outer_KeyThrow()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Fails on the tenth invocation; the counter is bumped before the check.
      Func<string, string> keySelector = value =>
      {
          if (++keyCalls == 10)
          {
              throw failure;
          }

          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
          source.GroupBy(keySelector, elementSelector, keyComparer).Select(group => group.Key)
      );

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(480, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keyCalls);
      Assert.Equal(9, elementCalls);
  }
  // Outer sequence, element selector throws on its tenth call: expects the error
  // at 480 with both selectors having been invoked ten times.
  [Fact]
  public void GroupBy_Outer_EleThrow()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> keySelector = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      // Fails on the tenth invocation; the counter is bumped before the check.
      Func<string, string> elementSelector = value =>
      {
          if (++elementCalls == 10)
          {
              throw failure;
          }

          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
          source.GroupBy(keySelector, elementSelector, keyComparer).Select(group => group.Key)
      );

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(480, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keyCalls);
      Assert.Equal(10, elementCalls);
  }
  // Outer sequence with a GroupByComparer built with (250, ushort.MaxValue):
  // expects comparer.EqualsException to surface at 310 after two keys.
  [Fact]
  public void GroupBy_Outer_ComparerEqualsThrow()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler, 250, ushort.MaxValue);

      Func<string, string> keySelector = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
          source.GroupBy(keySelector, elementSelector, keyComparer).Select(group => group.Key)
      );

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnError<string>(310, keyComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      Assert.Equal(4, keyCalls);
      Assert.Equal(3, elementCalls);
  }
  // Outer sequence with a GroupByComparer built with (ushort.MaxValue, 410):
  // expects comparer.HashCodeException to surface at 420 after four keys.
  [Fact]
  public void GroupBy_Outer_ComparerGetHashCodeThrow()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler, ushort.MaxValue, 410);

      Func<string, string> keySelector = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
          source.GroupBy(keySelector, elementSelector, keyComparer).Select(group => group.Key)
      );

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );

      Assert.Equal(8, keyCalls);
      Assert.Equal(7, elementCalls);
  }
  // Inner sequences, each group subscribed 100 ticks after it is emitted: expects
  // every group observer to see only later elements, then completion at 570.
  [Fact]
  public void GroupBy_Inner_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(group =>
          {
              var observer = testScheduler.CreateObserver<string>();
              groups[group.Key] = group;
              observers[group.Key] = observer;

              // Subscribe to the group late so earlier elements are missed.
              testScheduler.ScheduleRelative(100, () =>
              {
                  groupSubscriptions[group.Key] = group.Subscribe(observer);
              });
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      observers["qux"].Messages.AssertEqual(
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Inner sequences, each group subscribed as soon as it is emitted: expects every
  // group observer to see all of its elements, then completion at 570.
  [Fact]
  public void GroupBy_Inner_Complete_All()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(group =>
          {
              var observer = testScheduler.CreateObserver<string>();
              groups[group.Key] = group;
              observers[group.Key] = observer;
              groupSubscriptions[group.Key] = group.Subscribe(observer);
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Inner sequences, source fails at 570 and groups are subscribed 100 ticks late:
  // expects every group observer to end with the source's exception instance.
  [Fact]
  public void GroupBy_Inner_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceFailure),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  groups[group.Key] = group;
                  observers[group.Key] = observer;

                  // Subscribe to the group late so earlier elements are missed.
                  testScheduler.ScheduleRelative(100, () =>
                  {
                      groupSubscriptions[group.Key] = group.Subscribe(observer);
                  });
              },
              // The outer error is ignored here; only the group observers are asserted.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnError<string>(570, sourceFailure)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnError<string>(570, sourceFailure)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnError<string>(570, sourceFailure)
      );

      observers["qux"].Messages.AssertEqual(
          OnError<string>(570, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Inner sequences, outer and group subscriptions all disposed at 400: expects
  // each group observer to hold only the elements received before that time.
  [Fact]
  public void GroupBy_Inner_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(group =>
          {
              var observer = testScheduler.CreateObserver<string>();
              groups[group.Key] = group;
              observers[group.Key] = observer;
              groupSubscriptions[group.Key] = group.Subscribe(observer);
          });
      });

      // Tear everything down mid-stream rather than at the default Disposed time.
      testScheduler.ScheduleAbsolute(400, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab ")
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  // Inner sequences, key selector throws on its sixth call: expects three groups,
  // each ending with that exception at 360.
  [Fact]
  public void GroupBy_Inner_KeyThrow()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var keyCalls = 0;

      // Fails on the sixth invocation; the counter is bumped before the check.
      Func<string, string> keySelector = value =>
      {
          if (++keyCalls == 6)
          {
              throw failure;
          }

          return value.Trim();
      };

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(keySelector, value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  groups[group.Key] = group;
                  observers[group.Key] = observer;
                  groupSubscriptions[group.Key] = group.Subscribe(observer);
              },
              // The outer error is ignored here; only the group observers are asserted.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(3, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(360, failure)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, failure)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  // Inner sequences, element selector throws on its sixth call: expects four
  // groups, each ending with that exception at 360.
  [Fact]
  public void GroupBy_Inner_EleThrow()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var elementCalls = 0;

      // Fails on the sixth invocation; the counter is bumped before the check.
      Func<string, string> elementSelector = value =>
      {
          if (++elementCalls == 6)
          {
              throw failure;
          }

          return Reverse(value);
      };

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), elementSelector, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  groups[group.Key] = group;
                  observers[group.Key] = observer;
                  groupSubscriptions[group.Key] = group.Subscribe(observer);
              },
              // The outer error is ignored here; only the group observers are asserted.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(360, failure)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, failure)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, failure)
      );

      observers["qux"].Messages.AssertEqual(
          OnError<string>(360, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  // Inner sequences with a GroupByComparer built with (400, ushort.MaxValue):
  // expects every group observer to end with comparer.EqualsException at 420.
  [Fact]
  public void GroupBy_Inner_Comparer_EqualsThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler, 400, ushort.MaxValue);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  groups[group.Key] = group;
                  observers[group.Key] = observer;
                  groupSubscriptions[group.Key] = group.Subscribe(observer);
              },
              // The outer error is ignored here; only the group observers are asserted.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(420, keyComparer.EqualsException)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, keyComparer.EqualsException)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, keyComparer.EqualsException)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, keyComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  /// <summary>
  /// Subscribes to every group emitted by GroupBy and expects each inner sequence to end at 420
  /// with the comparer's HashCodeException, at which point the source subscription ends too.
  /// </summary>
  [Fact]
  public void GroupBy_Inner_Comparer_GetHashCodeThrow()
  {
      var scheduler = new TestScheduler();

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      // NOTE(review): GroupByComparer is declared outside this chunk; the last argument (400)
      // presumably arms its GetHashCode failure - confirm against its definition.
      var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400);

      IObservable<IGroupedObservable<string, string>> outer = null;
      IDisposable outerSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          outer = xs.GroupBy(value => value.Trim(), value => Reverse(value), comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          outerSubscription = outer.Subscribe(
              group =>
              {
                  // Record each new group under its key and start observing it immediately.
                  var observer = scheduler.CreateObserver<string>();
                  var key = group.Key;
                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              _ => { });
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          outerSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      scheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(420, comparer.HashCodeException)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, comparer.HashCodeException)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, comparer.HashCodeException)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, comparer.HashCodeException)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  /// <summary>
  /// Disposes the outer subscription at 320 and checks that only the two groups seen so far
  /// were reported, while their inner subscriptions keep receiving values until the source
  /// completes at 570.
  /// </summary>
  [Fact]
  public void GroupBy_Outer_Independence()
  {
      var scheduler = new TestScheduler();

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      var comparer = new GroupByComparer(scheduler);

      IObservable<IGroupedObservable<string, string>> outer = null;
      IDisposable outerSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var outerResults = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          outer = xs.GroupBy(value => value.Trim(), value => Reverse(value), comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          outerSubscription = outer.Subscribe(
              group =>
              {
                  // Log the key on the outer observer, then observe the group itself.
                  outerResults.OnNext(group.Key);
                  var observer = scheduler.CreateObserver<string>();
                  var key = group.Key;
                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              error => outerResults.OnError(error),
              () => outerResults.OnCompleted());
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          outerSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      // Early disposal of the outer subscription only; the inner subscriptions stay active.
      scheduler.ScheduleAbsolute(320, () =>
      {
          outerSubscription.Dispose();
      });

      scheduler.Start();

      Assert.Equal(2, groups.Count);

      outerResults.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR")
      );

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Disposes only the "foo" inner subscription at 320 and checks that it stops receiving
  /// values while the other three groups run to completion at 570.
  /// </summary>
  [Fact]
  public void GroupBy_Inner_Independence()
  {
      var scheduler = new TestScheduler();

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      var comparer = new GroupByComparer(scheduler);

      IObservable<IGroupedObservable<string, string>> outer = null;
      IDisposable outerSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var outerResults = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          outer = xs.GroupBy(value => value.Trim(), value => Reverse(value), comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          outerSubscription = outer.Subscribe(
              group =>
              {
                  outerResults.OnNext(group.Key);
                  var observer = scheduler.CreateObserver<string>();
                  var key = group.Key;
                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              error => outerResults.OnError(error),
              () => outerResults.OnCompleted());
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          outerSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      // Drop a single inner subscription part-way through the run.
      scheduler.ScheduleAbsolute(320, () =>
      {
          groupSubscriptions["foo"].Dispose();
      });

      scheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Disposes each of the four inner subscriptions at a different time and checks that every
  /// group observer holds only the values delivered before its own disposal, while the source
  /// stays subscribed until it completes at 570.
  /// </summary>
  [Fact]
  public void GroupBy_Inner_Multiple_Independence()
  {
      var scheduler = new TestScheduler();

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      var comparer = new GroupByComparer(scheduler);

      IObservable<IGroupedObservable<string, string>> outer = null;
      IDisposable outerSubscription = null;
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var outerResults = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          outer = xs.GroupBy(value => value.Trim(), value => Reverse(value), comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          outerSubscription = outer.Subscribe(
              group =>
              {
                  outerResults.OnNext(group.Key);
                  var observer = scheduler.CreateObserver<string>();
                  var key = group.Key;
                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              error => outerResults.OnError(error),
              () => outerResults.OnCompleted());
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          outerSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      // Each inner subscription is cut off at its own point in virtual time.
      scheduler.ScheduleAbsolute(320, () => { groupSubscriptions["foo"].Dispose(); });
      scheduler.ScheduleAbsolute(280, () => { groupSubscriptions["baR"].Dispose(); });
      scheduler.ScheduleAbsolute(355, () => { groupSubscriptions["Baz"].Dispose(); });
      scheduler.ScheduleAbsolute(400, () => { groupSubscriptions["qux"].Dispose(); });

      scheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab")
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Captures a group while the source is live but subscribes to it only at 600, after the
  /// source completed at 570; the late observer is expected to see just OnCompleted at 600.
  /// </summary>
  [Fact]
  public void GroupBy_Inner_Escape_Complete()
  {
      var scheduler = new TestScheduler();

      var input = new[]
      {
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570)
      };
      var xs = scheduler.CreateHotObservable(input);

      IObservable<IGroupedObservable<string, string>> outer = null;
      IDisposable outerSubscription = null;
      IObservable<string> inner = null;
      IDisposable innerSubscription = null;
      var res = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          outer = xs.GroupBy(value => value.Trim());
      });

      // The group is only remembered here; nobody subscribes to it yet.
      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          outerSubscription = outer.Subscribe(group => inner = group);
      });

      scheduler.ScheduleAbsolute(600, () =>
      {
          innerSubscription = inner.Subscribe(res);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          outerSubscription.Dispose();
          innerSubscription.Dispose();
      });

      scheduler.Start();

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      res.Messages.AssertEqual(
          OnCompleted<string>(600)
      );
  }
  /// <summary>
  /// Captures a group while the source is live but subscribes to it only at 600, after the
  /// source failed at 570; the late observer is expected to see the same exception at 600.
  /// </summary>
  [Fact]
  public void GroupBy_Inner_Escape_Error()
  {
      var scheduler = new TestScheduler();
      var ex = new Exception();

      var input = new[]
      {
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, ex)
      };
      var xs = scheduler.CreateHotObservable(input);

      IObservable<IGroupedObservable<string, string>> outer = null;
      IDisposable outerSubscription = null;
      IObservable<string> inner = null;
      IDisposable innerSubscription = null;
      var res = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          outer = xs.GroupBy(value => value.Trim());
      });

      // The outer error handler is a no-op; only the escaped group is of interest.
      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          outerSubscription = outer.Subscribe(group => inner = group, _ => { });
      });

      scheduler.ScheduleAbsolute(600, () =>
      {
          innerSubscription = inner.Subscribe(res);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          outerSubscription.Dispose();
          innerSubscription.Dispose();
      });

      scheduler.Start();

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      res.Messages.AssertEqual(
          OnError<string>(600, ex)
      );
  }
  /// <summary>
  /// Disposes the outer subscription at 400 with no inner subscriber attached, then subscribes
  /// to the captured group at 600; the source subscription is expected to span 200-400 and the
  /// late observer to receive nothing.
  /// </summary>
  [Fact]
  public void GroupBy_Inner_Escape_Dispose()
  {
      var scheduler = new TestScheduler();

      var input = new[]
      {
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      IObservable<IGroupedObservable<string, string>> outer = null;
      IDisposable outerSubscription = null;
      IObservable<string> inner = null;
      IDisposable innerSubscription = null;
      var res = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          outer = xs.GroupBy(value => value.Trim());
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          outerSubscription = outer.Subscribe(group => inner = group);
      });

      scheduler.ScheduleAbsolute(400, () =>
      {
          outerSubscription.Dispose();
      });

      scheduler.ScheduleAbsolute(600, () =>
      {
          innerSubscription = inner.Subscribe(res);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          innerSubscription.Dispose();
      });

      scheduler.Start();

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      // No messages are expected on the late inner observer.
      res.Messages.AssertEqual(
      );
  }
  /// <summary>
  /// Groups with a key selector that returns null for inputs starting with 'b' and checks that
  /// those values still flow through a group whose key is null.
  /// </summary>
  [Fact]
  public void GroupBy_NullKeys_Simple()
  {
      var scheduler = new TestScheduler();

      var input = new[]
      {
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      };
      var xs = scheduler.CreateHotObservable(input);

      var res = scheduler.Start(() =>
      {
          var grouped = xs.GroupBy(x => x[0] == 'b' ? null : x.ToUpper());

          // Flatten the groups again, prefixing every value with its key (or a null marker).
          return grouped.SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x);
      });

      res.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  /// <summary>
  /// Observes only the null-keyed group and checks that a source error at 500 reaches both the
  /// outer error handler and the inner observer of that group.
  /// </summary>
  [Fact]
  public void GroupBy_NullKeys_Error()
  {
      var scheduler = new TestScheduler();
      var ex = new Exception();

      var input = new[]
      {
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnError<string>(500, ex)
      };
      var xs = scheduler.CreateHotObservable(input);

      var nullGroup = scheduler.CreateObserver<string>();
      Exception err = null;

      scheduler.ScheduleAbsolute(200, () =>
      {
          var nullKeyed = xs
              .GroupBy(x => x[0] == 'b' ? null : x.ToUpper())
              .Where(g => g.Key == null);

          nullKeyed.Subscribe(
              g => { g.Subscribe(nullGroup); },
              error => { err = error; });
      });

      scheduler.Start();

      Assert.Same(ex, err);

      nullGroup.Messages.AssertEqual(
          OnNext(220, "bar"),
          OnNext(470, "baz"),
          OnError<string>(500, ex)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  /// <summary>
  /// Returns the characters of <paramref name="s"/> in reverse order.
  /// </summary>
  /// <param name="s">The string to reverse.</param>
  /// <returns>A new string holding the same chars back to front.</returns>
  private static string Reverse(string s)
  {
      var chars = new char[s.Length];
      s.CopyTo(0, chars, 0, chars.Length);
      Array.Reverse(chars);
      return new string(chars);
  }
  1473. #endregion
  1474. #region + GroupBy w/capacity +
  // Capacity value passed to the capacity-taking GroupBy overloads by the tests that follow.
  1475. private const int _groupByCapacity = 1024;
  /// <summary>
  /// Argument validation for GroupBy(keySelector, elementSelector, capacity, comparer):
  /// each null argument must raise ArgumentNullException and a capacity of -1 must raise
  /// ArgumentOutOfRangeException.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;
      var comparer = EqualityComparer<int>.Default;

      ReactiveAssert.Throws<ArgumentNullException>(() => default(IObservable<int>).GroupBy(selector, selector, _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(null, selector, _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, default(Func<int, int>), _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, _groupByCapacity, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, _groupByCapacity, comparer).Subscribe(null));

      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => source.GroupBy(selector, selector, -1, comparer));
  }
  /// <summary>
  /// Argument validation for GroupBy(keySelector, elementSelector, capacity): each null
  /// argument must raise ArgumentNullException and a capacity of -1 must raise
  /// ArgumentOutOfRangeException.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_KeyEle_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;

      ReactiveAssert.Throws<ArgumentNullException>(() => default(IObservable<int>).GroupBy(selector, selector, _groupByCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(default(Func<int, int>), selector, _groupByCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, default(Func<int, int>), _groupByCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, _groupByCapacity).Subscribe(null));

      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => source.GroupBy(selector, selector, -1));
  }
  /// <summary>
  /// Argument validation for GroupBy(keySelector, capacity, comparer): each null argument must
  /// raise ArgumentNullException and a capacity of -1 must raise ArgumentOutOfRangeException.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_KeyComparer_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;
      var comparer = EqualityComparer<int>.Default;

      ReactiveAssert.Throws<ArgumentNullException>(() => default(IObservable<int>).GroupBy(selector, _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(null, _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, _groupByCapacity, null));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, _groupByCapacity, comparer).Subscribe(null));

      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => source.GroupBy(selector, -1, comparer));
  }
  /// <summary>
  /// Argument validation for GroupBy(keySelector, capacity): each null argument must raise
  /// ArgumentNullException and a capacity of -1 must raise ArgumentOutOfRangeException.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_Key_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;

      ReactiveAssert.Throws<ArgumentNullException>(() => default(IObservable<int>).GroupBy(selector, _groupByCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(default(Func<int, int>), _groupByCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, _groupByCapacity).Subscribe(null));

      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => source.GroupBy(selector, -1));
  }
  /// <summary>
  /// Groups with a key selector, a capacity and a comparer, and checks the keys of the emitted
  /// groups, the source subscription window and the number of key selector invocations.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_WithKeyComparer()
  {
      var scheduler = new TestScheduler();
      var keyInvoked = 0;

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      var comparer = new GroupByComparer(scheduler);

      Func<string, string> keySelector = value =>
      {
          keyInvoked++;
          return value.Trim();
      };

      var res = scheduler.Start(() =>
          xs.GroupBy(keySelector, _groupByCapacity, comparer).Select(g => g.Key)
      );

      res.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      // Twelve inputs fall inside the 200-570 subscription window.
      Assert.Equal(12, keyInvoked);
  }
  /// <summary>
  /// Capacity overload with key and element selectors: the outer sequence is expected to emit
  /// the four group keys and complete at 570, with both selectors invoked once per input.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_Outer_Complete()
  {
      var scheduler = new TestScheduler();
      var keyInvoked = 0;
      var eleInvoked = 0;

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      var comparer = new GroupByComparer(scheduler);

      Func<string, string> keySelector = value =>
      {
          keyInvoked++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          eleInvoked++;
          return Reverse(value);
      };

      var res = scheduler.Start(() =>
          xs.GroupBy(keySelector, elementSelector, _groupByCapacity, comparer).Select(g => g.Key)
      );

      res.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      // Twelve inputs fall inside the 200-570 subscription window.
      Assert.Equal(12, keyInvoked);
      Assert.Equal(12, eleInvoked);
  }
  /// <summary>
  /// Capacity overload with key and element selectors: a source error at 570 is expected to
  /// surface on the outer sequence after the four group keys have been emitted.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_Outer_Error()
  {
      var scheduler = new TestScheduler();
      var keyInvoked = 0;
      var eleInvoked = 0;
      var ex = new Exception();

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, ex),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      var comparer = new GroupByComparer(scheduler);

      Func<string, string> keySelector = value =>
      {
          keyInvoked++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          eleInvoked++;
          return Reverse(value);
      };

      var res = scheduler.Start(() =>
          xs.GroupBy(keySelector, elementSelector, _groupByCapacity, comparer).Select(g => g.Key)
      );

      res.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(570, ex)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      // Twelve inputs arrive before the error at 570.
      Assert.Equal(12, keyInvoked);
      Assert.Equal(12, eleInvoked);
  }
  /// <summary>
  /// Capacity overload with key and element selectors: the run is disposed at 355, so only the
  /// first three group keys are expected and both selectors are invoked five times.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_Outer_Dispose()
  {
      var scheduler = new TestScheduler();
      var keyInvoked = 0;
      var eleInvoked = 0;

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      var comparer = new GroupByComparer(scheduler);

      Func<string, string> keySelector = value =>
      {
          keyInvoked++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          eleInvoked++;
          return Reverse(value);
      };

      // The second argument to Start is the virtual time at which the subscription is disposed.
      var res = scheduler.Start(
          () => xs.GroupBy(keySelector, elementSelector, _groupByCapacity, comparer).Select(g => g.Key),
          355
      );

      res.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz")
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 355)
      );

      // Five inputs fall inside the 200-355 subscription window.
      Assert.Equal(5, keyInvoked);
      Assert.Equal(5, eleInvoked);
  }
  /// <summary>
  /// Capacity overload where the key selector throws on its tenth invocation (the input at
  /// 480); the exception is expected on the outer sequence and the source is unsubscribed.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_Outer_KeyThrow()
  {
      var scheduler = new TestScheduler();
      var keyInvoked = 0;
      var eleInvoked = 0;
      var ex = new Exception();

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      var comparer = new GroupByComparer(scheduler);

      Func<string, string> keySelector = value =>
      {
          // Fail on the tenth call; the counter is bumped before the check.
          if (++keyInvoked == 10)
          {
              throw ex;
          }

          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          eleInvoked++;
          return Reverse(value);
      };

      var res = scheduler.Start(() =>
          xs.GroupBy(keySelector, elementSelector, _groupByCapacity, comparer).Select(g => g.Key)
      );

      res.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(480, ex)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      // The element selector is not reached for the input whose key selection failed.
      Assert.Equal(10, keyInvoked);
      Assert.Equal(9, eleInvoked);
  }
  /// <summary>
  /// Capacity overload where the element selector throws on its tenth invocation (the input at
  /// 480); the exception is expected on the outer sequence and the source is unsubscribed.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_Outer_EleThrow()
  {
      var scheduler = new TestScheduler();
      var keyInvoked = 0;
      var eleInvoked = 0;
      var ex = new Exception();

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      var comparer = new GroupByComparer(scheduler);

      Func<string, string> keySelector = value =>
      {
          keyInvoked++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          // Fail on the tenth call; the counter is bumped before the check.
          if (++eleInvoked == 10)
          {
              throw ex;
          }

          return Reverse(value);
      };

      var res = scheduler.Start(() =>
          xs.GroupBy(keySelector, elementSelector, _groupByCapacity, comparer).Select(g => g.Key)
      );

      res.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(480, ex)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keyInvoked);
      Assert.Equal(10, eleInvoked);
  }
  /// <summary>
  /// Capacity overload with a comparer whose EqualsException is expected on the outer sequence
  /// at 310, after two group keys were emitted.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_Outer_ComparerEqualsThrow()
  {
      var scheduler = new TestScheduler();
      var keyInvoked = 0;
      var eleInvoked = 0;

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      // NOTE(review): GroupByComparer is declared outside this chunk; the second argument (250)
      // presumably arms its Equals failure - confirm against its definition.
      var comparer = new GroupByComparer(scheduler, 250, ushort.MaxValue);

      Func<string, string> keySelector = value =>
      {
          keyInvoked++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          eleInvoked++;
          return Reverse(value);
      };

      var res = scheduler.Start(() =>
          xs.GroupBy(keySelector, elementSelector, _groupByCapacity, comparer).Select(g => g.Key)
      );

      res.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnError<string>(310, comparer.EqualsException)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      // The failing input's key was selected, but its element selector was never reached.
      Assert.Equal(4, keyInvoked);
      Assert.Equal(3, eleInvoked);
  }
  /// <summary>
  /// Capacity overload with a comparer whose HashCodeException is expected on the outer
  /// sequence at 420, after all four group keys were emitted.
  /// </summary>
  [Fact]
  public void GroupBy_Capacity_Outer_ComparerGetHashCodeThrow()
  {
      var scheduler = new TestScheduler();
      var keyInvoked = 0;
      var eleInvoked = 0;

      var input = new[]
      {
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      };
      var xs = scheduler.CreateHotObservable(input);

      // NOTE(review): GroupByComparer is declared outside this chunk; the last argument (410)
      // presumably arms its GetHashCode failure - confirm against its definition.
      var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 410);

      Func<string, string> keySelector = value =>
      {
          keyInvoked++;
          return value.Trim();
      };

      Func<string, string> elementSelector = value =>
      {
          eleInvoked++;
          return Reverse(value);
      };

      var res = scheduler.Start(() =>
          xs.GroupBy(keySelector, elementSelector, _groupByCapacity, comparer).Select(g => g.Key)
      );

      res.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(420, comparer.HashCodeException)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );

      // The failing input's key was selected, but its element selector was never reached.
      Assert.Equal(8, keyInvoked);
      Assert.Equal(7, eleInvoked);
  }
  /// <summary>
  /// Subscribes to each group 100 ticks after the group is handed out and checks what every
  /// late inner observer records when the source completes at 570.
  /// </summary>
  /// <remarks>
  /// Four groups are expected. Each recorder only contains the elements listed in the
  /// assertions below followed by OnCompleted at 570 ("qux" sees the completion only).
  /// The source subscription spans 200-570.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(grouping =>
          {
              var key = grouping.Key;
              var recorder = testScheduler.CreateObserver<string>();

              recordersByKey[key] = recorder;
              groupsByKey[key] = grouping;

              // The inner subscription is deliberately delayed by 100 ticks.
              testScheduler.ScheduleRelative(100, () =>
              {
                  subscriptionsByKey[key] = grouping.Subscribe(recorder);
              });
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      recordersByKey["qux"].Messages.AssertEqual(
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Subscribes to each group immediately when it is handed out and checks that every inner
  /// observer records all of its group's reversed elements followed by OnCompleted at 570.
  /// </summary>
  /// <remarks>
  /// Four groups are expected and the source subscription spans 200-570.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Complete_All()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(grouping =>
          {
              var key = grouping.Key;
              var recorder = testScheduler.CreateObserver<string>();

              recordersByKey[key] = recorder;
              groupsByKey[key] = grouping;

              // Subscribe right away so no element of the group is missed.
              subscriptionsByKey[key] = grouping.Subscribe(recorder);
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      recordersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Subscribes to each group 100 ticks after it is handed out and checks that the source's
  /// error at 570 is delivered to every late inner observer.
  /// </summary>
  /// <remarks>
  /// Four groups are expected. Each recorder contains the elements listed in the assertions
  /// below followed by OnError at 570 with the source's exception instance. The outer observer
  /// ignores the error. The source subscription spans 200-570.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceFailure),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              grouping =>
              {
                  var key = grouping.Key;
                  var recorder = testScheduler.CreateObserver<string>();

                  recordersByKey[key] = recorder;
                  groupsByKey[key] = grouping;

                  // The inner subscription is deliberately delayed by 100 ticks.
                  testScheduler.ScheduleRelative(100, () =>
                  {
                      subscriptionsByKey[key] = grouping.Subscribe(recorder);
                  });
              },
              error => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnError<string>(570, sourceFailure)
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnError<string>(570, sourceFailure)
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnError<string>(570, sourceFailure)
      );

      recordersByKey["qux"].Messages.AssertEqual(
          OnError<string>(570, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Subscribes to every group immediately, then disposes the outer subscription and all inner
  /// subscriptions at virtual time 400, and checks that nothing is recorded afterwards.
  /// </summary>
  /// <remarks>
  /// Four groups are expected, each recorder holds only the elements seen before 400 and no
  /// terminal notification, and the source subscription spans 200-400.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(grouping =>
          {
              var key = grouping.Key;
              var recorder = testScheduler.CreateObserver<string>();

              recordersByKey[key] = recorder;
              groupsByKey[key] = grouping;
              subscriptionsByKey[key] = grouping.Subscribe(recorder);
          });
      });

      // Tear everything down mid-stream, well before the source completes.
      testScheduler.ScheduleAbsolute(400, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab ")
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      recordersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Uses a key selector that throws on its sixth invocation and checks that the exception is
  /// delivered to every inner observer that exists at that point.
  /// </summary>
  /// <remarks>
  /// Three groups are expected ("foo", "baR", "Baz"); each recorder ends with OnError at 360
  /// carrying the thrown exception instance. The outer observer ignores the error. The source
  /// subscription spans 200-360.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_KeyThrow()
  {
      var testScheduler = new TestScheduler();
      var selectorFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();
      int keyCalls = 0;

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(
              value =>
              {
                  // The sixth key computation fails.
                  if (++keyCalls == 6)
                  {
                      throw selectorFailure;
                  }

                  return value.Trim();
              },
              value => Reverse(value),
              _groupByCapacity,
              keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              grouping =>
              {
                  var key = grouping.Key;
                  var recorder = testScheduler.CreateObserver<string>();

                  recordersByKey[key] = recorder;
                  groupsByKey[key] = grouping;
                  subscriptionsByKey[key] = grouping.Subscribe(recorder);
              },
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(3, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(360, selectorFailure)
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, selectorFailure)
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  /// <summary>
  /// Uses an element selector that throws on its sixth invocation and checks that the exception
  /// is delivered to every inner observer.
  /// </summary>
  /// <remarks>
  /// Four groups are expected; the "qux" recorder holds only the error, the others hold their
  /// earlier elements followed by OnError at 360 carrying the thrown exception instance. The
  /// outer observer ignores the error. The source subscription spans 200-360.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_EleThrow()
  {
      var testScheduler = new TestScheduler();
      var selectorFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();
      int elementCalls = 0;

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(
              value => value.Trim(),
              value =>
              {
                  // The sixth element projection fails.
                  if (++elementCalls == 6)
                  {
                      throw selectorFailure;
                  }

                  return Reverse(value);
              },
              _groupByCapacity,
              keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              grouping =>
              {
                  var key = grouping.Key;
                  var recorder = testScheduler.CreateObserver<string>();

                  recordersByKey[key] = recorder;
                  groupsByKey[key] = grouping;
                  subscriptionsByKey[key] = grouping.Subscribe(recorder);
              },
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(360, selectorFailure)
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, selectorFailure)
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, selectorFailure)
      );

      recordersByKey["qux"].Messages.AssertEqual(
          OnError<string>(360, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  /// <summary>
  /// Subscribes to every group immediately while grouping with a GroupByComparer constructed
  /// with (scheduler, 400, ushort.MaxValue), and checks how the comparer's failure reaches the
  /// inner observers.
  /// </summary>
  /// <remarks>
  /// Four groups are expected; every recorder ends with OnError at 420 carrying
  /// comparer.EqualsException. The outer observer ignores the error. The source subscription
  /// spans 200-420.
  /// NOTE(review): presumably 400 is the virtual time from which Equals starts throwing; the
  /// comparer's implementation is outside this block - confirm.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Comparer_EqualsThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var throwingComparer = new GroupByComparer(testScheduler, 400, ushort.MaxValue);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), _groupByCapacity, throwingComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              grouping =>
              {
                  var key = grouping.Key;
                  var recorder = testScheduler.CreateObserver<string>();

                  recordersByKey[key] = recorder;
                  groupsByKey[key] = grouping;
                  subscriptionsByKey[key] = grouping.Subscribe(recorder);
              },
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(420, throwingComparer.EqualsException)
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, throwingComparer.EqualsException)
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, throwingComparer.EqualsException)
      );

      recordersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, throwingComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  /// <summary>
  /// Subscribes to every group immediately while grouping with a GroupByComparer constructed
  /// with (scheduler, ushort.MaxValue, 400), and checks how the comparer's failure reaches the
  /// inner observers.
  /// </summary>
  /// <remarks>
  /// Four groups are expected; every recorder ends with OnError at 420 carrying
  /// comparer.HashCodeException. The outer observer ignores the error. The source subscription
  /// spans 200-420.
  /// NOTE(review): presumably 400 is the virtual time from which GetHashCode starts throwing;
  /// the comparer's implementation is outside this block - confirm.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Comparer_GetHashCodeThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var throwingComparer = new GroupByComparer(testScheduler, ushort.MaxValue, 400);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), _groupByCapacity, throwingComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              grouping =>
              {
                  var key = grouping.Key;
                  var recorder = testScheduler.CreateObserver<string>();

                  recordersByKey[key] = recorder;
                  groupsByKey[key] = grouping;
                  subscriptionsByKey[key] = grouping.Subscribe(recorder);
              },
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(420, throwingComparer.HashCodeException)
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, throwingComparer.HashCodeException)
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, throwingComparer.HashCodeException)
      );

      recordersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, throwingComparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  /// <summary>
  /// Disposes the outer subscription at 320 while inner subscriptions stay alive, and checks
  /// that already-created groups keep receiving elements until the source completes.
  /// </summary>
  /// <remarks>
  /// Only two groups are expected ("foo" and "baR"); the outer recorder holds just those two
  /// keys with no terminal notification, both inner recorders run through OnCompleted at 570,
  /// and the source subscription spans 200-570.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Outer_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();
      var keyRecorder = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              grouping =>
              {
                  var key = grouping.Key;

                  // Record the key on the outer recorder before wiring up the inner one.
                  keyRecorder.OnNext(key);

                  var recorder = testScheduler.CreateObserver<string>();
                  recordersByKey[key] = recorder;
                  groupsByKey[key] = grouping;
                  subscriptionsByKey[key] = grouping.Subscribe(recorder);
              },
              keyRecorder.OnError,
              keyRecorder.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      // Drop only the outer subscription early; inner subscriptions are left untouched.
      testScheduler.ScheduleAbsolute(320, () =>
      {
          groupedSubscription.Dispose();
      });

      testScheduler.Start();

      Assert.Equal(2, groupsByKey.Count);

      keyRecorder.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR")
      );

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Disposes only the "foo" inner subscription at 320 and checks that the remaining groups
  /// are unaffected and still run through to completion.
  /// </summary>
  /// <remarks>
  /// Four groups are expected; the "foo" recorder stops after its element at 310 without a
  /// terminal notification, the other three recorders end with OnCompleted at 570, and the
  /// source subscription spans 200-570.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();
      var keyRecorder = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              grouping =>
              {
                  var key = grouping.Key;

                  // Record the key on the outer recorder before wiring up the inner one.
                  keyRecorder.OnNext(key);

                  var recorder = testScheduler.CreateObserver<string>();
                  recordersByKey[key] = recorder;
                  groupsByKey[key] = grouping;
                  subscriptionsByKey[key] = grouping.Subscribe(recorder);
              },
              keyRecorder.OnError,
              keyRecorder.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      // Drop a single inner subscription; everything else keeps running.
      testScheduler.ScheduleAbsolute(320, () =>
      {
          subscriptionsByKey["foo"].Dispose();
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      recordersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Disposes each of the four inner subscriptions at its own time (foo at 320, baR at 280,
  /// Baz at 355, qux at 400) while the outer subscription stays alive.
  /// </summary>
  /// <remarks>
  /// Four groups are expected; each recorder holds only the elements seen before its own
  /// disposal and no terminal notification, and the source subscription spans 200-570.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Multiple_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var recordersByKey = new Dictionary<string, ITestableObserver<string>>();
      var keyRecorder = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              grouping =>
              {
                  var key = grouping.Key;

                  // Record the key on the outer recorder before wiring up the inner one.
                  keyRecorder.OnNext(key);

                  var recorder = testScheduler.CreateObserver<string>();
                  recordersByKey[key] = recorder;
                  groupsByKey[key] = grouping;
                  subscriptionsByKey[key] = grouping.Subscribe(recorder);
              },
              keyRecorder.OnError,
              keyRecorder.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var entry in subscriptionsByKey)
          {
              entry.Value.Dispose();
          }
      });

      // Each inner subscription is torn down at its own point in virtual time.
      testScheduler.ScheduleAbsolute(320, () => { subscriptionsByKey["foo"].Dispose(); });
      testScheduler.ScheduleAbsolute(280, () => { subscriptionsByKey["baR"].Dispose(); });
      testScheduler.ScheduleAbsolute(355, () => { subscriptionsByKey["Baz"].Dispose(); });
      testScheduler.ScheduleAbsolute(400, () => { subscriptionsByKey["qux"].Dispose(); });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      recordersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      recordersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab")
      );

      recordersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      recordersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Captures a group without subscribing to it, then subscribes at 600 - after the source
  /// completed at 570 - and checks that the late observer still receives the completion.
  /// </summary>
  /// <remarks>
  /// The source subscription spans 200-570 and the late recorder holds a single OnCompleted
  /// stamped at 600.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Escape_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570)
      );

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      IObservable<string> escapedGroup = null;
      IDisposable escapedSubscription = null;
      var lateRecorder = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), _groupByCapacity);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Only remember the group; the subscription happens later.
          groupedSubscription = grouped.Subscribe(grouping => { escapedGroup = grouping; });
      });

      testScheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(lateRecorder);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      testScheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      lateRecorder.Messages.AssertEqual(
          OnCompleted<string>(600)
      );
  }
  /// <summary>
  /// Captures a group without subscribing to it, then subscribes at 600 - after the source
  /// failed at 570 - and checks that the late observer still receives the source's error.
  /// </summary>
  /// <remarks>
  /// The outer observer ignores the error. The source subscription spans 200-570 and the late
  /// recorder holds a single OnError stamped at 600 with the source's exception instance.
  /// </remarks>
  [Fact]
  public void GroupBy_Capacity_Inner_Escape_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceFailure)
      );

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      IObservable<string> escapedGroup = null;
      IDisposable escapedSubscription = null;
      var lateRecorder = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), _groupByCapacity);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Only remember the group; the subscription happens later.
          groupedSubscription = grouped.Subscribe(
              grouping => { escapedGroup = grouping; },
              _ => { });
      });

      testScheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(lateRecorder);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      testScheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      lateRecorder.Messages.AssertEqual(
          OnError<string>(600, sourceFailure)
      );
  }
  // When the outer subscription is disposed (at 400) before the stored group is
  // subscribed to (at 600), the late subscriber is expected to observe nothing at all.
  [Fact]
  public void GroupBy_Capacity_Inner_Escape_Dispose()
  {
      var scheduler = new TestScheduler();

      // Hot source: the notifications from 470 onwards fall after the outer disposal at 400.
      var source = scheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, new Exception())
      );

      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var escapedGroup = default(IObservable<string>);
      var escapedSubscription = default(IDisposable);

      var observer = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupBy(x => x.Trim(), _groupByCapacity);
      });

      // Store each emitted group; the latest one replaces the previous one.
      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(group =>
          {
              escapedGroup = group;
          });
      });

      // Tear down the outer subscription while no inner subscription exists yet.
      scheduler.ScheduleAbsolute(400, () =>
      {
          groupsSubscription.Dispose();
      });

      scheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(observer);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          escapedSubscription.Dispose();
      });

      scheduler.Start();

      // The source subscription ends at the outer disposal time.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      // No notifications are expected on the late inner subscription.
      observer.Messages.AssertEqual(
      );
  }
  // Elements whose key selector yields null must be grouped like any other key:
  // "bar" and "baz" map to a null key, the remaining values to their upper-cased form.
  [Fact]
  public void GroupBy_Capacity_NullKeys_Simple()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      );

      // Values starting with 'b' get a null key; everything else is keyed by its upper-cased text.
      Func<string, string> keyOf = x => x[0] == 'b' ? null : x.ToUpper();

      var observer = scheduler.Start(() =>
      {
          // Flatten the groups again, prefixing every element with its key ("(null)" for the null key).
          return source
              .GroupBy(keyOf, _groupByCapacity)
              .SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x);
      });

      observer.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  // A source error must reach both the outer observer and the observer of the
  // null-keyed group.
  [Fact]
  public void GroupBy_Capacity_NullKeys_Error()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnError<string>(500, failure)
      );

      var nullGroupObserver = scheduler.CreateObserver<string>();
      var outerError = default(Exception);

      scheduler.ScheduleAbsolute(200, () =>
      {
          // Values starting with 'b' get a null key; only that group is kept and observed.
          var nullKeyedGroups = source
              .GroupBy(x => x[0] == 'b' ? null : x.ToUpper(), _groupByCapacity)
              .Where(g => g.Key == null);

          nullKeyedGroups.Subscribe(
              g => g.Subscribe(nullGroupObserver),
              error => outerError = error);
      });

      scheduler.Start();

      // The outer sequence surfaced the exact exception instance raised by the source.
      Assert.Same(failure, outerError);

      nullGroupObserver.Messages.AssertEqual(
          OnNext(220, "bar"),
          OnNext(470, "baz"),
          OnError<string>(500, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  2932. #endregion
  2933. }
  /// <summary>
  /// Ordinal, case-insensitive string comparer for tests that can be configured to start
  /// throwing from <c>Equals</c> and/or <c>GetHashCode</c> once the test scheduler's clock
  /// has moved past a given tick count.
  /// </summary>
  internal class GroupByComparer : IEqualityComparer<string>
  {
      private readonly TestScheduler scheduler;
      private readonly ushort equalsThrowsAfter;
      private readonly ushort getHashCodeThrowsAfter;

      /// <summary>Exception instance thrown by <c>GetHashCode</c> once its threshold has been passed.</summary>
      public Exception HashCodeException = new Exception();

      /// <summary>Exception instance thrown by <c>Equals</c> once its threshold has been passed.</summary>
      public Exception EqualsException = new Exception();

      /// <summary>
      /// Creates a comparer that throws from its members when the scheduler clock exceeds the given thresholds.
      /// </summary>
      /// <param name="scheduler">Scheduler whose <c>Clock</c> is read on every comparer call.</param>
      /// <param name="equalsThrowsAfter"><c>Equals</c> throws while the clock is strictly greater than this value.</param>
      /// <param name="getHashCodeThrowsAfter"><c>GetHashCode</c> throws while the clock is strictly greater than this value.</param>
      public GroupByComparer(TestScheduler scheduler, ushort equalsThrowsAfter, ushort getHashCodeThrowsAfter)
      {
          this.scheduler = scheduler;
          this.equalsThrowsAfter = equalsThrowsAfter;
          this.getHashCodeThrowsAfter = getHashCodeThrowsAfter;
      }

      /// <summary>
      /// Creates a comparer with both thresholds set to <see cref="ushort.MaxValue"/>.
      /// </summary>
      /// <param name="scheduler">Scheduler whose <c>Clock</c> is read on every comparer call.</param>
      public GroupByComparer(TestScheduler scheduler)
          : this(scheduler, ushort.MaxValue, ushort.MaxValue)
      {
      }

      /// <summary>
      /// Compares two strings using ordinal, case-insensitive rules.
      /// </summary>
      /// <returns><c>true</c> when both strings are equal ignoring case, or both are null; otherwise <c>false</c>.</returns>
      /// <exception cref="Exception">The <see cref="EqualsException"/> instance, once the clock has passed the equals threshold.</exception>
      public bool Equals(string x, string y)
      {
          if (scheduler.Clock > equalsThrowsAfter)
          {
              throw EqualsException;
          }

          // The static overload tolerates null on either side, whereas x.Equals(y, ...) would
          // fail with a NullReferenceException for a null x.
          return string.Equals(x, y, StringComparison.OrdinalIgnoreCase);
      }

      /// <summary>
      /// Computes a hash code consistent with the case-insensitive equality above.
      /// </summary>
      /// <exception cref="Exception">The <see cref="HashCodeException"/> instance, once the clock has passed the hash code threshold.</exception>
      public int GetHashCode(string obj)
      {
          if (scheduler.Clock > getHashCodeThrowsAfter)
          {
              throw HashCodeException;
          }

          return StringComparer.OrdinalIgnoreCase.GetHashCode(obj);
      }
  }
  2968. }