JoinTest.cs 50 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Linq;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. using Assert = Xunit.Assert;
  13. namespace ReactiveTests.Tests
  14. {
  15. [TestClass]
  16. public class JoinTest : ReactiveTest
  17. {
  /// <summary>
  /// Checks that <c>Observable.Join</c> throws <see cref="ArgumentNullException"/> when any one of
  /// its five arguments is null, and that subscribing to the joined sequence with a null
  /// observer throws as well.
  /// </summary>
  [TestMethod]
  public void JoinOp_ArgumentChecking()
  {
      // Shared non-null placeholders; each assertion below nulls out exactly one position.
      var source = DummyObservable<int>.Instance;
      var durationSelector = DummyFunc<int, IObservable<int>>.Instance;
      var resultSelector = DummyFunc<int, int, int>.Instance;

      // Null left source.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Join(null, source, durationSelector, durationSelector, resultSelector));

      // Null right source.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Join(source, null, durationSelector, durationSelector, resultSelector));

      // Null left duration selector.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Join(source, source, default(Func<int, IObservable<int>>), durationSelector, resultSelector));

      // Null right duration selector.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Join(source, source, durationSelector, default(Func<int, IObservable<int>>), resultSelector));

      // Null result selector.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Join(source, source, durationSelector, durationSelector, default(Func<int, int, int>)));

      // Valid operator arguments, null observer passed to Subscribe.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Join(source, source, durationSelector, durationSelector, resultSelector).Subscribe(null));
  }
  /// <summary>
  /// Joins two hot sequences where the left source completes at 900 and the right source at 800.
  /// Verifies the emitted value pairs, the completion at 900, the subscription lifetime of
  /// both sources, and the recorded duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Normal_I()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler),
              r => NewTimer(rightDurations, r.Interval, testScheduler),
              (l, r) => l.Value + r.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnNext(830, "9rat"),
          OnCompleted<string>(900)
      );

      left.Subscriptions.AssertEqual(Subscribe(200, 900));

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      right.Subscriptions.AssertEqual(Subscribe(200, 800));
#else
      right.Subscriptions.AssertEqual(Subscribe(200, 900));
#endif

      AssertDurations(left, leftDurations, 900);
      AssertDurations(right, rightDurations, 900);
  }
  /// <summary>
  /// Joins two hot sequences where the left source completes early (721) and the right source
  /// completes late (990). Verifies the emitted value pairs, the completion at 910, the
  /// subscription lifetime of both sources, and the recorded duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Normal_II()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(721)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(990)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler),
              r => NewTimer(rightDurations, r.Interval, testScheduler),
              (l, r) => l.Value + r.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(910)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      left.Subscriptions.AssertEqual(Subscribe(200, 721));
#else
      left.Subscriptions.AssertEqual(Subscribe(200, 910));
#endif

      right.Subscriptions.AssertEqual(Subscribe(200, 910));

      AssertDurations(left, leftDurations, 910);
      AssertDurations(right, rightDurations, 910);
  }
  /// <summary>
  /// Joins two hot sequences (left completes at 900, right at 800) using duration observables
  /// whose values are all filtered out by <c>Where(_ =&gt; false)</c>. Verifies the emitted value
  /// pairs, the completion at 900, the subscription lifetime of both sources, and the recorded
  /// duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Normal_III()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      // The Where filter rejects every value of the duration observables.
      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler).Where(_ => false),
              r => NewTimer(rightDurations, r.Interval, testScheduler).Where(_ => false),
              (l, r) => l.Value + r.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnNext(830, "9rat"),
          OnCompleted<string>(900)
      );

      left.Subscriptions.AssertEqual(Subscribe(200, 900));

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      right.Subscriptions.AssertEqual(Subscribe(200, 800));
#else
      right.Subscriptions.AssertEqual(Subscribe(200, 900));
