JoinTest.cs 50 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class JoinTest : ReactiveTest
  22. {
  23. [Fact]
  24. public void JoinOp_ArgumentChecking()
  25. {
  26. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Join(null, DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, int, int>.Instance));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Join(DummyObservable<int>.Instance, null, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, int, int>.Instance));
  28. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Join(DummyObservable<int>.Instance, DummyObservable<int>.Instance, default(Func<int, IObservable<int>>), DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, int, int>.Instance));
  29. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Join(DummyObservable<int>.Instance, DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, default(Func<int, IObservable<int>>), DummyFunc<int, int, int>.Instance));
  30. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Join(DummyObservable<int>.Instance, DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, IObservable<int>>.Instance, default(Func<int, int, int>)));
  31. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Join(DummyObservable<int>.Instance, DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, IObservable<int>>.Instance, DummyFunc<int, int, int>.Instance).Subscribe(null));
  32. }
  33. [Fact]
  34. public void JoinOp_Normal_I()
  35. {
  36. var scheduler = new TestScheduler();
  37. var xs = scheduler.CreateHotObservable(
  38. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  39. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  40. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  41. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  42. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  43. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  44. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  45. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  46. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  47. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  48. OnCompleted<TimeInterval<int>>(900)
  49. );
  50. var ys = scheduler.CreateHotObservable(
  51. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  52. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  53. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  54. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  55. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  56. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  57. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  58. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  59. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  60. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  61. OnCompleted<TimeInterval<string>>(800)
  62. );
  63. var xsd = new List<ITestableObservable<long>>();
  64. var ysd = new List<ITestableObservable<long>>();
  65. var res = scheduler.Start(() =>
  66. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value)
  67. );
  68. res.Messages.AssertEqual(
  69. OnNext(215, "0hat"),
  70. OnNext(217, "0bat"),
  71. OnNext(219, "1hat"),
  72. OnNext(300, "3wag"),
  73. OnNext(300, "3pig"),
  74. OnNext(305, "3cup"),
  75. OnNext(310, "4wag"),
  76. OnNext(310, "4pig"),
  77. OnNext(310, "4cup"),
  78. OnNext(702, "6tin"),
  79. OnNext(710, "7tin"),
  80. OnNext(712, "6man"),
  81. OnNext(712, "7man"),
  82. OnNext(720, "8tin"),
  83. OnNext(720, "8man"),
  84. OnNext(722, "6rat"),
  85. OnNext(722, "7rat"),
  86. OnNext(722, "8rat"),
  87. OnNext(732, "7wig"),
  88. OnNext(732, "8wig"),
  89. OnNext(830, "9rat"),
  90. OnCompleted<string>(900)
  91. );
  92. xs.Subscriptions.AssertEqual(
  93. Subscribe(200, 900)
  94. );
  95. #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
  96. ys.Subscriptions.AssertEqual(
  97. Subscribe(200, 800)
  98. );
  99. #else
  100. ys.Subscriptions.AssertEqual(
  101. Subscribe(200, 900)
  102. );
  103. #endif
  104. AssertDurations(xs, xsd, 900);
  105. AssertDurations(ys, ysd, 900);
  106. }
  107. [Fact]
  108. public void JoinOp_Normal_II()
  109. {
  110. var scheduler = new TestScheduler();
  111. var xs = scheduler.CreateHotObservable(
  112. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  113. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  114. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  115. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  116. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  117. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  118. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  119. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
  120. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  121. OnCompleted<TimeInterval<int>>(721)
  122. );
  123. var ys = scheduler.CreateHotObservable(
  124. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  125. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  126. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  127. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  128. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  129. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  130. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  131. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  132. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  133. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  134. OnCompleted<TimeInterval<string>>(990)
  135. );
  136. var xsd = new List<ITestableObservable<long>>();
  137. var ysd = new List<ITestableObservable<long>>();
  138. var res = scheduler.Start(() =>
  139. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value)
  140. );
  141. res.Messages.AssertEqual(
  142. OnNext(215, "0hat"),
  143. OnNext(217, "0bat"),
  144. OnNext(219, "1hat"),
  145. OnNext(300, "3wag"),
  146. OnNext(300, "3pig"),
  147. OnNext(305, "3cup"),
  148. OnNext(310, "4wag"),
  149. OnNext(310, "4pig"),
  150. OnNext(310, "4cup"),
  151. OnNext(702, "6tin"),
  152. OnNext(710, "7tin"),
  153. OnNext(712, "6man"),
  154. OnNext(712, "7man"),
  155. OnNext(720, "8tin"),
  156. OnNext(720, "8man"),
  157. OnNext(722, "6rat"),
  158. OnNext(722, "7rat"),
  159. OnNext(722, "8rat"),
  160. OnNext(732, "7wig"),
  161. OnNext(732, "8wig"),
  162. OnCompleted<string>(910)
  163. );
  164. #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
  165. xs.Subscriptions.AssertEqual(
  166. Subscribe(200, 721)
  167. );
  168. #else
  169. xs.Subscriptions.AssertEqual(
  170. Subscribe(200, 910)
  171. );
  172. #endif
  173. ys.Subscriptions.AssertEqual(
  174. Subscribe(200, 910)
  175. );
  176. AssertDurations(xs, xsd, 910);
  177. AssertDurations(ys, ysd, 910);
  178. }
  179. [Fact]
  180. public void JoinOp_Normal_III()
  181. {
  182. var scheduler = new TestScheduler();
  183. var xs = scheduler.CreateHotObservable(
  184. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  185. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  186. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  187. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  188. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  189. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  190. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  191. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  192. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  193. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  194. OnCompleted<TimeInterval<int>>(900)
  195. );
  196. var ys = scheduler.CreateHotObservable(
  197. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  198. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  199. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  200. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  201. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  202. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  203. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  204. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  205. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  206. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  207. OnCompleted<TimeInterval<string>>(800)
  208. );
  209. var xsd = new List<ITestableObservable<long>>();
  210. var ysd = new List<ITestableObservable<long>>();
  211. var res = scheduler.Start(() =>
  212. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler).Where(_ => false), y => NewTimer(ysd, y.Interval, scheduler).Where(_ => false), (x, y) => x.Value + y.Value)
  213. );
  214. res.Messages.AssertEqual(
  215. OnNext(215, "0hat"),
  216. OnNext(217, "0bat"),
  217. OnNext(219, "1hat"),
  218. OnNext(300, "3wag"),
  219. OnNext(300, "3pig"),
  220. OnNext(305, "3cup"),
  221. OnNext(310, "4wag"),
  222. OnNext(310, "4pig"),
  223. OnNext(310, "4cup"),
  224. OnNext(702, "6tin"),
  225. OnNext(710, "7tin"),
  226. OnNext(712, "6man"),
  227. OnNext(712, "7man"),
  228. OnNext(720, "8tin"),
  229. OnNext(720, "8man"),
  230. OnNext(722, "6rat"),
  231. OnNext(722, "7rat"),
  232. OnNext(722, "8rat"),
  233. OnNext(732, "7wig"),
  234. OnNext(732, "8wig"),
  235. OnNext(830, "9rat"),
  236. OnCompleted<string>(900)
  237. );
  238. xs.Subscriptions.AssertEqual(
  239. Subscribe(200, 900)
  240. );
  241. #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
  242. ys.Subscriptions.AssertEqual(
  243. Subscribe(200, 800)
  244. );
  245. #else
  246. ys.Subscriptions.AssertEqual(
  247. Subscribe(200, 900)
  248. );
  249. #endif
  250. AssertDurations(xs, xsd, 900);
  251. AssertDurations(ys, ysd, 900);
  252. }
  253. [Fact]
  254. public void JoinOp_Normal_IV()
  255. {
  256. var scheduler = new TestScheduler();
  257. var xs = scheduler.CreateHotObservable(
  258. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  259. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  260. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  261. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  262. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  263. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  264. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  265. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
  266. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  267. OnCompleted<TimeInterval<int>>(990)
  268. );
  269. var ys = scheduler.CreateHotObservable(
  270. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  271. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  272. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  273. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  274. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  275. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  276. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  277. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  278. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  279. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  280. OnCompleted<TimeInterval<string>>(980)
  281. );
  282. var xsd = new List<ITestableObservable<long>>();
  283. var ysd = new List<ITestableObservable<long>>();
  284. var res = scheduler.Start(() =>
  285. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value)
  286. );
  287. res.Messages.AssertEqual(
  288. OnNext(215, "0hat"),
  289. OnNext(217, "0bat"),
  290. OnNext(219, "1hat"),
  291. OnNext(300, "3wag"),
  292. OnNext(300, "3pig"),
  293. OnNext(305, "3cup"),
  294. OnNext(310, "4wag"),
  295. OnNext(310, "4pig"),
  296. OnNext(310, "4cup"),
  297. OnNext(702, "6tin"),
  298. OnNext(710, "7tin"),
  299. OnNext(712, "6man"),
  300. OnNext(712, "7man"),
  301. OnNext(720, "8tin"),
  302. OnNext(720, "8man"),
  303. OnNext(722, "6rat"),
  304. OnNext(722, "7rat"),
  305. OnNext(722, "8rat"),
  306. OnNext(732, "7wig"),
  307. OnNext(732, "8wig"),
  308. OnCompleted<string>(980)
  309. );
  310. xs.Subscriptions.AssertEqual(
  311. Subscribe(200, 980)
  312. );
  313. ys.Subscriptions.AssertEqual(
  314. Subscribe(200, 980)
  315. );
  316. AssertDurations(xs, xsd, 980);
  317. AssertDurations(ys, ysd, 980);
  318. }
  319. [Fact]
  320. public void JoinOp_Normal_V()
  321. {
  322. var scheduler = new TestScheduler();
  323. var xs = scheduler.CreateHotObservable(
  324. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  325. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  326. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  327. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  328. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  329. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  330. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  331. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
  332. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  333. OnCompleted<TimeInterval<int>>(990)
  334. );
  335. var ys = scheduler.CreateHotObservable(
  336. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  337. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  338. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  339. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  340. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  341. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  342. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  343. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  344. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  345. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  346. OnCompleted<TimeInterval<string>>(900)
  347. );
  348. var xsd = new List<ITestableObservable<long>>();
  349. var ysd = new List<ITestableObservable<long>>();
  350. var res = scheduler.Start(() =>
  351. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value)
  352. );
  353. res.Messages.AssertEqual(
  354. OnNext(215, "0hat"),
  355. OnNext(217, "0bat"),
  356. OnNext(219, "1hat"),
  357. OnNext(300, "3wag"),
  358. OnNext(300, "3pig"),
  359. OnNext(305, "3cup"),
  360. OnNext(310, "4wag"),
  361. OnNext(310, "4pig"),
  362. OnNext(310, "4cup"),
  363. OnNext(702, "6tin"),
  364. OnNext(710, "7tin"),
  365. OnNext(712, "6man"),
  366. OnNext(712, "7man"),
  367. OnNext(720, "8tin"),
  368. OnNext(720, "8man"),
  369. OnNext(722, "6rat"),
  370. OnNext(722, "7rat"),
  371. OnNext(722, "8rat"),
  372. OnNext(732, "7wig"),
  373. OnNext(732, "8wig"),
  374. OnCompleted<string>(922)
  375. );
  376. xs.Subscriptions.AssertEqual(
  377. Subscribe(200, 922)
  378. );
  379. #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
  380. ys.Subscriptions.AssertEqual(
  381. Subscribe(200, 900)
  382. );
  383. #else
  384. ys.Subscriptions.AssertEqual(
  385. Subscribe(200, 922)
  386. );
  387. #endif
  388. AssertDurations(xs, xsd, 922);
  389. AssertDurations(ys, ysd, 922);
  390. }
  391. [Fact]
  392. public void JoinOp_Normal_VI()
  393. {
  394. var scheduler = new TestScheduler();
  395. var xs = scheduler.CreateHotObservable(
  396. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  397. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  398. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  399. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  400. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  401. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  402. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  403. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(30))),
  404. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(200))),
  405. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  406. OnCompleted<TimeInterval<int>>(850)
  407. );
  408. var ys = scheduler.CreateHotObservable(
  409. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  410. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  411. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  412. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  413. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  414. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  415. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  416. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  417. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(20))),
  418. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  419. OnCompleted<TimeInterval<string>>(900)
  420. );
  421. var xsd = new List<ITestableObservable<long>>();
  422. var ysd = new List<ITestableObservable<long>>();
  423. var res = scheduler.Start(() =>
  424. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value)
  425. );
  426. res.Messages.AssertEqual(
  427. OnNext(215, "0hat"),
  428. OnNext(217, "0bat"),
  429. OnNext(219, "1hat"),
  430. OnNext(300, "3wag"),
  431. OnNext(300, "3pig"),
  432. OnNext(305, "3cup"),
  433. OnNext(310, "4wag"),
  434. OnNext(310, "4pig"),
  435. OnNext(310, "4cup"),
  436. OnNext(702, "6tin"),
  437. OnNext(710, "7tin"),
  438. OnNext(712, "6man"),
  439. OnNext(712, "7man"),
  440. OnNext(720, "8tin"),
  441. OnNext(720, "8man"),
  442. OnNext(722, "6rat"),
  443. OnNext(722, "7rat"),
  444. OnNext(722, "8rat"),
  445. OnNext(732, "7wig"),
  446. OnNext(732, "8wig"),
  447. OnCompleted<string>(900)
  448. );
  449. #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
  450. xs.Subscriptions.AssertEqual(
  451. Subscribe(200, 850)
  452. );
  453. #else
  454. xs.Subscriptions.AssertEqual(
  455. Subscribe(200, 900)
  456. );
  457. #endif
  458. ys.Subscriptions.AssertEqual(
  459. Subscribe(200, 900)
  460. );
  461. AssertDurations(xs, xsd, 900);
  462. AssertDurations(ys, ysd, 900);
  463. }
  464. [Fact]
  465. public void JoinOp_Normal_VII()
  466. {
  467. var scheduler = new TestScheduler();
  468. var xs = scheduler.CreateHotObservable(
  469. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  470. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  471. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  472. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  473. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  474. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  475. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  476. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  477. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  478. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  479. OnCompleted<TimeInterval<int>>(900)
  480. );
  481. var ys = scheduler.CreateHotObservable(
  482. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  483. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  484. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  485. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  486. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  487. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  488. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  489. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  490. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  491. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  492. OnCompleted<TimeInterval<string>>(800)
  493. );
  494. var xsd = new List<ITestableObservable<long>>();
  495. var ysd = new List<ITestableObservable<long>>();
  496. var res = scheduler.Start(() =>
  497. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value),
  498. 713
  499. );
  500. res.Messages.AssertEqual(
  501. OnNext(215, "0hat"),
  502. OnNext(217, "0bat"),
  503. OnNext(219, "1hat"),
  504. OnNext(300, "3wag"),
  505. OnNext(300, "3pig"),
  506. OnNext(305, "3cup"),
  507. OnNext(310, "4wag"),
  508. OnNext(310, "4pig"),
  509. OnNext(310, "4cup"),
  510. OnNext(702, "6tin"),
  511. OnNext(710, "7tin"),
  512. OnNext(712, "6man"),
  513. OnNext(712, "7man")
  514. );
  515. xs.Subscriptions.AssertEqual(
  516. Subscribe(200, 713)
  517. );
  518. ys.Subscriptions.AssertEqual(
  519. Subscribe(200, 713)
  520. );
  521. AssertDurations(xs, xsd, 713);
  522. AssertDurations(ys, ysd, 713);
  523. }
  524. [Fact]
  525. public void JoinOp_Error_I()
  526. {
  527. var scheduler = new TestScheduler();
  528. var ex = new Exception();
  529. var xs = scheduler.CreateHotObservable(
  530. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  531. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  532. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  533. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  534. OnError<TimeInterval<int>>(310, ex)
  535. );
  536. var ys = scheduler.CreateHotObservable(
  537. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  538. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  539. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  540. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  541. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  542. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  543. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  544. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  545. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  546. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  547. OnCompleted<TimeInterval<string>>(800)
  548. );
  549. var xsd = new List<ITestableObservable<long>>();
  550. var ysd = new List<ITestableObservable<long>>();
  551. var res = scheduler.Start(() =>
  552. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value)
  553. );
  554. res.Messages.AssertEqual(
  555. OnNext(215, "0hat"),
  556. OnNext(217, "0bat"),
  557. OnNext(219, "1hat"),
  558. OnNext(300, "3wag"),
  559. OnNext(300, "3pig"),
  560. OnNext(305, "3cup"),
  561. OnError<string>(310, ex)
  562. );
  563. xs.Subscriptions.AssertEqual(
  564. Subscribe(200, 310)
  565. );
  566. ys.Subscriptions.AssertEqual(
  567. Subscribe(200, 310)
  568. );
  569. AssertDurations(xs, xsd, 310);
  570. AssertDurations(ys, ysd, 310);
  571. }
  572. [Fact]
  573. public void JoinOp_Error_II()
  574. {
  575. var scheduler = new TestScheduler();
  576. var ex = new Exception();
  577. var xs = scheduler.CreateHotObservable(
  578. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  579. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  580. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  581. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  582. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  583. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  584. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  585. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  586. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  587. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  588. OnCompleted<TimeInterval<int>>(900)
  589. );
  590. var ys = scheduler.CreateHotObservable(
  591. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  592. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  593. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  594. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  595. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  596. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  597. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  598. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  599. OnError<TimeInterval<string>>(722, ex)
  600. );
  601. var xsd = new List<ITestableObservable<long>>();
  602. var ysd = new List<ITestableObservable<long>>();
  603. var res = scheduler.Start(() =>
  604. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value)
  605. );
  606. res.Messages.AssertEqual(
  607. OnNext(215, "0hat"),
  608. OnNext(217, "0bat"),
  609. OnNext(219, "1hat"),
  610. OnNext(300, "3wag"),
  611. OnNext(300, "3pig"),
  612. OnNext(305, "3cup"),
  613. OnNext(310, "4wag"),
  614. OnNext(310, "4pig"),
  615. OnNext(310, "4cup"),
  616. OnNext(702, "6tin"),
  617. OnNext(710, "7tin"),
  618. OnNext(712, "6man"),
  619. OnNext(712, "7man"),
  620. OnNext(720, "8tin"),
  621. OnNext(720, "8man"),
  622. OnError<string>(722, ex)
  623. );
  624. xs.Subscriptions.AssertEqual(
  625. Subscribe(200, 722)
  626. );
  627. ys.Subscriptions.AssertEqual(
  628. Subscribe(200, 722)
  629. );
  630. AssertDurations(xs, xsd, 722);
  631. AssertDurations(ys, ysd, 722);
  632. }
  633. [Fact]
  634. public void JoinOp_Error_III()
  635. {
  636. var scheduler = new TestScheduler();
  637. var xs = scheduler.CreateHotObservable(
  638. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  639. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  640. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  641. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  642. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  643. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  644. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  645. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  646. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  647. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  648. OnCompleted<TimeInterval<int>>(900)
  649. );
  650. var ys = scheduler.CreateHotObservable(
  651. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  652. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  653. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  654. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  655. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  656. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  657. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  658. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  659. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  660. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  661. OnCompleted<TimeInterval<string>>(800)
  662. );
  663. var ex = new Exception();
  664. var xsd = new List<ITestableObservable<long>>();
  665. var ysd = new List<ITestableObservable<long>>();
  666. var res = scheduler.Start(() =>
  667. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler).SelectMany(x.Value == 6 ? Observable.Throw<long>(ex) : Observable.Empty<long>()), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value)
  668. );
  669. res.Messages.AssertEqual(
  670. OnNext(215, "0hat"),
  671. OnNext(217, "0bat"),
  672. OnNext(219, "1hat"),
  673. OnNext(300, "3wag"),
  674. OnNext(300, "3pig"),
  675. OnNext(305, "3cup"),
  676. OnNext(310, "4wag"),
  677. OnNext(310, "4pig"),
  678. OnNext(310, "4cup"),
  679. OnNext(702, "6tin"),
  680. OnNext(710, "7tin"),
  681. OnNext(712, "6man"),
  682. OnNext(712, "7man"),
  683. OnNext(720, "8tin"),
  684. OnNext(720, "8man"),
  685. OnNext(722, "6rat"),
  686. OnNext(722, "7rat"),
  687. OnNext(722, "8rat"),
  688. OnError<string>(725, ex)
  689. );
  690. xs.Subscriptions.AssertEqual(
  691. Subscribe(200, 725)
  692. );
  693. ys.Subscriptions.AssertEqual(
  694. Subscribe(200, 725)
  695. );
  696. AssertDurations(xs, xsd, 725);
  697. AssertDurations(ys, ysd, 725);
  698. }
  699. [Fact]
  700. public void JoinOp_Error_IV()
  701. {
  702. var scheduler = new TestScheduler();
  703. var xs = scheduler.CreateHotObservable(
  704. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  705. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  706. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  707. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  708. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  709. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  710. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  711. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  712. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  713. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  714. OnCompleted<TimeInterval<int>>(900)
  715. );
  716. var ys = scheduler.CreateHotObservable(
  717. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  718. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  719. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  720. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  721. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  722. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  723. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(19))),
  724. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  725. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  726. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  727. OnCompleted<TimeInterval<string>>(800)
  728. );
  729. var ex = new Exception();
  730. var xsd = new List<ITestableObservable<long>>();
  731. var ysd = new List<ITestableObservable<long>>();
  732. var res = scheduler.Start(() =>
  733. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler).SelectMany(y.Value == "tin" ? Observable.Throw<long>(ex) : Observable.Empty<long>()), (x, y) => x.Value + y.Value)
  734. );
  735. res.Messages.AssertEqual(
  736. OnNext(215, "0hat"),
  737. OnNext(217, "0bat"),
  738. OnNext(219, "1hat"),
  739. OnNext(300, "3wag"),
  740. OnNext(300, "3pig"),
  741. OnNext(305, "3cup"),
  742. OnNext(310, "4wag"),
  743. OnNext(310, "4pig"),
  744. OnNext(310, "4cup"),
  745. OnNext(702, "6tin"),
  746. OnNext(710, "7tin"),
  747. OnNext(712, "6man"),
  748. OnNext(712, "7man"),
  749. OnNext(720, "8tin"),
  750. OnNext(720, "8man"),
  751. OnError<string>(721, ex)
  752. );
  753. xs.Subscriptions.AssertEqual(
  754. Subscribe(200, 721)
  755. );
  756. ys.Subscriptions.AssertEqual(
  757. Subscribe(200, 721)
  758. );
  759. AssertDurations(xs, xsd, 721);
  760. AssertDurations(ys, ysd, 721);
  761. }
  762. [Fact]
  763. public void JoinOp_Error_V()
  764. {
  765. var scheduler = new TestScheduler();
  766. var xs = scheduler.CreateHotObservable(
  767. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  768. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  769. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  770. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  771. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  772. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  773. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  774. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  775. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  776. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  777. OnCompleted<TimeInterval<int>>(900)
  778. );
  779. var ys = scheduler.CreateHotObservable(
  780. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  781. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  782. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  783. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  784. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  785. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  786. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  787. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  788. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  789. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  790. OnCompleted<TimeInterval<string>>(800)
  791. );
  792. var ex = new Exception();
  793. var ysd = new List<ITestableObservable<long>>();
  794. var res = scheduler.Start(() =>
  795. xs.Join(ys, x => { if (x.Value >= 0) throw ex; return Observable.Empty<long>(); }, y => NewTimer(ysd, y.Interval, scheduler), (x, y) => x.Value + y.Value)
  796. );
  797. res.Messages.AssertEqual(
  798. OnError<string>(210, ex)
  799. );
  800. xs.Subscriptions.AssertEqual(
  801. Subscribe(200, 210)
  802. );
  803. ys.Subscriptions.AssertEqual(
  804. Subscribe(200, 210)
  805. );
  806. AssertDurations(ys, ysd, 210);
  807. }
  808. [Fact]
  809. public void JoinOp_Error_VI()
  810. {
  811. var scheduler = new TestScheduler();
  812. var xs = scheduler.CreateHotObservable(
  813. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  814. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  815. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  816. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  817. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  818. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  819. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  820. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  821. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  822. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  823. OnCompleted<TimeInterval<int>>(900)
  824. );
  825. var ys = scheduler.CreateHotObservable(
  826. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  827. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  828. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  829. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  830. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  831. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  832. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  833. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  834. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  835. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  836. OnCompleted<TimeInterval<string>>(800)
  837. );
  838. var ex = new Exception();
  839. var xsd = new List<ITestableObservable<long>>();
  840. var res = scheduler.Start(() =>
  841. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => { if (y.Value.Length >= 0) throw ex; return Observable.Empty<long>(); }, (x, y) => x.Value + y.Value)
  842. );
  843. res.Messages.AssertEqual(
  844. OnError<string>(215, ex)
  845. );
  846. xs.Subscriptions.AssertEqual(
  847. Subscribe(200, 215)
  848. );
  849. ys.Subscriptions.AssertEqual(
  850. Subscribe(200, 215)
  851. );
  852. AssertDurations(xs, xsd, 215);
  853. }
  854. [Fact]
  855. public void JoinOp_Error_VII()
  856. {
  857. var scheduler = new TestScheduler();
  858. var xs = scheduler.CreateHotObservable(
  859. OnNext(215, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  860. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  861. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  862. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  863. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  864. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  865. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  866. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  867. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  868. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  869. OnCompleted<TimeInterval<int>>(900)
  870. );
  871. var ys = scheduler.CreateHotObservable(
  872. OnNext(210, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  873. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  874. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  875. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  876. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  877. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  878. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  879. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  880. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  881. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  882. OnCompleted<TimeInterval<string>>(800)
  883. );
  884. var ex = new Exception();
  885. var xsd = new List<ITestableObservable<long>>();
  886. var ysd = new List<ITestableObservable<long>>();
  887. var res = scheduler.Start(() =>
  888. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => { if (x.Value >= 0) throw ex; return x.Value + y.Value; })
  889. );
  890. res.Messages.AssertEqual(
  891. OnError<string>(215, ex)
  892. );
  893. xs.Subscriptions.AssertEqual(
  894. Subscribe(200, 215)
  895. );
  896. ys.Subscriptions.AssertEqual(
  897. Subscribe(200, 215)
  898. );
  899. AssertDurations(xs, xsd, 215);
  900. AssertDurations(ys, ysd, 215);
  901. }
  902. [Fact]
  903. public void JoinOp_Error_VIII()
  904. {
  905. var scheduler = new TestScheduler();
  906. var xs = scheduler.CreateHotObservable(
  907. OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
  908. OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
  909. OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
  910. OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
  911. OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
  912. OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
  913. OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
  914. OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
  915. OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
  916. OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
  917. OnCompleted<TimeInterval<int>>(900)
  918. );
  919. var ys = scheduler.CreateHotObservable(
  920. OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
  921. OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
  922. OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
  923. OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
  924. OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
  925. OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
  926. OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
  927. OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
  928. OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
  929. OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
  930. OnCompleted<TimeInterval<string>>(800)
  931. );
  932. var ex = new Exception();
  933. var xsd = new List<ITestableObservable<long>>();
  934. var ysd = new List<ITestableObservable<long>>();
  935. var res = scheduler.Start(() =>
  936. xs.Join(ys, x => NewTimer(xsd, x.Interval, scheduler), y => NewTimer(ysd, y.Interval, scheduler), (x, y) => { if (x.Value >= 0) throw ex; return x.Value + y.Value; })
  937. );
  938. res.Messages.AssertEqual(
  939. OnError<string>(215, ex)
  940. );
  941. xs.Subscriptions.AssertEqual(
  942. Subscribe(200, 215)
  943. );
  944. ys.Subscriptions.AssertEqual(
  945. Subscribe(200, 215)
  946. );
  947. AssertDurations(xs, xsd, 215);
  948. AssertDurations(ys, ysd, 215);
  949. }
  950. private ITestableObservable<long> NewTimer(List<ITestableObservable<long>> l, TimeSpan t, TestScheduler scheduler)
  951. {
  952. var timer = scheduler.CreateColdObservable(OnNext(t.Ticks, 0L), OnCompleted<long>(t.Ticks));
  953. l.Add(timer);
  954. return timer;
  955. }
  956. private void AssertDurations<T, U>(ITestableObservable<TimeInterval<T>> xs, List<ITestableObservable<U>> xsd, long lastEnd)
  957. {
  958. Assert.Equal(xs.Messages.Where(x => x.Value.Kind == NotificationKind.OnNext && x.Time <= lastEnd).Count(), xsd.Count);
  959. foreach (var pair in xs.Messages.Zip(xsd, (x, y) => new { Item1 = x, Item2 = y }))
  960. {
  961. var start = pair.Item1.Time;
  962. var end = Math.Min(start + pair.Item1.Value.Value.Interval.Ticks, lastEnd);
  963. pair.Item2.Subscriptions.AssertEqual(
  964. Subscribe(start, end)
  965. );
  966. }
  967. }
  968. }
  969. }