GroupJoinTest.cs 55 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Linq;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Xunit;
  12. namespace ReactiveTests.Tests
  13. {
  14. public class GroupJoinTest : ReactiveTest
  15. {
  /// <summary>
  /// Checks that <c>GroupJoin</c> throws <see cref="ArgumentNullException"/> when any one of its
  /// five arguments is null, and that subscribing a null observer to the joined sequence throws too.
  /// </summary>
  [Fact]
  public void GroupJoinOp_ArgumentChecking()
  {
      // Shared non-null placeholders; each assertion below nulls out exactly one position.
      var source = DummyObservable<int>.Instance;
      var durationSelector = DummyFunc<int, IObservable<int>>.Instance;
      var resultSelector = DummyFunc<int, IObservable<int>, int>.Instance;

      // Null left source.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(null, source, durationSelector, durationSelector, resultSelector));

      // Null right source.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, null, durationSelector, durationSelector, resultSelector));

      // Null left duration selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, source, default(Func<int, IObservable<int>>), durationSelector, resultSelector));

      // Null right duration selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, source, durationSelector, default(Func<int, IObservable<int>>), resultSelector));

      // Null result selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, source, durationSelector, durationSelector, default(Func<int, IObservable<int>, int>)));

      // Null observer passed to Subscribe on an otherwise valid join.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, source, durationSelector, durationSelector, resultSelector).Subscribe(null));
  }
  /// <summary>
  /// Group-joins two hot sequences whose elements carry their own duration interval and checks
  /// the merged output, which is expected to complete at tick 990 after both sources have
  /// completed (left at 900, right at 800).
  /// </summary>
  [Fact]
  public void GroupJoinOp_Normal_I()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(280))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnNext(830, "9rat"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftDurations, 990);
      AssertDurations(right, rightDurations, 990);

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      left.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 800)
      );
#else
      left.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
#endif
  }
  /// <summary>
  /// Group-join where the left source completes early (tick 721) while the right source
  /// runs until 990; the merged output is expected to complete at tick 910.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Normal_II()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(721)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(990)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(910)
      );

      AssertDurations(left, leftDurations, 910);
      AssertDurations(right, rightDurations, 910);

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      left.Subscriptions.AssertEqual(
          Subscribe(200, 721)
      );
#else
      left.Subscriptions.AssertEqual(
          Subscribe(200, 910)
      );
#endif

      // The right subscription window is asserted identically under both build flavours.
      right.Subscriptions.AssertEqual(
          Subscribe(200, 910)
      );
  }
  /// <summary>
  /// Group-join whose duration observables are wrapped in <c>Where(_ =&gt; false)</c>, so they
  /// never surface a value to the operator. The merged output is still expected to contain
  /// the listed pairs and to complete at tick 990.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Normal_III()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(280))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  // The filter drops every value of the duration sequence.
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler).Where(_ => false),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler).Where(_ => false),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnNext(830, "9rat"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftDurations, 990);
      AssertDurations(right, rightDurations, 990);

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      left.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 800)
      );
#else
      left.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
#endif
  }
  /// <summary>
  /// Group-join where the right source completes at tick 980, shortly before the left source
  /// completes at 990; the merged output is expected to complete at tick 990.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Normal_IV()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(990)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(980)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftDurations, 990);
      AssertDurations(right, rightDurations, 990);

      // The left subscription window is asserted identically under both build flavours.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      right.Subscriptions.AssertEqual(
          Subscribe(200, 980)
      );
#else
      right.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
#endif
  }
  /// <summary>
  /// Group-join where the right source completes at tick 900 and the left source at 990;
  /// the merged output is expected to complete at tick 990.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Normal_V()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(990)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftDurations, 990);
      AssertDurations(right, rightDurations, 990);

      // The left subscription window is asserted identically under both build flavours.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      right.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
#else
      right.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
