GroupJoinTest.cs 55 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class GroupJoinTest : ReactiveTest
  22. {
  // Checks that GroupJoin throws ArgumentNullException when any one of its five
  // arguments is null, and that subscribing to the resulting sequence with a
  // null observer throws as well.
  [Fact]
  public void GroupJoinOp_ArgumentChecking()
  {
      var source = DummyObservable<int>.Instance;
      var duration = DummyFunc<int, IObservable<int>>.Instance;
      var selector = DummyFunc<int, IObservable<int>, int>.Instance;

      // Null left source.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(null, source, duration, duration, selector));

      // Null right source.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, null, duration, duration, selector));

      // Null left duration selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, source, default(Func<int, IObservable<int>>), duration, selector));

      // Null right duration selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, source, duration, default(Func<int, IObservable<int>>), selector));

      // Null result selector.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, source, duration, duration, default(Func<int, IObservable<int>, int>)));

      // Valid arguments, but a null observer at subscription time.
      ReactiveAssert.Throws<ArgumentNullException>(() =>
          Observable.GroupJoin(source, source, duration, duration, selector).Subscribe(null));
  }
  // Both sources complete (left at 900, right at 800). Every element carries the
  // interval that is handed to NewTimer to build its duration sequence. The merged
  // result is expected to complete at 990 (= 710 + 280; presumably the moment the
  // last left duration ends - NewTimer is defined outside this block).
  [Fact]
  public void GroupJoinOp_Normal_I()
  {
      var clock = new TestScheduler();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(280))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Duration observables created by NewTimer are collected here so that
      // AssertDurations can inspect them afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock),
                  r => NewTimer(rightDurations, r.Interval, clock),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnNext(830, "9rat"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftDurations, 990);
      AssertDurations(right, rightDurations, 990);

  #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // Each source is released as soon as it completes.
      left.Subscriptions.AssertEqual(Subscribe(200, 900));
      right.Subscriptions.AssertEqual(Subscribe(200, 800));
  #else
      // Both sources stay subscribed until the result completes.
      left.Subscriptions.AssertEqual(Subscribe(200, 990));
      right.Subscriptions.AssertEqual(Subscribe(200, 990));
  #endif
  }
  // The left source completes early (721) while the right source completes late
  // (990). The merged result is expected to complete at 910 (= 710 + 200;
  // presumably when the last left duration ends - NewTimer is defined outside
  // this block), and the right source is expected to be released at that time.
  [Fact]
  public void GroupJoinOp_Normal_II()
  {
      var clock = new TestScheduler();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(721)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(990)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock),
                  r => NewTimer(rightDurations, r.Interval, clock),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(910)
      );

      AssertDurations(left, leftDurations, 910);
      AssertDurations(right, rightDurations, 910);

  #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      left.Subscriptions.AssertEqual(Subscribe(200, 721));
  #else
      left.Subscriptions.AssertEqual(Subscribe(200, 910));
  #endif

      // The right subscription end is the same in both build flavours.
      right.Subscriptions.AssertEqual(Subscribe(200, 910));
  }
  // Same inputs and expectations as GroupJoinOp_Normal_I, except that every
  // duration sequence is passed through Where(_ => false), so it can never
  // deliver an element. The expected output is unchanged, i.e. the durations
  // are presumably still closed by their completion (NewTimer is defined
  // outside this block).
  [Fact]
  public void GroupJoinOp_Normal_III()
  {
      var clock = new TestScheduler();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(280))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock).Where(_ => false),
                  r => NewTimer(rightDurations, r.Interval, clock).Where(_ => false),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnNext(830, "9rat"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftDurations, 990);
      AssertDurations(right, rightDurations, 990);

  #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // Each source is released as soon as it completes.
      left.Subscriptions.AssertEqual(Subscribe(200, 900));
      right.Subscriptions.AssertEqual(Subscribe(200, 800));
  #else
      // Both sources stay subscribed until the result completes.
      left.Subscriptions.AssertEqual(Subscribe(200, 990));
      right.Subscriptions.AssertEqual(Subscribe(200, 990));
  #endif
  }
  // The right source completes at 980, shortly before the left source at 990.
  // The merged result is expected to complete at 990, together with the left
  // source.
  [Fact]
  public void GroupJoinOp_Normal_IV()
  {
      var clock = new TestScheduler();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(990)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(980)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock),
                  r => NewTimer(rightDurations, r.Interval, clock),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftDurations, 990);
      AssertDurations(right, rightDurations, 990);

      // The left subscription end is the same in both build flavours.
      left.Subscriptions.AssertEqual(Subscribe(200, 990));

  #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      right.Subscriptions.AssertEqual(Subscribe(200, 980));
  #else
      right.Subscriptions.AssertEqual(Subscribe(200, 990));
  #endif
  }
  // Like GroupJoinOp_Normal_IV, but the right source completes earlier (900).
  // The merged result is still expected to complete at 990, together with the
  // left source.
  [Fact]
  public void GroupJoinOp_Normal_V()
  {
      var clock = new TestScheduler();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(990)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock),
                  r => NewTimer(rightDurations, r.Interval, clock),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftDurations, 990);
      AssertDurations(right, rightDurations, 990);

      // The left subscription end is the same in both build flavours.
      left.Subscriptions.AssertEqual(Subscribe(200, 990));

  #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      right.Subscriptions.AssertEqual(Subscribe(200, 900));
  #else
      right.Subscriptions.AssertEqual(Subscribe(200, 990));
  #endif
  }
  // Both sources complete (left at 850, right at 900) before the merged result,
  // which is expected to complete at 920 (= 720 + 200; presumably when the last
  // left duration ends - NewTimer is defined outside this block). Note that no
  // "9rat" is expected here: "rat" only carries an interval of 20 ticks.
  [Fact]
  public void GroupJoinOp_Normal_VI()
  {
      var clock = new TestScheduler();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(30))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(200))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(850)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(20))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock),
                  r => NewTimer(rightDurations, r.Interval, clock),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(920)
      );

      AssertDurations(left, leftDurations, 920);
      AssertDurations(right, rightDurations, 920);

  #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // Each source is released as soon as it completes.
      left.Subscriptions.AssertEqual(Subscribe(200, 850));
      right.Subscriptions.AssertEqual(Subscribe(200, 900));
  #else
      // Both sources stay subscribed until the result completes.
      left.Subscriptions.AssertEqual(Subscribe(200, 920));
      right.Subscriptions.AssertEqual(Subscribe(200, 920));
  #endif
  }
  // The left source completes at 210 without producing any element. The merged
  // result is expected to complete at 210 with no elements, and both sources
  // are expected to be released at that time.
  [Fact]
  public void GroupJoinOp_Normal_VII()
  {
      var clock = new TestScheduler();

      var left = clock.CreateHotObservable(
          OnCompleted<TimeInterval<int>>(210)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(20))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock),
                  r => NewTimer(rightDurations, r.Interval, clock),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnCompleted<string>(210)
      );

      AssertDurations(left, leftDurations, 210);
      AssertDurations(right, rightDurations, 210);

      left.Subscriptions.AssertEqual(Subscribe(200, 210));
      right.Subscriptions.AssertEqual(Subscribe(200, 210));
  }
  // The left source never terminates; the right source completes at 230. Only
  // "0hat" is expected and no completion is expected on the result. The left
  // subscription is expected to last until 1000 (no disposal time is passed to
  // Start, so this is presumably the scheduler's default disposal time).
  [Fact]
  public void GroupJoinOp_Normal_VIII()
  {
      var clock = new TestScheduler();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(200)))
      );

      var right = clock.CreateHotObservable(
          OnNext(220, new TimeInterval<string>("hat", TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<string>>(230)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock),
                  r => NewTimer(rightDurations, r.Interval, clock),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(220, "0hat")
      );

      AssertDurations(left, leftDurations, 1000);
      AssertDurations(right, rightDurations, 1000);

      // The left subscription end is the same in both build flavours.
      left.Subscriptions.AssertEqual(Subscribe(200, 1000));

  #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      right.Subscriptions.AssertEqual(Subscribe(200, 230));
  #else
      right.Subscriptions.AssertEqual(Subscribe(200, 1000));
  #endif
  }
  // Early disposal: 713 is passed to Start as the disposal time. Only the
  // results up to 712 are expected, no termination is expected, and both
  // sources are expected to be released at 713.
  [Fact]
  public void GroupJoinOp_Normal_IX()
  {
      var clock = new TestScheduler();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(
          () =>
              left.GroupJoin(
                      right,
                      l => NewTimer(leftDurations, l.Interval, clock),
                      r => NewTimer(rightDurations, r.Interval, clock),
                      (l, window) => window.Select(r => l.Value + r.Value))
                  .Merge(),
          713
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man")
      );

      AssertDurations(left, leftDurations, 713);
      AssertDurations(right, rightDurations, 713);

      left.Subscriptions.AssertEqual(Subscribe(200, 713));
      right.Subscriptions.AssertEqual(Subscribe(200, 713));
  }
  // The left source fails at 310. The merged result is expected to fail at 310
  // with the same exception instance, and both sources are expected to be
  // released at that time.
  [Fact]
  public void GroupJoinOp_Error_I()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnError<TimeInterval<int>>(310, failure)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock),
                  r => NewTimer(rightDurations, r.Interval, clock),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnError<string>(310, failure)
      );

      AssertDurations(left, leftDurations, 310);
      AssertDurations(right, rightDurations, 310);

      left.Subscriptions.AssertEqual(Subscribe(200, 310));
      right.Subscriptions.AssertEqual(Subscribe(200, 310));
  }
  // The right source fails at 722. The merged result is expected to fail at 722
  // with the same exception instance, and both sources are expected to be
  // released at that time.
  [Fact]
  public void GroupJoinOp_Error_II()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var left = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnError<TimeInterval<string>>(722, failure)
      );

      // Duration observables created by NewTimer, kept for AssertDurations.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observer = clock.Start(() =>
          left.GroupJoin(
                  right,
                  l => NewTimer(leftDurations, l.Interval, clock),
                  r => NewTimer(rightDurations, r.Interval, clock),
                  (l, window) => window.Select(r => l.Value + r.Value))
              .Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnError<string>(722, failure)
      );

      AssertDurations(left, leftDurations, 722);
      AssertDurations(right, rightDurations, 722);

      left.Subscriptions.AssertEqual(Subscribe(200, 722));
      right.Subscriptions.AssertEqual(Subscribe(200, 722));
  }
  // Error raised through a left duration sequence: the duration built for the left element
  // with value 6 (arriving at 700 with a 25-tick interval) is projected onto a throwing
  // sequence, and the merged output is expected to fail at 725.
  [Fact]
  public void GroupJoinOp_Error_III()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      // The left source is created before the right one, exactly as in the original layout.
      var lefts = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var rights = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration sequence handed to GroupJoin is recorded so its subscription can be checked.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector = l =>
          NewTimer(leftDurations, l.Interval, clock)
              .SelectMany(l.Value == 6 ? Observable.Throw<long>(failure) : Observable.Empty<long>());
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector = r =>
          NewTimer(rightDurations, r.Interval, clock);
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector = (l, window) =>
          window.Select(r => l.Value + r.Value);

      var observer = clock.Start(() =>
          lefts.GroupJoin(rights, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnError<string>(725, failure)
      );

      AssertDurations(lefts, leftDurations, 725);
      AssertDurations(rights, rightDurations, 725);

      // Both sources are expected to be released at the time of the error.
      lefts.Subscriptions.AssertEqual(
          Subscribe(200, 725)
      );
      rights.Subscriptions.AssertEqual(
          Subscribe(200, 725)
      );
  }
  // Error raised through a right duration sequence: the duration built for the right element
  // "tin" (arriving at 702 with a 19-tick interval) is projected onto a throwing sequence,
  // and the merged output is expected to fail at 721.
  [Fact]
  public void GroupJoinOp_Error_IV()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      // The left source is created before the right one, exactly as in the original layout.
      var lefts = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var rights = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(19))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration sequence handed to GroupJoin is recorded so its subscription can be checked.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector = l =>
          NewTimer(leftDurations, l.Interval, clock);
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector = r =>
          NewTimer(rightDurations, r.Interval, clock)
              .SelectMany(r.Value == "tin" ? Observable.Throw<long>(failure) : Observable.Empty<long>());
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector = (l, window) =>
          window.Select(r => l.Value + r.Value);

      var observer = clock.Start(() =>
          lefts.GroupJoin(rights, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      observer.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnError<string>(721, failure)
      );

      AssertDurations(lefts, leftDurations, 721);
      AssertDurations(rights, rightDurations, 721);

      // Both sources are expected to be released at the time of the error.
      lefts.Subscriptions.AssertEqual(
          Subscribe(200, 721)
      );
      rights.Subscriptions.AssertEqual(
          Subscribe(200, 721)
      );
  }
  // The left duration selector itself throws. Its guard holds for every value in the left
  // sequence (0 through 9), so the failure is expected at 210, the time of the first left element.
  [Fact]
  public void GroupJoinOp_Error_V()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      // The left source is created before the right one, exactly as in the original layout.
      var lefts = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var rights = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Only right durations are recorded; the left selector never returns a sequence.
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector = l =>
      {
          // The conditional keeps the trailing return reachable for the compiler.
          if (l.Value >= 0)
          {
              throw failure;
          }

          return Observable.Empty<long>();
      };
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector = r =>
          NewTimer(rightDurations, r.Interval, clock);
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector = (l, window) =>
          window.Select(r => l.Value + r.Value);

      var observer = clock.Start(() =>
          lefts.GroupJoin(rights, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      observer.Messages.AssertEqual(
          OnError<string>(210, failure)
      );

      AssertDurations(rights, rightDurations, 210);

      // Both sources are expected to be released at the time of the error.
      lefts.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
      rights.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // The right duration selector itself throws. Its guard (string length at least zero) holds
  // for every right element, so the failure is expected at 215, the time of the first right element.
  [Fact]
  public void GroupJoinOp_Error_VI()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      // The left source is created before the right one, exactly as in the original layout.
      var lefts = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var rights = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Only left durations are recorded; the right selector never returns a sequence.
      var leftDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector = l =>
          NewTimer(leftDurations, l.Interval, clock);
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector = r =>
      {
          // The conditional keeps the trailing return reachable for the compiler.
          if (r.Value.Length >= 0)
          {
              throw failure;
          }

          return Observable.Empty<long>();
      };
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector = (l, window) =>
          window.Select(r => l.Value + r.Value);

      var observer = clock.Start(() =>
          lefts.GroupJoin(rights, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      observer.Messages.AssertEqual(
          OnError<string>(215, failure)
      );

      AssertDurations(lefts, leftDurations, 215);

      // Both sources are expected to be released at the time of the error.
      lefts.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );
      rights.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );
  }
  // The result selector throws for every left element. Here the right source produces first
  // (210) and the left source follows (215), so the failure is expected at 215.
  [Fact]
  public void GroupJoinOp_Error_VII()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      // The left source is created before the right one, exactly as in the original layout.
      var lefts = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var rights = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration sequence handed to GroupJoin is recorded so its subscription can be checked.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector = l =>
          NewTimer(leftDurations, l.Interval, clock);
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector = r =>
          NewTimer(rightDurations, r.Interval, clock);
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector = (l, window) =>
      {
          // The conditional keeps the trailing return reachable for the compiler.
          if (l.Value >= 0)
          {
              throw failure;
          }

          return window.Select(r => l.Value + r.Value);
      };

      var observer = clock.Start(() =>
          lefts.GroupJoin(rights, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      observer.Messages.AssertEqual(
          OnError<string>(215, failure)
      );

  #if !NO_PERF // BREAKING CHANGE v2 > v1.x - Duration selector is now invoked before the result selector
      AssertDurations(lefts, leftDurations, 215);
  #endif
      AssertDurations(rights, rightDurations, 215);

      // Both sources are expected to be released at the time of the error.
      lefts.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );
      rights.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );
  }
  // The result selector throws for every left element. Here the left source produces first
  // (210), before any right element (215), so the failure is expected at 210.
  [Fact]
  public void GroupJoinOp_Error_VIII()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      // The left source is created before the right one, exactly as in the original layout.
      var lefts = clock.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var rights = clock.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration sequence handed to GroupJoin is recorded so its subscription can be checked.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftDurationSelector = l =>
          NewTimer(leftDurations, l.Interval, clock);
      Func<TimeInterval<string>, IObservable<long>> rightDurationSelector = r =>
          NewTimer(rightDurations, r.Interval, clock);
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> resultSelector = (l, window) =>
      {
          // The conditional keeps the trailing return reachable for the compiler.
          if (l.Value >= 0)
          {
              throw failure;
          }

          return window.Select(r => l.Value + r.Value);
      };

      var observer = clock.Start(() =>
          lefts.GroupJoin(rights, leftDurationSelector, rightDurationSelector, resultSelector).Merge()
      );

      observer.Messages.AssertEqual(
          OnError<string>(210, failure)
      );

  #if !NO_PERF // BREAKING CHANGE v2 > v1.x - Duration selector is now invoked before the result selector
      AssertDurations(lefts, leftDurations, 210);
  #endif
      AssertDurations(rights, rightDurations, 210);

      // Both sources are expected to be released at the time of the error.
      lefts.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
      rights.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // Builds a cold observable that emits 0L and completes at t.Ticks (relative to subscription),
  // appends it to the supplied list so the caller can inspect it later, and returns it.
  private ITestableObservable<long> NewTimer(List<ITestableObservable<long>> l, TimeSpan t, TestScheduler scheduler)
  {
      var dueTicks = t.Ticks;

      var duration = scheduler.CreateColdObservable(
          OnNext(dueTicks, 0L),
          OnCompleted<long>(dueTicks)
      );

      l.Add(duration);
      return duration;
  }
  // Verifies the duration sequences recorded for a source.
  //   xs      - the source whose messages carry a TimeInterval payload.
  //   xsd     - the duration sequences recorded for that source, in creation order.
  //   lastEnd - the latest time considered; also caps every expected unsubscription time.
  // The number of recorded durations must equal the number of OnNext messages of xs at or before
  // lastEnd. Each duration, paired positionally with a message, must have exactly one subscription
  // from the message time to the message time plus its interval ticks, capped at lastEnd.
  private void AssertDurations<T, U>(ITestableObservable<TimeInterval<T>> xs, List<ITestableObservable<U>> xsd, long lastEnd)
  {
      var expectedCount = xs.Messages.Count(m => m.Value.Kind == NotificationKind.OnNext && m.Time <= lastEnd);
      Assert.Equal(expectedCount, xsd.Count);

      // Walk messages and durations in lockstep, stopping at the shorter of the two collections.
      var pairCount = Math.Min(xs.Messages.Count, xsd.Count);
      for (var i = 0; i < pairCount; i++)
      {
          var message = xs.Messages[i];
          var opened = message.Time;
          var closed = Math.Min(opened + message.Value.Value.Interval.Ticks, lastEnd);

          xsd[i].Subscriptions.AssertEqual(
              Subscribe(opened, closed)
          );
      }
  }
  1067. }
  1068. }