// GroupJoinTest.cs (55 KB)
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Linq;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. using Assert = Xunit.Assert;
  13. namespace ReactiveTests.Tests
  14. {
  15. [TestClass]
  16. public class GroupJoinTest : ReactiveTest
  17. {
  /// <summary>
  /// Checks that <c>Observable.GroupJoin</c> throws <see cref="ArgumentNullException"/> when any one of its
  /// five arguments is null, and that subscribing to the result with a null observer throws as well.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_ArgumentChecking()
  {
      // Valid placeholder arguments shared by every case below.
      var source = DummyObservable<int>.Instance;
      var duration = DummyFunc<int, IObservable<int>>.Instance;
      var selector = DummyFunc<int, IObservable<int>, int>.Instance;

      // Typed nulls, so the delegate arguments still carry a static type for generic inference.
      var noDuration = default(Func<int, IObservable<int>>);
      var noSelector = default(Func<int, IObservable<int>, int>);

      // Null left source.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupJoin(null, source, duration, duration, selector));

      // Null right source.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupJoin(source, null, duration, duration, selector));

      // Null left duration selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupJoin(source, source, noDuration, duration, selector));

      // Null right duration selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupJoin(source, source, duration, noDuration, selector));

      // Null result selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupJoin(source, source, duration, duration, noSelector));

      // All arguments valid, but the observer passed to Subscribe is null.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupJoin(source, source, duration, duration, selector).Subscribe(null));
  }
  /// <summary>
  /// Group-joins two hot sequences and merges the per-left-element results. Each element's
  /// <c>Interval</c> is handed to <c>NewTimer</c> to build its duration observable. xs completes at 900,
  /// ys at 800, and the merged result is expected to complete at 990.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Normal_I()
  {
      var testScheduler = new TestScheduler();

      // Left source: (time, value, interval in ticks).
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(280))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      // Right source: (time, value, interval in ticks).
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
      {
          var grouped = left.GroupJoin(
              right,
              l => NewTimer(leftTimers, l.Interval, testScheduler),
              r => NewTimer(rightTimers, r.Interval, testScheduler),
              (l, matches) => matches.Select(r => l.Value + r.Value));

          return grouped.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnNext(830, "9rat"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftTimers, 990);
      AssertDurations(right, rightTimers, 990);

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // Each source is expected to be released at its own completion time.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 800)
      );
#else
      // Both sources are expected to stay subscribed until the result completes.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
#endif
  }
  /// <summary>
  /// Variant in which xs completes early (721) while ys completes at 990. The merged result is
  /// expected to complete at 910, and ys is expected to stay subscribed until then.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Normal_II()
  {
      var testScheduler = new TestScheduler();

      // Left source: (time, value, interval in ticks).
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(721)
      );

      // Right source: (time, value, interval in ticks).
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(990)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
      {
          var grouped = left.GroupJoin(
              right,
              l => NewTimer(leftTimers, l.Interval, testScheduler),
              r => NewTimer(rightTimers, r.Interval, testScheduler),
              (l, matches) => matches.Select(r => l.Value + r.Value));

          return grouped.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(910)
      );

      AssertDurations(left, leftTimers, 910);
      AssertDurations(right, rightTimers, 910);

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // The left source is expected to be released at its own completion time.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 721)
      );
#else
      left.Subscriptions.AssertEqual(
          Subscribe(200, 910)
      );
#endif

      // The right source is expected to stay subscribed until the result completes, in both configurations.
      right.Subscriptions.AssertEqual(
          Subscribe(200, 910)
      );
  }
  /// <summary>
  /// Variant in which every duration observable is passed through <c>Where(_ =&gt; false)</c>, so the
  /// durations produce no elements. xs completes at 900, ys at 800, and the merged result is
  /// expected to complete at 990.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Normal_III()
  {
      var testScheduler = new TestScheduler();

      // Left source: (time, value, interval in ticks).
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(280))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      // Right source: (time, value, interval in ticks).
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
      {
          // The filter drops every element of the duration observables.
          var grouped = left.GroupJoin(
              right,
              l => NewTimer(leftTimers, l.Interval, testScheduler).Where(_ => false),
              r => NewTimer(rightTimers, r.Interval, testScheduler).Where(_ => false),
              (l, matches) => matches.Select(r => l.Value + r.Value));

          return grouped.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnNext(830, "9rat"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftTimers, 990);
      AssertDurations(right, rightTimers, 990);

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // Each source is expected to be released at its own completion time.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 800)
      );
#else
      // Both sources are expected to stay subscribed until the result completes.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