#endif
  }
  /// <summary>
  /// Group-join where both sources complete (left at 850, right at 900) before the merged
  /// output, which is expected to complete at tick 920. The left element published at 830
  /// is not expected to produce any pair.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Normal_VI()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(30))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(200))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(850)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(20))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(920)
      );

      AssertDurations(left, leftDurations, 920);
      AssertDurations(right, rightDurations, 920);

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      left.Subscriptions.AssertEqual(
          Subscribe(200, 850)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
#else
      left.Subscriptions.AssertEqual(
          Subscribe(200, 920)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 920)
      );
#endif
  }
  /// <summary>
  /// Group-join where the left source completes at tick 210 without producing any element.
  /// The merged output is expected to be a lone completion at 210, with both source
  /// subscriptions ending at that same tick.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Normal_VII()
  {
      var testScheduler = new TestScheduler();

      // Empty left side: only a completion notification.
      var left = testScheduler.CreateHotObservable(
          OnCompleted<TimeInterval<int>>(210)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(20))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnCompleted<string>(210)
      );

      AssertDurations(left, leftDurations, 210);
      AssertDurations(right, rightDurations, 210);

      left.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Group-join where the left source never completes and the right source completes at
  /// tick 230. The only expected output is the pair "0hat" at 220, with no terminal
  /// notification; the left subscription is expected to last until tick 1000.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Normal_VIII()
  {
      var testScheduler = new TestScheduler();

      // No completion notification on the left side.
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(200)))
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(220, new TimeInterval<string>("hat", TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<string>>(230)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(220, "0hat")
      );

      AssertDurations(left, leftDurations, 1000);
      AssertDurations(right, rightDurations, 1000);

      // The left subscription window is asserted identically under both build flavours.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      right.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
#else
      right.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
#endif
  }
  /// <summary>
  /// Group-join that is started with an explicit second argument of 713 passed to
  /// <c>Start</c>. Only the pairs up to tick 712 are expected, with no terminal
  /// notification, and both source subscriptions are expected to end at 713.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Normal_IX()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(
          () =>
              left
                  .GroupJoin(
                      right,
                      lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                      rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                      (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
                  .Merge(),
          713
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man")
      );

      AssertDurations(left, leftDurations, 713);
      AssertDurations(right, rightDurations, 713);

      left.Subscriptions.AssertEqual(
          Subscribe(200, 713)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 713)
      );
  }
  /// <summary>
  /// Group-join where the left source fails at tick 310. The merged output is expected to
  /// carry the pairs produced up to 305 followed by the same exception at 310, and both
  /// source subscriptions are expected to end at 310.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Error_I()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnError<TimeInterval<int>>(310, failure)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnError<string>(310, failure)
      );

      AssertDurations(left, leftDurations, 310);
      AssertDurations(right, rightDurations, 310);

      left.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );
  }
  /// <summary>
  /// Group-join where the right source fails at tick 722. The merged output is expected to
  /// carry the pairs produced up to 720 followed by the same exception at 722, and both
  /// source subscriptions are expected to end at 722.
  /// </summary>
  [Fact]
  public void GroupJoinOp_Error_II()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnError<TimeInterval<string>>(722, failure)
      );

      // Collect every duration observable handed to GroupJoin so it can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var results = testScheduler.Start(() =>
          left
              .GroupJoin(
                  right,
                  lhs => NewTimer(leftDurations, lhs.Interval, testScheduler),
                  rhs => NewTimer(rightDurations, rhs.Interval, testScheduler),
                  (lhs, matches) => matches.Select(rhs => lhs.Value + rhs.Value))
              .Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnError<string>(722, failure)
      );

      AssertDurations(left, leftDurations, 722);
      AssertDurations(right, rightDurations, 722);

      left.Subscriptions.AssertEqual(
          Subscribe(200, 722)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 722)
      );
  }
  // A left-side duration that fails: the duration created for left value 6 (arriving at 700
  // with a 25-tick interval) is followed by Observable.Throw, and the test expects the
  // joined sequence to fail at 725.
  [Fact]
  public void GroupJoinOp_Error_III()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var ys = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration observable handed to GroupJoin is recorded so that its
      // subscription window can be verified afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      // Only the duration of the left element with value 6 ends in an error.
      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector =
          x => NewTimer(leftDurations, x.Interval, scheduler)
              .SelectMany(x.Value == 6 ? Observable.Throw<long>(error) : Observable.Empty<long>());
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector =
          y => NewTimer(rightDurations, y.Interval, scheduler);
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector =
          (x, yy) => yy.Select(y => x.Value + y.Value);

      var results = scheduler.Start(() =>
          xs.GroupJoin(ys, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnError<string>(725, error)
      );

      AssertDurations(xs, leftDurations, 725);
      AssertDurations(ys, rightDurations, 725);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 725)
      );

      ys.Subscriptions.AssertEqual(
          Subscribe(200, 725)
      );
  }
  // A right-side duration that fails: the duration created for right value "tin" (arriving
  // at 702 with a 19-tick interval) is followed by Observable.Throw, and the test expects
  // the joined sequence to fail at 721.
  [Fact]
  public void GroupJoinOp_Error_IV()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var ys = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(19))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration observable handed to GroupJoin is recorded so that its
      // subscription window can be verified afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector =
          x => NewTimer(leftDurations, x.Interval, scheduler);
      // Only the duration of the right element "tin" ends in an error.
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector =
          y => NewTimer(rightDurations, y.Interval, scheduler)
              .SelectMany(y.Value == "tin" ? Observable.Throw<long>(error) : Observable.Empty<long>());
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector =
          (x, yy) => yy.Select(y => x.Value + y.Value);

      var results = scheduler.Start(() =>
          xs.GroupJoin(ys, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnError<string>(721, error)
      );

      AssertDurations(xs, leftDurations, 721);
      AssertDurations(ys, rightDurations, 721);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 721)
      );

      ys.Subscriptions.AssertEqual(
          Subscribe(200, 721)
      );
  }
  // The left duration selector itself throws: the test expects the exception raised for the
  // first left element (at 210) to be the only message, with both sources unsubscribed at 210.
  [Fact]
  public void GroupJoinOp_Error_V()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var ys = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Only right-side durations are recorded; the left selector never returns one.
      var rightDurations = new List<ITestableObservable<long>>();

      // The guard is true for none of the left values in this test (all are >= 0), so the
      // selector always throws; the conditional keeps the return statement reachable for
      // the compiler.
      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector = x =>
      {
          if (x.Value < 0)
          {
              return Observable.Empty<long>();
          }

          throw error;
      };
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector =
          y => NewTimer(rightDurations, y.Interval, scheduler);
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector =
          (x, yy) => yy.Select(y => x.Value + y.Value);

      var results = scheduler.Start(() =>
          xs.GroupJoin(ys, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      results.Messages.AssertEqual(
          OnError<string>(210, error)
      );

      AssertDurations(ys, rightDurations, 210);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );

      ys.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // The right duration selector itself throws: the test expects the exception raised for the
  // first right element (at 215) to be the only message, with both sources unsubscribed at 215.
  [Fact]
  public void GroupJoinOp_Error_VI()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var ys = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Only left-side durations are recorded; the right selector never returns one.
      var leftDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector =
          x => NewTimer(leftDurations, x.Interval, scheduler);
      // A string length is never negative, so the selector always throws; the conditional
      // keeps the return statement reachable for the compiler.
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector = y =>
      {
          if (y.Value.Length < 0)
          {
              return Observable.Empty<long>();
          }

          throw error;
      };
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector =
          (x, yy) => yy.Select(y => x.Value + y.Value);

      var results = scheduler.Start(() =>
          xs.GroupJoin(ys, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      results.Messages.AssertEqual(
          OnError<string>(215, error)
      );

      AssertDurations(xs, leftDurations, 215);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );

      ys.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );
  }
  // The result selector throws. Here the first right element ("hat", 210) arrives before the
  // first left element (215); the test expects the error at 215, when the first left
  // element arrives.
  [Fact]
  public void GroupJoinOp_Error_VII()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var ys = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration observable handed to GroupJoin is recorded so that its
      // subscription window can be verified afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector =
          x => NewTimer(leftDurations, x.Interval, scheduler);
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector =
          y => NewTimer(rightDurations, y.Interval, scheduler);
      // The guard is true for none of the left values in this test (all are >= 0), so the
      // selector always throws; the conditional keeps the return statement reachable for
      // the compiler.
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector = (x, yy) =>
      {
          if (x.Value < 0)
          {
              return yy.Select(y => x.Value + y.Value);
          }

          throw error;
      };

      var results = scheduler.Start(() =>
          xs.GroupJoin(ys, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      results.Messages.AssertEqual(
          OnError<string>(215, error)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x - Duration selector is now invoked before the result selector
      AssertDurations(xs, leftDurations, 215);
#endif
      AssertDurations(ys, rightDurations, 215);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );

      ys.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );
  }
  // The result selector throws. Here the first left element (210) arrives before any right
  // element (the first is at 215); the test expects the error at 210.
  [Fact]
  public void GroupJoinOp_Error_VIII()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var ys = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration observable handed to GroupJoin is recorded so that its
      // subscription window can be verified afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector =
          x => NewTimer(leftDurations, x.Interval, scheduler);
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector =
          y => NewTimer(rightDurations, y.Interval, scheduler);
      // The guard is true for none of the left values in this test (all are >= 0), so the
      // selector always throws; the conditional keeps the return statement reachable for
      // the compiler.
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector = (x, yy) =>
      {
          if (x.Value < 0)
          {
              return yy.Select(y => x.Value + y.Value);
          }

          throw error;
      };

      var results = scheduler.Start(() =>
          xs.GroupJoin(ys, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      results.Messages.AssertEqual(
          OnError<string>(210, error)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x - Duration selector is now invoked before the result selector
      AssertDurations(xs, leftDurations, 210);
#endif
      AssertDurations(ys, rightDurations, 210);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );

      ys.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // Creates a cold observable that emits 0L and then completes, both at t.Ticks relative to
  // subscription, appends it to the tracking list l, and returns it.
  private ITestableObservable<long> NewTimer(List<ITestableObservable<long>> l, TimeSpan t, TestScheduler scheduler)
  {
      var dueTicks = t.Ticks;
      var duration = scheduler.CreateColdObservable(
          OnNext(dueTicks, 0L),
          OnCompleted<long>(dueTicks)
      );

      // Tracked so that AssertDurations can inspect the subscriptions later.
      l.Add(duration);

      return duration;
  }
  // Verifies the duration observables recorded for a source sequence:
  //  - one duration must exist for every OnNext message of xs recorded at or before lastEnd;
  //  - each duration must have exactly one subscription, starting at the time of its source
  //    message and ending after that message's interval or at lastEnd, whichever is earlier.
  private void AssertDurations<T, U>(ITestableObservable<TimeInterval<T>> xs, List<ITestableObservable<U>> xsd, long lastEnd)
  {
      var expectedCount = xs.Messages.Count(m => m.Value.Kind == NotificationKind.OnNext && m.Time <= lastEnd);
      Assert.Equal(expectedCount, xsd.Count);

      // Messages and durations are paired positionally, stopping at the shorter list.
      var pairCount = Math.Min(xs.Messages.Count, xsd.Count);
      for (var i = 0; i < pairCount; i++)
      {
          var message = xs.Messages[i];
          var duration = xsd[i];

          var subscribedAt = message.Time;
          var naturalEnd = subscribedAt + message.Value.Value.Interval.Ticks;
          var disposedAt = Math.Min(naturalEnd, lastEnd);

          duration.Subscriptions.AssertEqual(
              Subscribe(subscribedAt, disposedAt)
          );
      }
  }
  1060. }
  1061. }