GroupByTest.cs 120 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Text;
  10. using Microsoft.Reactive.Testing;
  11. using ReactiveTests.Dummies;
  12. using Microsoft.VisualStudio.TestTools.UnitTesting;
  13. using Assert = Xunit.Assert;
  14. namespace ReactiveTests.Tests
  15. {
  16. [TestClass]
  17. public class GroupByTest : ReactiveTest
  18. {
  19. #region + GroupBy +
  /// <summary>
  /// Checks that the GroupBy overload taking a key selector, an element selector and a
  /// comparer throws <see cref="ArgumentNullException"/> when any one of its arguments is
  /// null, and that subscribing to the result with a null observer throws as well.
  /// </summary>
  [TestMethod]
  public void GroupBy_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;
      var comparer = EqualityComparer<int>.Default;

      // Null source.
      ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(selector, selector, comparer));

      // Null key selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(null, selector, comparer));

      // Null element selector (the cast picks the element-selector overload).
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, (Func<int, int>)null, comparer));

      // Null comparer.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, null));

      // Null observer.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, comparer).Subscribe(null));
  }
  /// <summary>
  /// Checks that the GroupBy overload taking a key selector and an element selector throws
  /// <see cref="ArgumentNullException"/> for a null source, a null key selector, a null
  /// element selector, and a null observer passed to Subscribe.
  /// </summary>
  [TestMethod]
  public void GroupBy_KeyEle_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;

      // Null source.
      ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(selector, selector));

      // Null key selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy((Func<int, int>)null, selector));

      // Null element selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, (Func<int, int>)null));

      // Null observer.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector).Subscribe(null));
  }
  /// <summary>
  /// Checks that the GroupBy overload taking a key selector and a comparer throws
  /// <see cref="ArgumentNullException"/> for a null source, a null key selector, a null
  /// comparer, and a null observer passed to Subscribe.
  /// </summary>
  [TestMethod]
  public void GroupBy_KeyComparer_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var keySelector = DummyFunc<int, int>.Instance;
      var comparer = EqualityComparer<int>.Default;

      // Null source.
      ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(keySelector, comparer));

      // Null key selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(null, comparer));

      // Null comparer.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(keySelector, null));

      // Null observer.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(keySelector, comparer).Subscribe(null));
  }
  /// <summary>
  /// Checks that the key-selector-only GroupBy overload throws
  /// <see cref="ArgumentNullException"/> for a null source, a null key selector, and a
  /// null observer passed to Subscribe.
  /// </summary>
  [TestMethod]
  public void GroupBy_Key_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var keySelector = DummyFunc<int, int>.Instance;

      // Null source.
      ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).GroupBy(keySelector));

      // Null key selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy((Func<int, int>)null));

      // Null observer.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(keySelector).Subscribe(null));
  }
  /// <summary>
  /// Groups a hot sequence of padded, mixed-case strings by their trimmed value using a
  /// <c>GroupByComparer</c> and checks the keys of the groups that are emitted.
  /// </summary>
  /// <remarks>
  /// Twelve values arrive between subscription (200) and completion (570), yet only four
  /// keys are expected, so the comparer evidently treats keys that differ only in letter
  /// case as equal (its implementation is not visible in this part of the file).
  /// </remarks>
  [TestMethod]
  public void GroupBy_WithKeyComparer()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Counts its own invocations so the final assertion can check one call per value.
      Func<string, string> countingKeySelector = value =>
      {
          keySelectorCalls++;
          return value.Trim();
      };

      var observer = testScheduler.Start(() =>
          source.GroupBy(countingKeySelector, keyComparer).Select(g => g.Key)
      );

      // Each key is reported once, spelled as it first appeared.
      observer.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keySelectorCalls);
  }
  /// <summary>
  /// Observes only the outer (group) sequence of a GroupBy with key selector, element
  /// selector and comparer, and checks that it emits one key per group and completes when
  /// the source completes at 570.
  /// </summary>
  /// <remarks>
  /// Both selectors are expected to run exactly twelve times, once for each value that
  /// arrives while subscribed.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Outer_Complete()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = value =>
      {
          keySelectorCalls++;
          return value.Trim();
      };

      Func<string, string> countingElementSelector = value =>
      {
          elementSelectorCalls++;
          return Reverse(value);
      };

      var observer = testScheduler.Start(() =>
          source.GroupBy(countingKeySelector, countingElementSelector, keyComparer)
                .Select(g => g.Key)
      );

      observer.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keySelectorCalls);
      Assert.Equal(12, elementSelectorCalls);
  }
  /// <summary>
  /// Observes only the outer (group) sequence of a GroupBy whose source fails at 570, and
  /// checks that the same exception instance is forwarded after the four group keys.
  /// </summary>
  /// <remarks>
  /// Both selectors are expected to run twelve times, once per value received before the
  /// source error.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Outer_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceFailure = new Exception();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceFailure),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = value =>
      {
          keySelectorCalls++;
          return value.Trim();
      };

      Func<string, string> countingElementSelector = value =>
      {
          elementSelectorCalls++;
          return Reverse(value);
      };

      var observer = testScheduler.Start(() =>
          source.GroupBy(countingKeySelector, countingElementSelector, keyComparer)
                .Select(g => g.Key)
      );

      observer.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(570, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keySelectorCalls);
      Assert.Equal(12, elementSelectorCalls);
  }
  /// <summary>
  /// Observes only the outer (group) sequence of a GroupBy and passes 355 as the disposal
  /// time to <c>Start</c>, then checks that nothing is observed after that point.
  /// </summary>
  /// <remarks>
  /// Only the three keys seen up to 350 are expected, the source subscription is expected
  /// to end at 355, and each selector is expected to have run five times.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Outer_Dispose()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = value =>
      {
          keySelectorCalls++;
          return value.Trim();
      };

      Func<string, string> countingElementSelector = value =>
      {
          elementSelectorCalls++;
          return Reverse(value);
      };

      // The second argument to Start is the virtual time at which the subscription is disposed.
      var observer = testScheduler.Start(
          () => source.GroupBy(countingKeySelector, countingElementSelector, keyComparer)
                      .Select(g => g.Key),
          355
      );

      observer.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 355)
      );

      Assert.Equal(5, keySelectorCalls);
      Assert.Equal(5, elementSelectorCalls);
  }
  /// <summary>
  /// Observes only the outer (group) sequence of a GroupBy whose key selector throws on
  /// its tenth invocation, and checks that the exception is reported as an error.
  /// </summary>
  /// <remarks>
  /// The tenth value arrives at 480, so the error and the end of the source subscription
  /// are both expected at 480. The element selector is expected to have run only nine
  /// times, i.e. not for the value whose key selection failed.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Outer_KeyThrow()
  {
      var testScheduler = new TestScheduler();
      var selectorFailure = new Exception();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Fails on the tenth call; the counter is bumped before the check, so it ends at 10.
      Func<string, string> failingKeySelector = value =>
      {
          keySelectorCalls++;
          if (keySelectorCalls == 10)
          {
              throw selectorFailure;
          }

          return value.Trim();
      };

      Func<string, string> countingElementSelector = value =>
      {
          elementSelectorCalls++;
          return Reverse(value);
      };

      var observer = testScheduler.Start(() =>
          source.GroupBy(failingKeySelector, countingElementSelector, keyComparer)
                .Select(g => g.Key)
      );

      observer.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(480, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keySelectorCalls);
      Assert.Equal(9, elementSelectorCalls);
  }
  /// <summary>
  /// Observes only the outer (group) sequence of a GroupBy whose element selector throws
  /// on its tenth invocation, and checks that the exception is reported as an error.
  /// </summary>
  /// <remarks>
  /// The tenth value arrives at 480, so the error and the end of the source subscription
  /// are both expected at 480, with both selectors having run ten times.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Outer_EleThrow()
  {
      var testScheduler = new TestScheduler();
      var selectorFailure = new Exception();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = value =>
      {
          keySelectorCalls++;
          return value.Trim();
      };

      // Fails on the tenth call; the counter is bumped before the check, so it ends at 10.
      Func<string, string> failingElementSelector = value =>
      {
          elementSelectorCalls++;
          if (elementSelectorCalls == 10)
          {
              throw selectorFailure;
          }

          return Reverse(value);
      };

      var observer = testScheduler.Start(() =>
          source.GroupBy(countingKeySelector, failingElementSelector, keyComparer)
                .Select(g => g.Key)
      );

      observer.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(480, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keySelectorCalls);
      Assert.Equal(10, elementSelectorCalls);
  }
  /// <summary>
  /// Observes only the outer (group) sequence of a GroupBy whose comparer fails, and
  /// checks that <c>comparer.EqualsException</c> is reported as an error at 310.
  /// </summary>
  /// <remarks>
  /// The comparer is built with (250, ushort.MaxValue). NOTE(review): presumably the first
  /// number is the virtual time after which Equals throws and the second the same for
  /// GetHashCode - confirm against GroupByComparer. The key selector is expected to have
  /// run four times and the element selector three times.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Outer_ComparerEqualsThrow()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var faultyComparer = new GroupByComparer(testScheduler, 250, ushort.MaxValue);

      Func<string, string> countingKeySelector = value =>
      {
          keySelectorCalls++;
          return value.Trim();
      };

      Func<string, string> countingElementSelector = value =>
      {
          elementSelectorCalls++;
          return Reverse(value);
      };

      var observer = testScheduler.Start(() =>
          source.GroupBy(countingKeySelector, countingElementSelector, faultyComparer)
                .Select(g => g.Key)
      );

      observer.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnError<string>(310, faultyComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      Assert.Equal(4, keySelectorCalls);
      Assert.Equal(3, elementSelectorCalls);
  }
  /// <summary>
  /// Observes only the outer (group) sequence of a GroupBy whose comparer fails, and
  /// checks that <c>comparer.HashCodeException</c> is reported as an error at 420.
  /// </summary>
  /// <remarks>
  /// The comparer is built with (ushort.MaxValue, 410). NOTE(review): presumably the second
  /// number is the virtual time after which GetHashCode throws - confirm against
  /// GroupByComparer. The key selector is expected to have run eight times and the element
  /// selector seven times.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Outer_ComparerGetHashCodeThrow()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var faultyComparer = new GroupByComparer(testScheduler, ushort.MaxValue, 410);

      Func<string, string> countingKeySelector = value =>
      {
          keySelectorCalls++;
          return value.Trim();
      };

      Func<string, string> countingElementSelector = value =>
      {
          elementSelectorCalls++;
          return Reverse(value);
      };

      var observer = testScheduler.Start(() =>
          source.GroupBy(countingKeySelector, countingElementSelector, faultyComparer)
                .Select(g => g.Key)
      );

      observer.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(420, faultyComparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );

      Assert.Equal(8, keySelectorCalls);
      Assert.Equal(7, elementSelectorCalls);
  }
  /// <summary>
  /// Subscribes to every group 100 ticks after the group is emitted and checks what each
  /// late inner observer receives when the source completes at 570.
  /// </summary>
  /// <remarks>
  /// Because of the delay, each inner observer is expected to see only the reversed
  /// elements delivered after its own subscription, followed by completion at 570. The
  /// "qux" group is expected to see nothing but completion.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Inner_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(g =>
          {
              var key = g.Key;
              var observer = testScheduler.CreateObserver<string>();

              groupsByKey[key] = g;
              observersByKey[key] = observer;

              // Attach to the group late, so that its earlier elements are missed.
              testScheduler.ScheduleRelative(100, () =>
              {
                  subscriptionsByKey[key] = g.Subscribe(observer);
              });
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Subscribes to every group as soon as it is emitted and checks that each inner
  /// observer receives all reversed elements of its group, followed by completion at 570.
  /// </summary>
  [TestMethod]
  public void GroupBy_Inner_Complete_All()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(g =>
          {
              var key = g.Key;
              var observer = testScheduler.CreateObserver<string>();

              groupsByKey[key] = g;
              observersByKey[key] = observer;

              // Attach right away, so that no element of the group is missed.
              subscriptionsByKey[key] = g.Subscribe(observer);
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Subscribes to every group 100 ticks after the group is emitted and checks what each
  /// late inner observer receives when the source fails at 570.
  /// </summary>
  /// <remarks>
  /// Each inner observer is expected to see only the reversed elements delivered after
  /// its own subscription, followed by the source's exception instance at 570. The outer
  /// subscription supplies an empty error handler.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Inner_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceFailure),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              g =>
              {
                  var key = g.Key;
                  var observer = testScheduler.CreateObserver<string>();

                  groupsByKey[key] = g;
                  observersByKey[key] = observer;

                  // Attach to the group late, so that its earlier elements are missed.
                  testScheduler.ScheduleRelative(100, () =>
                  {
                      subscriptionsByKey[key] = g.Subscribe(observer);
                  });
              },
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnError<string>(570, sourceFailure)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnError<string>(570, sourceFailure)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnError<string>(570, sourceFailure)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnError<string>(570, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Subscribes to every group as soon as it is emitted, disposes the outer and all inner
  /// subscriptions at virtual time 400, and checks that nothing is delivered afterwards.
  /// </summary>
  /// <remarks>
  /// Each inner observer is expected to hold only the reversed elements received before
  /// 400, with no terminal notification, and the source subscription is expected to end
  /// at 400.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Inner_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(g =>
          {
              var key = g.Key;
              var observer = testScheduler.CreateObserver<string>();

              groupsByKey[key] = g;
              observersByKey[key] = observer;
              subscriptionsByKey[key] = g.Subscribe(observer);
          });
      });

      // Tear everything down mid-stream instead of at the default disposal time.
      testScheduler.ScheduleAbsolute(400, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab ")
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// Subscribes to every group as soon as it is emitted, with a key selector that throws
  /// on its sixth invocation, and checks that every existing inner observer receives the
  /// exception.
  /// </summary>
  /// <remarks>
  /// The sixth value arrives at 360, so only three groups are expected to exist. Each of
  /// them is expected to end with the selector's exception at 360, and the source
  /// subscription is expected to end at 360 as well.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Inner_KeyThrow()
  {
      var testScheduler = new TestScheduler();
      var selectorFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();
      var keySelectorCalls = 0;

      // Fails on the sixth call, which corresponds to the value arriving at 360.
      Func<string, string> failingKeySelector = value =>
      {
          keySelectorCalls++;
          if (keySelectorCalls == 6)
          {
              throw selectorFailure;
          }

          return value.Trim();
      };

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(failingKeySelector, value => Reverse(value), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              g =>
              {
                  var key = g.Key;
                  var observer = testScheduler.CreateObserver<string>();

                  groupsByKey[key] = g;
                  observersByKey[key] = observer;
                  subscriptionsByKey[key] = g.Subscribe(observer);
              },
              error => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(3, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(360, selectorFailure)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, selectorFailure)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  /// <summary>
  /// Subscribes to every group as soon as it is emitted, with an element selector that
  /// throws on its sixth invocation, and checks that every inner observer receives the
  /// exception.
  /// </summary>
  /// <remarks>
  /// The sixth value arrives at 360. Four groups are expected, including "qux", whose
  /// observer is expected to see only the error. Every inner observer is expected to end
  /// with the selector's exception at 360, and the source subscription is expected to end
  /// at 360 as well.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Inner_EleThrow()
  {
      var testScheduler = new TestScheduler();
      var selectorFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();
      var elementSelectorCalls = 0;

      // Fails on the sixth call, which corresponds to the value arriving at 360.
      Func<string, string> failingElementSelector = value =>
      {
          elementSelectorCalls++;
          if (elementSelectorCalls == 6)
          {
              throw selectorFailure;
          }

          return Reverse(value);
      };

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), failingElementSelector, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              g =>
              {
                  var key = g.Key;
                  var observer = testScheduler.CreateObserver<string>();

                  groupsByKey[key] = g;
                  observersByKey[key] = observer;
                  subscriptionsByKey[key] = g.Subscribe(observer);
              },
              error => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(360, selectorFailure)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, selectorFailure)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, selectorFailure)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnError<string>(360, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  /// <summary>
  /// Subscribes to every group as soon as it is emitted, using a comparer that fails, and
  /// checks that every inner observer receives <c>comparer.EqualsException</c> at 420.
  /// </summary>
  /// <remarks>
  /// The comparer is built with (400, ushort.MaxValue). NOTE(review): presumably the first
  /// number is the virtual time after which Equals throws - confirm against
  /// GroupByComparer. All four groups are expected to exist by then, and the source
  /// subscription is expected to end at 420.
  /// </remarks>
  [TestMethod]
  public void GroupBy_Inner_Comparer_EqualsThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var faultyComparer = new GroupByComparer(testScheduler, 400, ushort.MaxValue);

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(value => value.Trim(), value => Reverse(value), faultyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              g =>
              {
                  var key = g.Key;
                  var observer = testScheduler.CreateObserver<string>();

                  groupsByKey[key] = g;
                  observersByKey[key] = observer;
                  subscriptionsByKey[key] = g.Subscribe(observer);
              },
              error => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(420, faultyComparer.EqualsException)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, faultyComparer.EqualsException)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, faultyComparer.EqualsException)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, faultyComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  /// <summary>
  /// Subscribes to every group produced by GroupBy and checks that each inner
  /// observer created so far ends with <c>comparer.HashCodeException</c> at time 420,
  /// which is also when the source subscription is expected to end.
  /// </summary>
  [TestMethod]
  public void GroupBy_Inner_Comparer_GetHashCodeThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      // NOTE(review): presumably 400 is the virtual time from which GetHashCode throws and
      // ushort.MaxValue keeps Equals from ever throwing - confirm against GroupByComparer.
      var keyComparer = new GroupByComparer(testScheduler, ushort.MaxValue, 400);

      IObservable<IGroupedObservable<string, string>> groups = null;
      IDisposable groupsSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupBy(x => x.Trim(), x => Reverse(x), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  // Attach a fresh recording observer to every new group as it appears.
                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[grp.Key] = grp;
                  observersByKey[grp.Key] = observer;
                  subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
              },
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  /// <summary>
  /// Disposes the outer subscription at time 320 and checks that the two groups
  /// already handed out ("foo" and "baR") keep receiving elements until the source
  /// completes at 570, while no further groups are reported to the outer observer.
  /// </summary>
  [TestMethod]
  public void GroupBy_Outer_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      IObservable<IGroupedObservable<string, string>> groups = null;
      IDisposable groupsSubscription = null;
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();
      var outerObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupBy(x => x.Trim(), x => Reverse(x), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  // Record the key on the outer observer, then start listening to the group.
                  outerObserver.OnNext(grp.Key);

                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[grp.Key] = grp;
                  observersByKey[grp.Key] = observer;
                  subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
              },
              outerObserver.OnError,
              outerObserver.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      // Early disposal of the outer subscription only; inner subscriptions stay active.
      testScheduler.ScheduleAbsolute(320, () => groupsSubscription.Dispose());

      testScheduler.Start();

      Assert.Equal(2, groupsByKey.Count);

      outerObserver.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR")
      );

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Disposes the inner subscription for the "foo" group at time 320 and checks that
  /// only that observer stops receiving elements: the other groups run to completion
  /// at 570, the outer sequence still reports all four keys and completes, and the
  /// source stays subscribed until 570.
  /// </summary>
  [TestMethod]
  public void GroupBy_Inner_Independence()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler);

      var outer = default(IObservable<IGroupedObservable<string, string>>);
      var outerSubscription = default(IDisposable);
      var inners = new Dictionary<string, IObservable<string>>();
      var innerSubscriptions = new Dictionary<string, IDisposable>();
      var res = new Dictionary<string, ITestableObserver<string>>();
      var outerResults = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), comparer));

      scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
      {
          outerResults.OnNext(group.Key);

          var result = scheduler.CreateObserver<string>();
          inners[group.Key] = group;
          res[group.Key] = result;
          innerSubscriptions[group.Key] = group.Subscribe(result);
      }, outerResults.OnError, outerResults.OnCompleted));

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          outerSubscription.Dispose();

          foreach (var d in innerSubscriptions.Values)
          {
              d.Dispose();
          }
      });

      // Drop only the "foo" inner subscription part-way through the sequence.
      scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());

      scheduler.Start();

      Assert.Equal(4, inners.Count);

      // The outer sequence must be unaffected by the inner disposal: every key is
      // still reported when first seen, followed by completion with the source.
      outerResults.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      res["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      res["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      res["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      res["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Disposes each of the four inner subscriptions at a different time (280, 320,
  /// 355 and 400) and checks that every inner observer stops at its own disposal
  /// time, while the outer sequence still reports all four keys and completes, and
  /// the source stays subscribed until it completes at 570.
  /// </summary>
  [TestMethod]
  public void GroupBy_Inner_Multiple_Independence()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler);

      var outer = default(IObservable<IGroupedObservable<string, string>>);
      var outerSubscription = default(IDisposable);
      var inners = new Dictionary<string, IObservable<string>>();
      var innerSubscriptions = new Dictionary<string, IDisposable>();
      var res = new Dictionary<string, ITestableObserver<string>>();
      var outerResults = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () => outer = xs.GroupBy(x => x.Trim(), x => Reverse(x), comparer));

      scheduler.ScheduleAbsolute(Subscribed, () => outerSubscription = outer.Subscribe(group =>
      {
          outerResults.OnNext(group.Key);

          var result = scheduler.CreateObserver<string>();
          inners[group.Key] = group;
          res[group.Key] = result;
          innerSubscriptions[group.Key] = group.Subscribe(result);
      }, outerResults.OnError, outerResults.OnCompleted));

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          outerSubscription.Dispose();

          foreach (var d in innerSubscriptions.Values)
          {
              d.Dispose();
          }
      });

      // Each inner subscription is dropped shortly after its group first appears.
      scheduler.ScheduleAbsolute(320, () => innerSubscriptions["foo"].Dispose());
      scheduler.ScheduleAbsolute(280, () => innerSubscriptions["baR"].Dispose());
      scheduler.ScheduleAbsolute(355, () => innerSubscriptions["Baz"].Dispose());
      scheduler.ScheduleAbsolute(400, () => innerSubscriptions["qux"].Dispose());

      scheduler.Start();

      Assert.Equal(4, inners.Count);

      // The outer sequence must be unaffected by the inner disposals: every key is
      // still reported when first seen, followed by completion with the source.
      outerResults.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      res["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      res["baR"].Messages.AssertEqual(
          OnNext(270, " Rab")
      );

      res["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      res["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Captures a group without subscribing to it, then subscribes at time 600, after
  /// the source has completed at 570. The late observer is expected to see only an
  /// OnCompleted at 600.
  /// </summary>
  [TestMethod]
  public void GroupBy_Inner_Escape_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570)
      );

      IObservable<IGroupedObservable<string, string>> groups = null;
      IDisposable groupsSubscription = null;
      IObservable<string> escapedGroup = null;
      IDisposable escapedSubscription = null;
      var lateObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupBy(x => x.Trim());
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Only remember the most recent group; do not subscribe to it yet.
          groupsSubscription = groups.Subscribe(grp =>
          {
              escapedGroup = grp;
          });
      });

      testScheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(lateObserver);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      testScheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      lateObserver.Messages.AssertEqual(
          OnCompleted<string>(600)
      );
  }
  /// <summary>
  /// Captures a group without subscribing to it, then subscribes at time 600, after
  /// the source has failed at 570. The late observer is expected to see only an
  /// OnError at 600 carrying the source's exception.
  /// </summary>
  [TestMethod]
  public void GroupBy_Inner_Escape_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceError = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceError)
      );

      IObservable<IGroupedObservable<string, string>> groups = null;
      IDisposable groupsSubscription = null;
      IObservable<string> escapedGroup = null;
      IDisposable escapedSubscription = null;
      var lateObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupBy(x => x.Trim());
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Only remember the most recent group; the outer error is deliberately ignored.
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  escapedGroup = grp;
              },
              _ => { });
      });

      testScheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(lateObserver);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      testScheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      lateObserver.Messages.AssertEqual(
          OnError<string>(600, sourceError)
      );
  }
  /// <summary>
  /// Captures a group without subscribing to it, disposes the outer subscription at
  /// time 400, then subscribes to the captured group at 600. The source subscription
  /// is expected to end at 400 and the late observer to receive no messages.
  /// </summary>
  [TestMethod]
  public void GroupBy_Inner_Escape_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, new Exception())
      );

      IObservable<IGroupedObservable<string, string>> groups = null;
      IDisposable groupsSubscription = null;
      IObservable<string> escapedGroup = null;
      IDisposable escapedSubscription = null;
      var lateObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupBy(x => x.Trim());
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Only remember the most recent group; do not subscribe to it yet.
          groupsSubscription = groups.Subscribe(grp =>
          {
              escapedGroup = grp;
          });
      });

      testScheduler.ScheduleAbsolute(400, () => groupsSubscription.Dispose());

      testScheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(lateObserver);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          escapedSubscription.Dispose();
      });

      testScheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      // No notifications at all are expected on the late observer.
      lateObserver.Messages.AssertEqual(
      );
  }
  /// <summary>
  /// Groups by a key selector that returns <c>null</c> for values starting with 'b'
  /// and checks that elements of the null-keyed group are delivered like any other,
  /// with the key rendered as "(null)" in the flattened output.
  /// </summary>
  [TestMethod]
  public void GroupBy_NullKeys_Simple()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      );

      var observer = testScheduler.Start(() =>
          source
              .GroupBy(x => x[0] == 'b' ? null : x.ToUpper())
              .SelectMany(
                  g => g,
                  (g, x) => (g.Key ?? "(null)") + x)
      );

      observer.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  /// <summary>
  /// Subscribes only to the null-keyed group and checks that a source error at time
  /// 500 reaches both the outer error handler and the observer of that group.
  /// </summary>
  [TestMethod]
  public void GroupBy_NullKeys_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceError = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnError<string>(500, sourceError)
      );

      var nullGroupObserver = testScheduler.CreateObserver<string>();
      Exception outerError = null;

      testScheduler.ScheduleAbsolute(200, () =>
          source
              .GroupBy(x => x[0] == 'b' ? null : x.ToUpper())
              .Where(g => g.Key == null)
              .Subscribe(
                  g => g.Subscribe(nullGroupObserver),
                  error => outerError = error));

      testScheduler.Start();

      // The outer handler must receive the very same exception instance.
      Assert.Same(sourceError, outerError);

      nullGroupObserver.Messages.AssertEqual(
          OnNext(220, "bar"),
          OnNext(470, "baz"),
          OnError<string>(500, sourceError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  /// <summary>
  /// Returns a new string containing the characters of <paramref name="s"/> in
  /// reverse order.
  /// </summary>
  /// <param name="s">The string to reverse.</param>
  /// <returns>The reversed string.</returns>
  private static string Reverse(string s)
  {
      var chars = s.ToCharArray();
      Array.Reverse(chars);
      return new string(chars);
  }
  1475. #endregion
  1476. #region + GroupBy w/capacity +
  // Capacity argument passed to the capacity-taking GroupBy overloads exercised by the tests in this region.
  1477. private const int _groupByCapacity = 1024;
  /// <summary>
  /// Argument validation for the GroupBy overload taking a key selector, an element
  /// selector, a capacity and a comparer: null arguments are expected to raise
  /// <see cref="ArgumentNullException"/> and a capacity of -1 is expected to raise
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;
      var comparer = EqualityComparer<int>.Default;

      // Null source, key selector, element selector and comparer, in that order.
      ReactiveAssert.Throws<ArgumentNullException>(() => nullSource.GroupBy(selector, selector, _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(null, selector, _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, (Func<int, int>)null, _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, _groupByCapacity, null));

      // Null observer on the resulting sequence.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, _groupByCapacity, comparer).Subscribe(null));

      // Negative capacity.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => source.GroupBy(selector, selector, -1, comparer));
  }
  /// <summary>
  /// Argument validation for the GroupBy overload taking a key selector, an element
  /// selector and a capacity: null arguments are expected to raise
  /// <see cref="ArgumentNullException"/> and a capacity of -1 is expected to raise
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_KeyEle_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;

      // Null source, key selector and element selector, in that order.
      ReactiveAssert.Throws<ArgumentNullException>(() => nullSource.GroupBy(selector, selector, _groupByCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy((Func<int, int>)null, selector, _groupByCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, (Func<int, int>)null, _groupByCapacity));

      // Null observer on the resulting sequence.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, selector, _groupByCapacity).Subscribe(null));

      // Negative capacity.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => source.GroupBy(selector, selector, -1));
  }
  /// <summary>
  /// Argument validation for the GroupBy overload taking a key selector, a capacity
  /// and a comparer: null arguments are expected to raise
  /// <see cref="ArgumentNullException"/> and a capacity of -1 is expected to raise
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_KeyComparer_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;
      var comparer = EqualityComparer<int>.Default;

      // Null source, key selector and comparer, in that order.
      ReactiveAssert.Throws<ArgumentNullException>(() => nullSource.GroupBy(selector, _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(null, _groupByCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, _groupByCapacity, null));

      // Null observer on the resulting sequence.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, _groupByCapacity, comparer).Subscribe(null));

      // Negative capacity.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => source.GroupBy(selector, -1, comparer));
  }
  /// <summary>
  /// Argument validation for the GroupBy overload taking only a key selector and a
  /// capacity: null arguments are expected to raise
  /// <see cref="ArgumentNullException"/> and a capacity of -1 is expected to raise
  /// <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_Key_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;

      // Null source and null key selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => nullSource.GroupBy(selector, _groupByCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy((Func<int, int>)null, _groupByCapacity));

      // Null observer on the resulting sequence.
      ReactiveAssert.Throws<ArgumentNullException>(() => source.GroupBy(selector, _groupByCapacity).Subscribe(null));

      // Negative capacity.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => source.GroupBy(selector, -1));
  }
  /// <summary>
  /// Groups with a key selector, a capacity and a custom comparer, and checks that
  /// the outer sequence reports four keys and completes at 570, with the key
  /// selector invoked 12 times.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_WithKeyComparer()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Counts every key selector invocation before trimming the value.
      Func<string, string> countingKeySelector = x =>
      {
          keySelectorCalls++;
          return x.Trim();
      };

      var observedKeys = testScheduler.Start(() =>
          source
              .GroupBy(countingKeySelector, _groupByCapacity, keyComparer)
              .Select(g => g.Key)
      );

      observedKeys.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keySelectorCalls);
  }
  /// <summary>
  /// Groups with key and element selectors, a capacity and a custom comparer, and
  /// checks that the outer sequence reports four keys and completes at 570, with
  /// both selectors invoked 12 times.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_Outer_Complete()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Both selectors count their invocations so the totals can be asserted below.
      Func<string, string> countingKeySelector = x =>
      {
          keySelectorCalls++;
          return x.Trim();
      };

      Func<string, string> countingElementSelector = x =>
      {
          elementSelectorCalls++;
          return Reverse(x);
      };

      var observedKeys = testScheduler.Start(() =>
          source
              .GroupBy(countingKeySelector, countingElementSelector, _groupByCapacity, keyComparer)
              .Select(g => g.Key)
      );

      observedKeys.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keySelectorCalls);
      Assert.Equal(12, elementSelectorCalls);
  }
  /// <summary>
  /// Same setup as the completion case, but the source fails at 570: the outer
  /// sequence is expected to report four keys and then the source's exception, with
  /// both selectors invoked 12 times.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_Outer_Error()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;
      var sourceError = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceError),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Both selectors count their invocations so the totals can be asserted below.
      Func<string, string> countingKeySelector = x =>
      {
          keySelectorCalls++;
          return x.Trim();
      };

      Func<string, string> countingElementSelector = x =>
      {
          elementSelectorCalls++;
          return Reverse(x);
      };

      var observedKeys = testScheduler.Start(() =>
          source
              .GroupBy(countingKeySelector, countingElementSelector, _groupByCapacity, keyComparer)
              .Select(g => g.Key)
      );

      observedKeys.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(570, sourceError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keySelectorCalls);
      Assert.Equal(12, elementSelectorCalls);
  }
  /// <summary>
  /// Runs the grouped sequence with 355 passed as the extra argument to
  /// <c>Start</c>: only the first three keys are expected, the source subscription
  /// is expected to end at 355, and both selectors are expected to run 5 times.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_Outer_Dispose()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Both selectors count their invocations so the totals can be asserted below.
      Func<string, string> countingKeySelector = x =>
      {
          keySelectorCalls++;
          return x.Trim();
      };

      Func<string, string> countingElementSelector = x =>
      {
          elementSelectorCalls++;
          return Reverse(x);
      };

      var observedKeys = testScheduler.Start(
          () => source
              .GroupBy(countingKeySelector, countingElementSelector, _groupByCapacity, keyComparer)
              .Select(g => g.Key),
          355
      );

      observedKeys.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 355)
      );

      Assert.Equal(5, keySelectorCalls);
      Assert.Equal(5, elementSelectorCalls);
  }
  /// <summary>
  /// The key selector throws on its 10th invocation: the outer sequence is expected
  /// to report four keys and then that exception at 480, with the element selector
  /// having run only 9 times.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_Outer_KeyThrow()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;
      var selectorError = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Fails on the tenth call; earlier calls return the trimmed value.
      Func<string, string> throwingKeySelector = x =>
      {
          if (++keySelectorCalls == 10)
          {
              throw selectorError;
          }

          return x.Trim();
      };

      Func<string, string> countingElementSelector = x =>
      {
          elementSelectorCalls++;
          return Reverse(x);
      };

      var observedKeys = testScheduler.Start(() =>
          source
              .GroupBy(throwingKeySelector, countingElementSelector, _groupByCapacity, keyComparer)
              .Select(g => g.Key)
      );

      observedKeys.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(480, selectorError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keySelectorCalls);
      Assert.Equal(9, elementSelectorCalls);
  }
  /// <summary>
  /// The element selector throws on its 10th invocation: the outer sequence is
  /// expected to report four keys and then that exception at 480, with both
  /// selectors having run 10 times.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_Outer_EleThrow()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;
      var selectorError = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = x =>
      {
          keySelectorCalls++;
          return x.Trim();
      };

      // Fails on the tenth call; earlier calls return the reversed value.
      Func<string, string> throwingElementSelector = x =>
      {
          if (++elementSelectorCalls == 10)
          {
              throw selectorError;
          }

          return Reverse(x);
      };

      var observedKeys = testScheduler.Start(() =>
          source
              .GroupBy(countingKeySelector, throwingElementSelector, _groupByCapacity, keyComparer)
              .Select(g => g.Key)
      );

      observedKeys.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(480, selectorError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keySelectorCalls);
      Assert.Equal(10, elementSelectorCalls);
  }
  /// <summary>
  /// Uses a comparer whose <c>EqualsException</c> is expected on the outer sequence
  /// at time 310, after two keys have been reported. The key selector is expected to
  /// have run 4 times and the element selector 3 times.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_Outer_ComparerEqualsThrow()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      // NOTE(review): presumably 250 is the virtual time from which Equals throws and
      // ushort.MaxValue keeps GetHashCode from ever throwing - confirm against GroupByComparer.
      var keyComparer = new GroupByComparer(testScheduler, 250, ushort.MaxValue);

      Func<string, string> countingKeySelector = x =>
      {
          keySelectorCalls++;
          return x.Trim();
      };

      Func<string, string> countingElementSelector = x =>
      {
          elementSelectorCalls++;
          return Reverse(x);
      };

      var observedKeys = testScheduler.Start(() =>
          source
              .GroupBy(countingKeySelector, countingElementSelector, _groupByCapacity, keyComparer)
              .Select(g => g.Key)
      );

      observedKeys.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnError<string>(310, keyComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      Assert.Equal(4, keySelectorCalls);
      Assert.Equal(3, elementSelectorCalls);
  }
  // Outer-sequence scenario: every group is projected to its key. The expected
  // outcome is the keys "foo", "baR", "Baz" and "qux" followed by an OnError
  // that carries the comparer's HashCodeException at tick 420, where the
  // source subscription also ends.
  [TestMethod]
  public void GroupBy_Capacity_Outer_ComparerGetHashCodeThrow()
  {
      var testScheduler = new TestScheduler();
      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler, ushort.MaxValue, 410);

      // Both selectors count their invocations so the totals can be asserted below.
      Func<string, string> countingKeySelector = item =>
      {
          keySelectorCalls++;
          return item.Trim();
      };
      Func<string, string> countingElementSelector = item =>
      {
          elementSelectorCalls++;
          return Reverse(item);
      };

      var observed = testScheduler.Start(() =>
      {
          var groups = source.GroupBy(countingKeySelector, countingElementSelector, _groupByCapacity, keyComparer);
          return groups.Select(g => g.Key);
      });

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );

      Assert.Equal(8, keySelectorCalls);
      Assert.Equal(7, elementSelectorCalls);
  }
  // Inner-sequence scenario with late subscribers: each group is subscribed to
  // 100 ticks after the outer sequence hands it out. Every inner observer is
  // expected to see only the elements listed below, then OnCompleted at 570.
  [TestMethod]
  public void GroupBy_Capacity_Inner_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(group =>
          {
              var key = group.Key;
              var observer = testScheduler.CreateObserver<string>();
              groupsByKey[key] = group;
              observers[key] = observer;

              // Attach the observer to the group 100 ticks later, not immediately.
              testScheduler.ScheduleRelative(100, () =>
              {
                  groupSubscriptions[key] = group.Subscribe(observer);
              });
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      observers["qux"].Messages.AssertEqual(
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Inner-sequence scenario with immediate subscribers: each group is
  // subscribed to as soon as the outer sequence hands it out. Every inner
  // observer is expected to see all reversed elements of its group, then
  // OnCompleted at 570.
  [TestMethod]
  public void GroupBy_Capacity_Inner_Complete_All()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(group =>
          {
              var key = group.Key;
              var observer = testScheduler.CreateObserver<string>();
              groupsByKey[key] = group;
              observers[key] = observer;
              groupSubscriptions[key] = group.Subscribe(observer);
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Inner-sequence scenario where the source fails at 570: each group is
  // subscribed to 100 ticks after the outer sequence hands it out, and every
  // inner observer is expected to end with OnError carrying the source's
  // exception at 570.
  [TestMethod]
  public void GroupBy_Capacity_Inner_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceFailure),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[key] = group;
                  observers[key] = observer;

                  // Attach the observer to the group 100 ticks later, not immediately.
                  testScheduler.ScheduleRelative(100, () =>
                  {
                      groupSubscriptions[key] = group.Subscribe(observer);
                  });
              },
              error => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnError<string>(570, sourceFailure)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnError<string>(570, sourceFailure)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnError<string>(570, sourceFailure)
      );

      observers["qux"].Messages.AssertEqual(
          OnError<string>(570, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Inner-sequence scenario with early disposal: the outer subscription and
  // all inner subscriptions are disposed at tick 400. Inner observers are
  // expected to hold only the elements listed below, and the source
  // subscription is expected to span 200-400.
  [TestMethod]
  public void GroupBy_Capacity_Inner_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(group =>
          {
              var key = group.Key;
              var observer = testScheduler.CreateObserver<string>();
              groupsByKey[key] = group;
              observers[key] = observer;
              groupSubscriptions[key] = group.Subscribe(observer);
          });
      });

      // Tear everything down at 400, before the source completes.
      testScheduler.ScheduleAbsolute(400, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab ")
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  // Inner-sequence scenario where the key selector throws on its sixth call.
  // Three groups are expected to have been handed out, and each inner observer
  // is expected to end with OnError carrying that exception at 360.
  [TestMethod]
  public void GroupBy_Capacity_Inner_KeyThrow()
  {
      var testScheduler = new TestScheduler();
      var selectorFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var keySelectorCalls = 0;

      // Key selector that fails on its sixth invocation.
      Func<string, string> failingKeySelector = item =>
      {
          keySelectorCalls++;
          if (keySelectorCalls == 6)
          {
              throw selectorFailure;
          }

          return item.Trim();
      };

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(failingKeySelector, item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              error => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(3, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(360, selectorFailure)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, selectorFailure)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  // Inner-sequence scenario where the element selector throws on its sixth
  // call. Four groups are expected to have been handed out, and each inner
  // observer is expected to end with OnError carrying that exception at 360.
  [TestMethod]
  public void GroupBy_Capacity_Inner_EleThrow()
  {
      var testScheduler = new TestScheduler();
      var selectorFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var elementSelectorCalls = 0;

      // Element selector that fails on its sixth invocation.
      Func<string, string> failingElementSelector = item =>
      {
          elementSelectorCalls++;
          if (elementSelectorCalls == 6)
          {
              throw selectorFailure;
          }

          return Reverse(item);
      };

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), failingElementSelector, _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              error => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(360, selectorFailure)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, selectorFailure)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, selectorFailure)
      );

      observers["qux"].Messages.AssertEqual(
          OnError<string>(360, selectorFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  // Inner-sequence scenario using a comparer built with (scheduler, 400,
  // ushort.MaxValue). Every inner observer is expected to end with OnError
  // carrying the comparer's EqualsException at 420, where the source
  // subscription also ends.
  [TestMethod]
  public void GroupBy_Capacity_Inner_Comparer_EqualsThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler, 400, ushort.MaxValue);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              error => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(420, keyComparer.EqualsException)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, keyComparer.EqualsException)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, keyComparer.EqualsException)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, keyComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  // Inner-sequence scenario using a comparer built with (scheduler,
  // ushort.MaxValue, 400). Every inner observer is expected to end with
  // OnError carrying the comparer's HashCodeException at 420, where the
  // source subscription also ends.
  [TestMethod]
  public void GroupBy_Capacity_Inner_Comparer_GetHashCodeThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler, ushort.MaxValue, 400);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              error => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  // Independence scenario: the outer subscription is disposed at 320 while the
  // inner subscriptions stay alive. Only "foo" and "baR" are expected on the
  // outer observer, both inner observers are expected to keep receiving
  // elements through OnCompleted at 570, and the source subscription is
  // expected to span 200-570.
  [TestMethod]
  public void GroupBy_Capacity_Outer_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var keyObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;

                  // Record the key on the outer observer before wiring up the inner one.
                  keyObserver.OnNext(key);

                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              keyObserver.OnError,
              keyObserver.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      // Drop only the outer subscription part-way through the source.
      testScheduler.ScheduleAbsolute(320, () =>
      {
          groupedSubscription.Dispose();
      });

      testScheduler.Start();

      Assert.Equal(2, groupsByKey.Count);

      keyObserver.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR")
      );

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Independence scenario: only the "foo" inner subscription is disposed, at
  // 320. The "foo" observer is expected to hold just its first three
  // elements, while the other inner observers are expected to run through
  // OnCompleted at 570.
  [TestMethod]
  public void GroupBy_Capacity_Inner_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var keyObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  keyObserver.OnNext(key);

                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              keyObserver.OnError,
              keyObserver.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      // Drop a single inner subscription; everything else stays subscribed.
      testScheduler.ScheduleAbsolute(320, () =>
      {
          groupSubscriptions["foo"].Dispose();
      });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(570)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(570)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Independence scenario: each inner subscription is disposed at its own
  // tick ("foo" at 320, "baR" at 280, "Baz" at 355, "qux" at 400). Every inner
  // observer is expected to hold only the elements listed below, while the
  // source subscription is still expected to span 200-570.
  [TestMethod]
  public void GroupBy_Capacity_Inner_Multiple_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var keyObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), item => Reverse(item), _groupByCapacity, keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  keyObserver.OnNext(key);

                  var observer = testScheduler.CreateObserver<string>();
                  groupsByKey[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              keyObserver.OnError,
              keyObserver.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      // Each inner subscription is dropped at a different point in time.
      testScheduler.ScheduleAbsolute(320, () => { groupSubscriptions["foo"].Dispose(); });
      testScheduler.ScheduleAbsolute(280, () => { groupSubscriptions["baR"].Dispose(); });
      testScheduler.ScheduleAbsolute(355, () => { groupSubscriptions["Baz"].Dispose(); });
      testScheduler.ScheduleAbsolute(400, () => { groupSubscriptions["qux"].Dispose(); });

      testScheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof")
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab")
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Escape scenario: the group handed out by the outer sequence is only
  // subscribed to at tick 600, after the source's OnCompleted at 570. The
  // late observer is expected to receive just OnCompleted at 600.
  [TestMethod]
  public void GroupBy_Capacity_Inner_Escape_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570)
      );

      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var escapedGroup = default(IObservable<string>);
      var escapedSubscription = default(IDisposable);
      var lateObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupBy(item => item.Trim(), _groupByCapacity);
      });

      // Keep a reference to the group without subscribing to it yet.
      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(group => { escapedGroup = group; });
      });

      testScheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(lateObserver);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      testScheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      lateObserver.Messages.AssertEqual(
          OnCompleted<string>(600)
      );
  }
  /// <summary>
  /// A group that escapes the outer callback and is subscribed to only at 600,
  /// after the hot source failed at 570, is expected to deliver just OnError
  /// (carrying the source's exception) at the time of that late subscription.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_Inner_Escape_Error()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, failure)
      );

      var observer = scheduler.CreateObserver<string>();

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      IObservable<string> escapedGroup = null;
      IDisposable escapedSubscription = null;

      scheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = xs.GroupBy(value => value.Trim(), _groupByCapacity);
      });

      // Remember the most recently emitted group; the outer error is deliberately ignored.
      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(g => escapedGroup = g, _ => { });
      });

      // Late subscription: the source has already failed by now.
      scheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(observer);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      scheduler.Start();

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      observer.Messages.AssertEqual(
          OnError<string>(600, failure)
      );
  }
  /// <summary>
  /// The outer subscription is disposed at 400 while the captured group has no
  /// subscribers. The source subscription is expected to end at 400, and an
  /// observer attached to the captured group at 600 is expected to see no messages.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_Inner_Escape_Dispose()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, new Exception())
      );

      var observer = scheduler.CreateObserver<string>();

      IObservable<IGroupedObservable<string, string>> grouped = null;
      IDisposable groupedSubscription = null;
      IObservable<string> escapedGroup = null;
      IDisposable escapedSubscription = null;

      scheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = xs.GroupBy(value => value.Trim(), _groupByCapacity);
      });

      // Remember the most recently emitted group without subscribing to it.
      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(g => escapedGroup = g);
      });

      // Drop the outer subscription before anyone has subscribed to a group.
      scheduler.ScheduleAbsolute(400, () =>
      {
          groupedSubscription.Dispose();
      });

      scheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(observer);
      });

      scheduler.ScheduleAbsolute(Disposed, () => escapedSubscription.Dispose());

      scheduler.Start();

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );

      // No notifications at all are expected on the late observer.
      observer.Messages.AssertEqual(
      );
  }
  /// <summary>
  /// Values starting with 'b' are mapped to a null key and all others to their
  /// upper-cased text. Each value is then emitted prefixed with its group's key
  /// ("(null)" standing in for the null key), and the flattened output is
  /// expected to preserve source order and complete with the source.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_NullKeys_Simple()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      );

      var results = scheduler.Start(() =>
          xs.GroupBy(value => value[0] == 'b' ? null : value.ToUpper(), _groupByCapacity)
            .SelectMany(group => group, (group, value) => (group.Key ?? "(null)") + value)
      );

      results.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  /// <summary>
  /// Values starting with 'b' are mapped to a null key. Only the null-keyed group
  /// is observed; it is expected to receive its two values followed by the source's
  /// error, and the same exception instance is expected on the outer sequence.
  /// </summary>
  [TestMethod]
  public void GroupBy_Capacity_NullKeys_Error()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnError<string>(500, failure)
      );

      var nullGroupObserver = scheduler.CreateObserver<string>();
      Exception outerError = null;

      scheduler.ScheduleAbsolute(200, () =>
      {
          xs.GroupBy(value => value[0] == 'b' ? null : value.ToUpper(), _groupByCapacity)
            .Where(group => group.Key == null)
            .Subscribe(
                group => { group.Subscribe(nullGroupObserver); },
                error => { outerError = error; });
      });

      scheduler.Start();

      // The outer sequence must surface the very same exception instance.
      Assert.Same(failure, outerError);

      nullGroupObserver.Messages.AssertEqual(
          OnNext(220, "bar"),
          OnNext(470, "baz"),
          OnError<string>(500, failure)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  2934. #endregion
  2935. }
  /// <summary>
  /// Ordinal, case-insensitive string equality comparer for GroupBy tests.
  /// Each operation can be configured to start throwing a known exception
  /// instance once the test scheduler's clock has passed a given tick.
  /// </summary>
  internal class GroupByComparer : IEqualityComparer<string>
  {
      private readonly TestScheduler _scheduler;
      private readonly ushort _equalsThrowsAfter;
      private readonly ushort _getHashCodeThrowsAfter;

      /// <summary>
      /// The exception instance thrown by <see cref="GetHashCode(string)"/> once its threshold has passed.
      /// </summary>
      public Exception HashCodeException = new Exception();

      /// <summary>
      /// The exception instance thrown by <see cref="Equals(string, string)"/> once its threshold has passed.
      /// </summary>
      public Exception EqualsException = new Exception();

      /// <summary>
      /// Creates a comparer whose operations throw once the scheduler clock exceeds the given ticks.
      /// </summary>
      /// <param name="scheduler">Scheduler whose <c>Clock</c> is compared against the thresholds.</param>
      /// <param name="equalsThrowsAfter">Clock value after which <see cref="Equals(string, string)"/> throws.</param>
      /// <param name="getHashCodeThrowsAfter">Clock value after which <see cref="GetHashCode(string)"/> throws.</param>
      public GroupByComparer(TestScheduler scheduler, ushort equalsThrowsAfter, ushort getHashCodeThrowsAfter)
      {
          _scheduler = scheduler;
          _equalsThrowsAfter = equalsThrowsAfter;
          _getHashCodeThrowsAfter = getHashCodeThrowsAfter;
      }

      /// <summary>
      /// Creates a comparer with both thresholds set to <see cref="ushort.MaxValue"/>.
      /// </summary>
      /// <param name="scheduler">Scheduler whose <c>Clock</c> is compared against the thresholds.</param>
      public GroupByComparer(TestScheduler scheduler)
          : this(scheduler, ushort.MaxValue, ushort.MaxValue)
      {
      }

      /// <summary>
      /// Compares two strings ordinally, ignoring case.
      /// </summary>
      /// <returns><c>true</c> when both strings are equal ignoring case, or both are null.</returns>
      /// <exception cref="Exception">
      /// <see cref="EqualsException"/> is thrown when the scheduler clock is past the Equals threshold.
      /// </exception>
      public bool Equals(string x, string y)
      {
          if (_scheduler.Clock > _equalsThrowsAfter)
          {
              throw EqualsException;
          }

          // Static overload: tolerates a null left-hand operand instead of dereferencing it.
          return string.Equals(x, y, StringComparison.OrdinalIgnoreCase);
      }

      /// <summary>
      /// Returns the ordinal, case-insensitive hash code of <paramref name="obj"/>.
      /// </summary>
      /// <exception cref="Exception">
      /// <see cref="HashCodeException"/> is thrown when the scheduler clock is past the GetHashCode threshold.
      /// </exception>
      public int GetHashCode(string obj)
      {
          if (_scheduler.Clock > _getHashCodeThrowsAfter)
          {
              throw HashCodeException;
          }

          return StringComparer.OrdinalIgnoreCase.GetHashCode(obj);
      }
  }
  2970. }