#endif
  }
  /// <summary>
  /// Variant in which xs completes at 990 and ys slightly earlier, at 980. The merged result is
  /// expected to complete at 990.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Normal_IV()
  {
      var testScheduler = new TestScheduler();

      // Left source: (time, value, interval in ticks).
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(990)
      );

      // Right source: (time, value, interval in ticks).
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(980)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
      {
          var grouped = left.GroupJoin(
              right,
              l => NewTimer(leftTimers, l.Interval, testScheduler),
              r => NewTimer(rightTimers, r.Interval, testScheduler),
              (l, matches) => matches.Select(r => l.Value + r.Value));

          return grouped.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftTimers, 990);
      AssertDurations(right, rightTimers, 990);

      // The left source is expected to stay subscribed until 990 in both configurations.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // The right source is expected to be released at its own completion time.
      right.Subscriptions.AssertEqual(
          Subscribe(200, 980)
      );
#else
      right.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
#endif
  }
  /// <summary>
  /// Variant in which xs completes at 990 and ys at 900. The merged result is expected to
  /// complete at 990.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Normal_V()
  {
      var testScheduler = new TestScheduler();

      // Left source: (time, value, interval in ticks).
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(990)
      );

      // Right source: (time, value, interval in ticks).
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
      {
          var grouped = left.GroupJoin(
              right,
              l => NewTimer(leftTimers, l.Interval, testScheduler),
              r => NewTimer(rightTimers, r.Interval, testScheduler),
              (l, matches) => matches.Select(r => l.Value + r.Value));

          return grouped.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(990)
      );

      AssertDurations(left, leftTimers, 990);
      AssertDurations(right, rightTimers, 990);

      // The left source is expected to stay subscribed until 990 in both configurations.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // The right source is expected to be released at its own completion time.
      right.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
#else
      right.Subscriptions.AssertEqual(
          Subscribe(200, 990)
      );
#endif
  }
  /// <summary>
  /// Variant in which xs completes at 850 and ys at 900. The merged result is expected to
  /// complete at 920, and no "9..." message is expected for the left value emitted at 830.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Normal_VI()
  {
      var testScheduler = new TestScheduler();

      // Left source: (time, value, interval in ticks).
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(30))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(200))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(850)
      );

      // Right source: (time, value, interval in ticks).
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(20))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
      {
          var grouped = left.GroupJoin(
              right,
              l => NewTimer(leftTimers, l.Interval, testScheduler),
              r => NewTimer(rightTimers, r.Interval, testScheduler),
              (l, matches) => matches.Select(r => l.Value + r.Value));

          return grouped.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(920)
      );

      AssertDurations(left, leftTimers, 920);
      AssertDurations(right, rightTimers, 920);

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // Each source is expected to be released at its own completion time.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 850)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 900)
      );
#else
      // Both sources are expected to stay subscribed until the result completes.
      left.Subscriptions.AssertEqual(
          Subscribe(200, 920)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 920)
      );