#endif

      AssertDurations(left, leftDurations, 900);
      AssertDurations(right, rightDurations, 900);
  }
  /// <summary>
  /// Joins two hot sequences where the left source completes at 990 and the right source at 980.
  /// Verifies the emitted value pairs, the completion at 980, that both sources are
  /// unsubscribed at 980, and the recorded duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Normal_IV()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(990)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(980)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler),
              r => NewTimer(rightDurations, r.Interval, testScheduler),
              (l, r) => l.Value + r.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(980)
      );

      left.Subscriptions.AssertEqual(Subscribe(200, 980));
      right.Subscriptions.AssertEqual(Subscribe(200, 980));

      AssertDurations(left, leftDurations, 980);
      AssertDurations(right, rightDurations, 980);
  }
  /// <summary>
  /// Joins two hot sequences where the left source completes at 990 and the right source at 900.
  /// Verifies the emitted value pairs, the completion at 922, the subscription lifetime of
  /// both sources, and the recorded duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Normal_V()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(200))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnCompleted<TimeInterval<int>>(990)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler),
              r => NewTimer(rightDurations, r.Interval, testScheduler),
              (l, r) => l.Value + r.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(922)
      );

      left.Subscriptions.AssertEqual(Subscribe(200, 922));

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      right.Subscriptions.AssertEqual(Subscribe(200, 900));
#else
      right.Subscriptions.AssertEqual(Subscribe(200, 922));
#endif

      AssertDurations(left, leftDurations, 922);
      AssertDurations(right, rightDurations, 922);
  }
  /// <summary>
  /// Joins two hot sequences where the left source completes at 850 and the right source at 900.
  /// Verifies the emitted value pairs, the completion at 900, the subscription lifetime of
  /// both sources, and the recorded duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Normal_VI()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(30))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(200))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(850)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(20))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(900)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler),
              r => NewTimer(rightDurations, r.Interval, testScheduler),
              (l, r) => l.Value + r.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnNext(732, "7wig"),
          OnNext(732, "8wig"),
          OnCompleted<string>(900)
      );

#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
      left.Subscriptions.AssertEqual(Subscribe(200, 850));
#else
      left.Subscriptions.AssertEqual(Subscribe(200, 900));
#endif

      right.Subscriptions.AssertEqual(Subscribe(200, 900));

      AssertDurations(left, leftDurations, 900);
      AssertDurations(right, rightDurations, 900);
  }
  /// <summary>
  /// Joins two hot sequences (left completes at 900, right at 800) but passes 713 as the second
  /// argument to <c>Start</c>. Verifies that only the pairs up to tick 712 are observed, that
  /// no completion is recorded, that both sources are unsubscribed at 713, and the recorded
  /// duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Normal_VII()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      // The trailing 713 cuts the run short; nothing after tick 712 is expected below.
      var observed = testScheduler.Start(
          () => left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler),
              r => NewTimer(rightDurations, r.Interval, testScheduler),
              (l, r) => l.Value + r.Value),
          713
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man")
      );

      left.Subscriptions.AssertEqual(Subscribe(200, 713));
      right.Subscriptions.AssertEqual(Subscribe(200, 713));

      AssertDurations(left, leftDurations, 713);
      AssertDurations(right, rightDurations, 713);
  }
  /// <summary>
  /// Joins two hot sequences where the left source fails at tick 310. Verifies the pairs
  /// emitted before the failure, that the same exception is reported at 310, that both
  /// sources are unsubscribed at 310, and the recorded duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Error_I()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnError<TimeInterval<int>>(310, failure)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler),
              r => NewTimer(rightDurations, r.Interval, testScheduler),
              (l, r) => l.Value + r.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnError<string>(310, failure)
      );

      left.Subscriptions.AssertEqual(Subscribe(200, 310));
      right.Subscriptions.AssertEqual(Subscribe(200, 310));

      AssertDurations(left, leftDurations, 310);
      AssertDurations(right, rightDurations, 310);
  }
  /// <summary>
  /// Joins two hot sequences where the right source fails at tick 722. Verifies the pairs
  /// emitted before the failure, that the same exception is reported at 722, that both
  /// sources are unsubscribed at 722, and the recorded duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Error_II()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnError<TimeInterval<string>>(722, failure)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler),
              r => NewTimer(rightDurations, r.Interval, testScheduler),
              (l, r) => l.Value + r.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnError<string>(722, failure)
      );

      left.Subscriptions.AssertEqual(Subscribe(200, 722));
      right.Subscriptions.AssertEqual(Subscribe(200, 722));

      AssertDurations(left, leftDurations, 722);
      AssertDurations(right, rightDurations, 722);
  }
  /// <summary>
  /// Joins two hot sequences where the left duration observable is made to fail for the left
  /// element whose value is 6. Verifies the pairs emitted before the failure, that the same
  /// exception is reported at 725, that both sources are unsubscribed at 725, and the
  /// recorded duration observables.
  /// </summary>
  /// <remarks>
  /// Each source element is a <see cref="TimeInterval{T}"/>: its <c>Interval</c> is passed to
  /// <c>NewTimer</c> to build that element's duration observable, and its <c>Value</c> is what the
  /// result selector concatenates. Element 6 arrives at tick 700 with an interval of 25 ticks,
  /// which matches the expected error time of 725.
  /// </remarks>
  [TestMethod]
  public void JoinOp_Error_III()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      // Filled in by NewTimer so the duration observables can be inspected afterwards.
      var leftDurations = new List<ITestableObservable<long>>();
      var rightDurations = new List<ITestableObservable<long>>();

      // Left durations: element 6 projects its timer onto a throwing sequence, every other
      // element onto an empty one.
      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              l => NewTimer(leftDurations, l.Interval, testScheduler)
                  .SelectMany(l.Value == 6 ? Observable.Throw<long>(failure) : Observable.Empty<long>()),
              r => NewTimer(rightDurations, r.Interval, testScheduler),
              (l, r) => l.Value + r.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnNext(722, "6rat"),
          OnNext(722, "7rat"),
          OnNext(722, "8rat"),
          OnError<string>(725, failure)
      );

      left.Subscriptions.AssertEqual(Subscribe(200, 725));
      right.Subscriptions.AssertEqual(Subscribe(200, 725));

      AssertDurations(left, leftDurations, 725);
      AssertDurations(right, rightDurations, 725);
  }
  /// <summary>
  /// Join where a right-hand duration sequence fails: the duration created for
  /// "tin" is a 19-tick timer followed by a throwing sequence. The test expects
  /// the join to emit every pairing up to that point, then OnError at tick 721,
  /// with both sources and all duration timers released at 721.
  /// </summary>
  [TestMethod]
  public void JoinOp_Error_IV()
  {
      var testScheduler = new TestScheduler();

      // Left source: each value carries the lifetime (in ticks) of its join window.
      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      // Right source: "tin" arrives at 702 with a 19-tick window, i.e. it ends at 721.
      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(19))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      var failure = new Exception();

      // Timers handed out by the duration selectors, recorded in creation order.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              x => NewTimer(leftTimers, x.Interval, testScheduler),
              y =>
              {
                  // Create the timer first, then pick what follows its tick:
                  // a failure for "tin", nothing for every other value.
                  var timer = NewTimer(rightTimers, y.Interval, testScheduler);

                  IObservable<long> afterTick;
                  if (y.Value == "tin")
                  {
                      afterTick = Observable.Throw<long>(failure);
                  }
                  else
                  {
                      afterTick = Observable.Empty<long>();
                  }

                  return timer.SelectMany(afterTick);
              },
              (x, y) => x.Value + y.Value)
      );

      observed.Messages.AssertEqual(
          OnNext(215, "0hat"),
          OnNext(217, "0bat"),
          OnNext(219, "1hat"),
          OnNext(300, "3wag"),
          OnNext(300, "3pig"),
          OnNext(305, "3cup"),
          OnNext(310, "4wag"),
          OnNext(310, "4pig"),
          OnNext(310, "4cup"),
          OnNext(702, "6tin"),
          OnNext(710, "7tin"),
          OnNext(712, "6man"),
          OnNext(712, "7man"),
          OnNext(720, "8tin"),
          OnNext(720, "8man"),
          OnError<string>(721, failure)
      );

      left.Subscriptions.AssertEqual(
          Subscribe(200, 721)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 721)
      );

      AssertDurations(left, leftTimers, 721);
      AssertDurations(right, rightTimers, 721);
  }
  /// <summary>
  /// Join where the left duration selector itself throws. Every left value is
  /// non-negative, so the selector throws for the first left element (tick 210).
  /// The test expects a lone OnError at 210 and both sources released at 210.
  /// </summary>
  [TestMethod]
  public void JoinOp_Error_V()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      var failure = new Exception();

      // Only the right side hands out timers; the left selector never returns one.
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              x =>
              {
                  // Guard form of "throw when the value is non-negative".
                  if (x.Value < 0)
                  {
                      return Observable.Empty<long>();
                  }

                  throw failure;
              },
              y => NewTimer(rightTimers, y.Interval, testScheduler),
              (x, y) => x.Value + y.Value)
      );

      observed.Messages.AssertEqual(
          OnError<string>(210, failure)
      );

      left.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );

      AssertDurations(right, rightTimers, 210);
  }
  /// <summary>
  /// Join where the right duration selector itself throws. A string length is
  /// never negative, so the selector throws for the first right element
  /// ("hat" at tick 215). The test expects a lone OnError at 215 and both
  /// sources released at 215.
  /// </summary>
  [TestMethod]
  public void JoinOp_Error_VI()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      var failure = new Exception();

      // Only the left side hands out timers; the right selector never returns one.
      var leftTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              x => NewTimer(leftTimers, x.Interval, testScheduler),
              y =>
              {
                  // Guard form of "throw when the length is non-negative".
                  if (y.Value.Length < 0)
                  {
                      return Observable.Empty<long>();
                  }

                  throw failure;
              },
              (x, y) => x.Value + y.Value)
      );

      observed.Messages.AssertEqual(
          OnError<string>(215, failure)
      );

      left.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );

      AssertDurations(left, leftTimers, 215);
  }
  /// <summary>
  /// Join where the result selector throws, with the right source producing
  /// first ("hat" at 210) and the left source second (0 at 215). The test
  /// expects a lone OnError at 215, with both sources and their duration
  /// timers released at 215.
  /// </summary>
  [TestMethod]
  public void JoinOp_Error_VII()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      var failure = new Exception();

      // Timers handed out by the duration selectors, recorded in creation order.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              x => NewTimer(leftTimers, x.Interval, testScheduler),
              y => NewTimer(rightTimers, y.Interval, testScheduler),
              (x, y) =>
              {
                  // Guard form of "throw when the left value is non-negative".
                  if (x.Value < 0)
                  {
                      return x.Value + y.Value;
                  }

                  throw failure;
              })
      );

      observed.Messages.AssertEqual(
          OnError<string>(215, failure)
      );

      left.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );

      AssertDurations(left, leftTimers, 215);
      AssertDurations(right, rightTimers, 215);
  }
  /// <summary>
  /// Join where the result selector throws, with the left source producing
  /// first (0 at 210) and the right source second ("hat" at 215). The test
  /// expects a lone OnError at 215, with both sources and their duration
  /// timers released at 215.
  /// </summary>
  [TestMethod]
  public void JoinOp_Error_VIII()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, new TimeInterval<int>(0, TimeSpan.FromTicks(10))),
          OnNext(219, new TimeInterval<int>(1, TimeSpan.FromTicks(5))),
          OnNext(240, new TimeInterval<int>(2, TimeSpan.FromTicks(10))),
          OnNext(300, new TimeInterval<int>(3, TimeSpan.FromTicks(100))),
          OnNext(310, new TimeInterval<int>(4, TimeSpan.FromTicks(80))),
          OnNext(500, new TimeInterval<int>(5, TimeSpan.FromTicks(90))),
          OnNext(700, new TimeInterval<int>(6, TimeSpan.FromTicks(25))),
          OnNext(710, new TimeInterval<int>(7, TimeSpan.FromTicks(300))),
          OnNext(720, new TimeInterval<int>(8, TimeSpan.FromTicks(100))),
          OnNext(830, new TimeInterval<int>(9, TimeSpan.FromTicks(10))),
          OnCompleted<TimeInterval<int>>(900)
      );

      var right = testScheduler.CreateHotObservable(
          OnNext(215, new TimeInterval<string>("hat", TimeSpan.FromTicks(20))),
          OnNext(217, new TimeInterval<string>("bat", TimeSpan.FromTicks(1))),
          OnNext(290, new TimeInterval<string>("wag", TimeSpan.FromTicks(200))),
          OnNext(300, new TimeInterval<string>("pig", TimeSpan.FromTicks(10))),
          OnNext(305, new TimeInterval<string>("cup", TimeSpan.FromTicks(50))),
          OnNext(600, new TimeInterval<string>("yak", TimeSpan.FromTicks(90))),
          OnNext(702, new TimeInterval<string>("tin", TimeSpan.FromTicks(20))),
          OnNext(712, new TimeInterval<string>("man", TimeSpan.FromTicks(10))),
          OnNext(722, new TimeInterval<string>("rat", TimeSpan.FromTicks(200))),
          OnNext(732, new TimeInterval<string>("wig", TimeSpan.FromTicks(5))),
          OnCompleted<TimeInterval<string>>(800)
      );

      var failure = new Exception();

      // Timers handed out by the duration selectors, recorded in creation order.
      var leftTimers = new List<ITestableObservable<long>>();
      var rightTimers = new List<ITestableObservable<long>>();

      var observed = testScheduler.Start(() =>
          left.Join(
              right,
              x => NewTimer(leftTimers, x.Interval, testScheduler),
              y => NewTimer(rightTimers, y.Interval, testScheduler),
              (x, y) =>
              {
                  // Guard form of "throw when the left value is non-negative".
                  if (x.Value < 0)
                  {
                      return x.Value + y.Value;
                  }

                  throw failure;
              })
      );

      observed.Messages.AssertEqual(
          OnError<string>(215, failure)
      );

      left.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );

      right.Subscriptions.AssertEqual(
          Subscribe(200, 215)
      );

      AssertDurations(left, leftTimers, 215);
      AssertDurations(right, rightTimers, 215);
  }
  /// <summary>
  /// Creates a cold observable on <paramref name="scheduler"/> that emits a single
  /// 0L and completes, both at <paramref name="t"/> ticks after subscription,
  /// appends it to <paramref name="l"/>, and returns it.
  /// </summary>
  /// <param name="l">List that records every timer created, in creation order.</param>
  /// <param name="t">Delay before the timer's single value and its completion.</param>
  /// <param name="scheduler">Test scheduler used to create the cold observable.</param>
  /// <returns>The newly created timer observable.</returns>
  private ITestableObservable<long> NewTimer(List<ITestableObservable<long>> l, TimeSpan t, TestScheduler scheduler)
  {
      var dueTicks = t.Ticks;

      var created = scheduler.CreateColdObservable(
          OnNext(dueTicks, 0L),
          OnCompleted<long>(dueTicks));

      l.Add(created);
      return created;
  }
  /// <summary>
  /// Checks that one duration observable was created for every OnNext message of
  /// <paramref name="xs"/> recorded at or before <paramref name="lastEnd"/>, and
  /// that each duration observable was subscribed from its element's time until
  /// the element's interval elapsed or <paramref name="lastEnd"/>, whichever is
  /// earlier.
  /// </summary>
  /// <param name="xs">Source whose messages carry the expected interval of each duration.</param>
  /// <param name="xsd">Duration observables, in the order they were created.</param>
  /// <param name="lastEnd">Time by which every subscription is expected to have ended.</param>
  private void AssertDurations<T, U>(ITestableObservable<TimeInterval<T>> xs, List<ITestableObservable<U>> xsd, long lastEnd)
  {
      // Filter once and use the same list for both the count check and the pairing.
      // Pairing against the unfiltered message list would line a duration up with
      // the wrong message if anything other than in-window OnNext entries came first.
      var started = xs.Messages
          .Where(m => m.Value.Kind == NotificationKind.OnNext && m.Time <= lastEnd)
          .ToList();

      Assert.Equal(started.Count, xsd.Count);

      for (var i = 0; i < started.Count; i++)
      {
          var message = started[i];
          var start = message.Time;

          // A duration cannot outlive the join itself, so cap its end at lastEnd.
          var end = Math.Min(start + message.Value.Value.Interval.Ticks, lastEnd);

          xsd[i].Subscriptions.AssertEqual(
              Subscribe(start, end)
          );
      }
  }
  963. }
  964. }