#endif
  }
  /// <summary>
  /// Variant in which xs produces no elements and completes at 210. The merged result is expected
  /// to contain only a completion at 210, with both source subscriptions ending at 210.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Normal_VII()
  {
      var testScheduler = new TestScheduler();

      // Left source: completes without emitting any element.
      var left = testScheduler.CreateHotObservable(
          OnCompleted<TimeInterval<int>>(210)
      );

      // Right source: (time, value, interval in ticks).
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(20))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
      {
          var grouped = left.GroupJoin(
              right,
              l => NewTimer(leftTimers, l.Interval, testScheduler),
              r => NewTimer(rightTimers, r.Interval, testScheduler),
              (l, matches) => matches.Select(r => l.Value + r.Value));

          return grouped.Merge();
      });

      observed.Messages.AssertEqual(
          OnCompleted<string>(210)
      );

      AssertDurations(left, leftTimers, 210);
      AssertDurations(right, rightTimers, 210);

      left.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Variant in which xs emits a single element and has no completion message, while ys emits one
  /// element and completes at 230. Only "0hat" at 220 is expected, with no completion, and the xs
  /// subscription is expected to last until 1000.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Normal_VIII()
  {
      var testScheduler = new TestScheduler();

      // Left source: one element, no OnCompleted.
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(200)))
      );

      // Right source: one element, then completion.
      var right = testScheduler.CreateHotObservable(
          OnNext(220, new TimeInterval<string>("hat", TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<string>>(230)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
      {
          var grouped = left.GroupJoin(
              right,
              l => NewTimer(leftTimers, l.Interval, testScheduler),
              r => NewTimer(rightTimers, r.Interval, testScheduler),
              (l, matches) => matches.Select(r => l.Value + r.Value));

          return grouped.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(220, "0hat")
      );

      AssertDurations(left, leftTimers, 1000);
      AssertDurations(right, rightTimers, 1000);

      left.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      // The right source is expected to be released at its own completion time.
      right.Subscriptions.AssertEqual(
          Subscribe(200, 230)
      );
#else
      right.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
#endif
  }
  /// <summary>
  /// Variant that passes 713 as the second argument to <c>Start</c>. Only messages up to 712 are
  /// expected, and both source subscriptions are expected to end at 713.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Normal_IX()
  {
      var testScheduler = new TestScheduler();

      // Left source: (time, value, interval in ticks).
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      // Right source: (time, value, interval in ticks).
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(
          () =>
          {
              var grouped = left.GroupJoin(
                  right,
                  l => NewTimer(leftTimers, l.Interval, testScheduler),
                  r => NewTimer(rightTimers, r.Interval, testScheduler),
                  (l, matches) => matches.Select(r => l.Value + r.Value));

              return grouped.Merge();
          },
          713
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man")
      );

      AssertDurations(left, leftTimers, 713);
      AssertDurations(right, rightTimers, 713);

      left.Subscriptions.AssertEqual(
          Subscribe(200, 713)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 713)
      );
  }
  /// <summary>
  /// Variant in which xs terminates with an error at 310. The merged result is expected to carry
  /// the same exception at 310, with both source subscriptions ending at 310.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Error_I()
  {
      var testScheduler = new TestScheduler();

      // The exact instance raised by the left source; the expected OnError below reuses it.
      var failure = new Exception();

      // Left source: (time, value, interval in ticks), ending in an error.
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnError<TimeInterval<int>>(310, failure)
      );

      // Right source: (time, value, interval in ticks).
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Duration observables created by the selectors are collected here for AssertDurations.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
      {
          var grouped = left.GroupJoin(
              right,
              l => NewTimer(leftTimers, l.Interval, testScheduler),
              r => NewTimer(rightTimers, r.Interval, testScheduler),
              (l, matches) => matches.Select(r => l.Value + r.Value));

          return grouped.Merge();
      });

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnError<string>(310, failure)
      );

      AssertDurations(left, leftTimers, 310);
      AssertDurations(right, rightTimers, 310);

      left.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );
  }
  /// <summary>
  /// GroupJoin where the right source fails at tick 722: the pairs produced before that point are
  /// followed by the same error, and both sources are unsubscribed at 722.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Error_II()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      // Each element's Interval is used below as the length (in ticks) of its duration timer.
      var left = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnError<TimeInterval<string>>(722, error)
      );

      // Every duration observable handed to GroupJoin is recorded here for later inspection.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftWindow =
          x => NewTimer(leftDurations, x.Interval, scheduler);
      Func<TimeInterval<string>, IObservable<long>> rightWindow =
          y => NewTimer(rightDurations, y.Interval, scheduler);
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> pairUp =
          (x, group) => group.Select(y => x.Value + y.Value);

      var results = scheduler.Start(() => left.GroupJoin(right, leftWindow, rightWindow, pairUp).Merge());

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnError<string>(722, error)
      );

      AssertDurations(left, leftDurations, 722);
      AssertDurations(right, rightDurations, 722);

      left.Subscriptions.AssertEqual(Subscribe(200, 722));
      right.Subscriptions.AssertEqual(Subscribe(200, 722));
  }
  /// <summary>
  /// GroupJoin where a left duration observable fails: the duration for left value 6 (arriving at
  /// 700 with a 25-tick timer) is projected onto a throwing sequence, and the test expects the
  /// error in the output at tick 725 with both sources unsubscribed at that time.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Error_III()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      // Each element's Interval is used below as the length (in ticks) of its duration timer.
      var left = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration observable handed to GroupJoin is recorded here for later inspection.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      // The timer's tick is projected onto Throw for value 6 and onto Empty for every other value.
      Func<TimeInterval<int>, IObservable<long>> leftWindow = x =>
          NewTimer(leftDurations, x.Interval, scheduler)
              .SelectMany(x.Value == 6 ? Observable.Throw<long>(error) : Observable.Empty<long>());
      Func<TimeInterval<string>, IObservable<long>> rightWindow =
          y => NewTimer(rightDurations, y.Interval, scheduler);
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> pairUp =
          (x, group) => group.Select(y => x.Value + y.Value);

      var results = scheduler.Start(() => left.GroupJoin(right, leftWindow, rightWindow, pairUp).Merge());

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnError<string>(725, error)
      );

      AssertDurations(left, leftDurations, 725);
      AssertDurations(right, rightDurations, 725);

      left.Subscriptions.AssertEqual(Subscribe(200, 725));
      right.Subscriptions.AssertEqual(Subscribe(200, 725));
  }
  /// <summary>
  /// GroupJoin where a right duration observable fails: the duration for right value "tin"
  /// (arriving at 702 with a 19-tick timer) is projected onto a throwing sequence, and the test
  /// expects the error in the output at tick 721 with both sources unsubscribed at that time.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Error_IV()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      // Each element's Interval is used below as the length (in ticks) of its duration timer.
      var left = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(19))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration observable handed to GroupJoin is recorded here for later inspection.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftWindow =
          x => NewTimer(leftDurations, x.Interval, scheduler);
      // The timer's tick is projected onto Throw for "tin" and onto Empty for every other value.
      Func<TimeInterval<string>, IObservable<long>> rightWindow = y =>
          NewTimer(rightDurations, y.Interval, scheduler)
              .SelectMany(y.Value == "tin" ? Observable.Throw<long>(error) : Observable.Empty<long>());
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> pairUp =
          (x, group) => group.Select(y => x.Value + y.Value);

      var results = scheduler.Start(() => left.GroupJoin(right, leftWindow, rightWindow, pairUp).Merge());

      results.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnError<string>(721, error)
      );

      AssertDurations(left, leftDurations, 721);
      AssertDurations(right, rightDurations, 721);

      left.Subscriptions.AssertEqual(Subscribe(200, 721));
      right.Subscriptions.AssertEqual(Subscribe(200, 721));
  }
  /// <summary>
  /// GroupJoin where the left duration selector itself throws: the test expects the error as the
  /// only output, at tick 210 (the first left element), with both sources unsubscribed at 210.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Error_V()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var left = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Only right durations are recorded; the left selector never returns a timer.
      var rightDurations = new List<ITestableObservable<long>>();

      // The guard holds for every value in the left source, so the selector always throws.
      Func<TimeInterval<int>, IObservable<long>> leftWindow = x =>
      {
          if (x.Value >= 0)
          {
              throw error;
          }

          return Observable.Empty<long>();
      };
      Func<TimeInterval<string>, IObservable<long>> rightWindow =
          y => NewTimer(rightDurations, y.Interval, scheduler);
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> pairUp =
          (x, group) => group.Select(y => x.Value + y.Value);

      var results = scheduler.Start(() => left.GroupJoin(right, leftWindow, rightWindow, pairUp).Merge());

      results.Messages.AssertEqual(
          OnError<string>(210, error)
      );

      AssertDurations(right, rightDurations, 210);

      left.Subscriptions.AssertEqual(Subscribe(200, 210));
      right.Subscriptions.AssertEqual(Subscribe(200, 210));
  }
  /// <summary>
  /// GroupJoin where the right duration selector itself throws: the test expects the error as the
  /// only output, at tick 215 (the first right element), with both sources unsubscribed at 215.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Error_VI()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var left = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Only left durations are recorded; the right selector never returns a timer.
      var leftDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftWindow =
          x => NewTimer(leftDurations, x.Interval, scheduler);
      // A string length is never negative, so the selector always throws.
      Func<TimeInterval<string>, IObservable<long>> rightWindow = y =>
      {
          if (y.Value.Length >= 0)
          {
              throw error;
          }

          return Observable.Empty<long>();
      };
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> pairUp =
          (x, group) => group.Select(y => x.Value + y.Value);

      var results = scheduler.Start(() => left.GroupJoin(right, leftWindow, rightWindow, pairUp).Merge());

      results.Messages.AssertEqual(
          OnError<string>(215, error)
      );

      AssertDurations(left, leftDurations, 215);

      left.Subscriptions.AssertEqual(Subscribe(200, 215));
      right.Subscriptions.AssertEqual(Subscribe(200, 215));
  }
  /// <summary>
  /// GroupJoin where the result selector throws and the right source emits first (210) before the
  /// first left element (215): the test expects the error as the only output, at tick 215, with
  /// both sources unsubscribed at that time.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Error_VII()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var left = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration observable handed to GroupJoin is recorded here for later inspection.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftWindow =
          x => NewTimer(leftDurations, x.Interval, scheduler);
      Func<TimeInterval<string>, IObservable<long>> rightWindow =
          y => NewTimer(rightDurations, y.Interval, scheduler);
      // The guard holds for every value in the left source, so the selector always throws.
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> pairUp = (x, group) =>
      {
          if (x.Value >= 0)
          {
              throw error;
          }

          return group.Select(y => x.Value + y.Value);
      };

      var results = scheduler.Start(() => left.GroupJoin(right, leftWindow, rightWindow, pairUp).Merge());

      results.Messages.AssertEqual(
          OnError<string>(215, error)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x - Duration selector is now invoked before the result selector
      AssertDurations(left, leftDurations, 215);
#endif
      AssertDurations(right, rightDurations, 215);

      left.Subscriptions.AssertEqual(Subscribe(200, 215));
      right.Subscriptions.AssertEqual(Subscribe(200, 215));
  }
  /// <summary>
  /// GroupJoin where the result selector throws and the left source emits first (210) before any
  /// right element (215): the test expects the error as the only output, at tick 210, with both
  /// sources unsubscribed at that time.
  /// </summary>
  [TestMethod]
  public void GroupJoinOp_Error_VIII()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var left = scheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = scheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Every duration observable handed to GroupJoin is recorded here for later inspection.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      Func<TimeInterval<int>, IObservable<long>> leftWindow =
          x => NewTimer(leftDurations, x.Interval, scheduler);
      Func<TimeInterval<string>, IObservable<long>> rightWindow =
          y => NewTimer(rightDurations, y.Interval, scheduler);
      // The guard holds for every value in the left source, so the selector always throws.
      Func<TimeInterval<int>, IObservable<TimeInterval<string>>, IObservable<string>> pairUp = (x, group) =>
      {
          if (x.Value >= 0)
          {
              throw error;
          }

          return group.Select(y => x.Value + y.Value);
      };

      var results = scheduler.Start(() => left.GroupJoin(right, leftWindow, rightWindow, pairUp).Merge());

      results.Messages.AssertEqual(
          OnError<string>(210, error)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x - Duration selector is now invoked before the result selector
      AssertDurations(left, leftDurations, 210);
#endif
      AssertDurations(right, rightDurations, 210);

      left.Subscriptions.AssertEqual(Subscribe(200, 210));
      right.Subscriptions.AssertEqual(Subscribe(200, 210));
  }
  /// <summary>
  /// Creates a cold observable that emits a single 0L and completes, both at <paramref name="t"/>
  /// ticks after subscription, appends it to <paramref name="l"/>, and returns it.
  /// </summary>
  /// <param name="l">List that collects every timer created, so tests can inspect their subscriptions.</param>
  /// <param name="t">Offset from subscription at which the timer fires and completes.</param>
  /// <param name="scheduler">Test scheduler used to create the cold observable.</param>
  /// <returns>The newly created timer observable.</returns>
  private ITestableObservable<long> NewTimer(List<ITestableObservable<long>> l, TimeSpan t, TestScheduler scheduler)
  {
      var dueTicks = t.Ticks;
      var duration = scheduler.CreateColdObservable(
          OnNext(dueTicks, 0L),
          OnCompleted<long>(dueTicks)
      );

      l.Add(duration);
      return duration;
  }
  /// <summary>
  /// Verifies the duration observables recorded for a source: there must be exactly one per OnNext
  /// message of <paramref name="xs"/> at or before <paramref name="lastEnd"/>, and each must have a
  /// single subscription running from its element's time until the element's time plus its
  /// interval, capped at <paramref name="lastEnd"/>.
  /// </summary>
  /// <param name="xs">Source whose recorded messages supply the element times and intervals.</param>
  /// <param name="xsd">Duration observables, in the order they were created.</param>
  /// <param name="lastEnd">Latest time at which any duration subscription may end.</param>
  private void AssertDurations<T, U>(ITestableObservable<TimeInterval<T>> xs, List<ITestableObservable<U>> xsd, long lastEnd)
  {
      var expectedCount = xs.Messages.Count(m => m.Value.Kind == NotificationKind.OnNext && m.Time <= lastEnd);
      Assert.Equal(expectedCount, xsd.Count);

      // Pair messages with durations positionally, stopping at the shorter of the two sequences.
      var pairCount = Math.Min(xs.Messages.Count, xsd.Count);
      for (var i = 0; i < pairCount; i++)
      {
          var message = xs.Messages[i];
          var opened = message.Time;
          var closed = Math.Min(opened + message.Value.Value.Interval.Ticks, lastEnd);

          xsd[i].Subscriptions.AssertEqual(Subscribe(opened, closed));
      }
  }
  1062. }
  1063. }