ForkJoinTest.cs 18 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Linq;
  8. using Microsoft.Reactive.Testing;
  9. using ReactiveTests.Dummies;
  10. using Xunit;
  11. namespace ReactiveTests.Tests
  12. {
  13. public class ForkJoinTest : ReactiveTest
  14. {
  /// <summary>
  /// Checks that each ForkJoin overload throws <see cref="ArgumentNullException"/> when handed
  /// a null source, a null source collection, or a null result selector.
  /// </summary>
  [Fact]
  public void ForkJoin_ArgumentChecking()
  {
      var dummy = DummyObservable<int>.Instance;

      // Typed nulls: the static type of each local selects the overload under test.
      Func<int, int, int> nullSelector = null;
      IObservable<int> nullSource = null;
      IObservable<int>[] nullArray = null;
      IEnumerable<IObservable<int>> nullSequence = null;

      // Binary overload: selector, second source and first source in turn.
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin(dummy, dummy, nullSelector));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin(dummy, nullSource, (a, b) => a + b));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin(nullSource, dummy, (a, b) => a + b));

      // N-ary overloads: array and enumerable of sources.
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin(nullArray));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableEx.ForkJoin(nullSequence));
  }
  /// <summary>
  /// Neither source produces a value after tick 150; the joined sequence is expected to
  /// complete without a value at the later of the two completion times (250).
  /// </summary>
  [Fact]
  public void ForkJoin_EmptyEmpty()
  {
      var testScheduler = new TestScheduler();

      var other = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(230)
      );
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.ForkJoin(other, (left, right) => left + right));

      observer.Messages.AssertEqual(
          OnCompleted<int>(250)
      );
  }
  /// <summary>
  /// ForkJoin called with no sources at all is expected to complete at tick 200
  /// without producing an array.
  /// </summary>
  [Fact]
  public void ForkJoin_None()
  {
      var testScheduler = new TestScheduler();

      var observer = testScheduler.Start(() => ObservableEx.ForkJoin<int>());

      observer.Messages.AssertEqual(
          OnCompleted<int[]>(200)
      );
  }
  /// <summary>
  /// The receiver source yields a value at 210 while the argument source yields nothing
  /// after tick 150; only a completion at 250 is expected.
  /// </summary>
  [Fact]
  public void ForkJoin_EmptyReturn()
  {
      var testScheduler = new TestScheduler();

      // Argument source: no value after tick 150.
      var other = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(230)
      );

      // Receiver source: one value at 210.
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.ForkJoin(other, (left, right) => left + right));

      observer.Messages.AssertEqual(
          OnCompleted<int>(250)
      );
  }
  /// <summary>
  /// The argument source yields a value at 210 while the receiver source yields nothing
  /// after tick 150; only a completion at 250 is expected.
  /// </summary>
  [Fact]
  public void ForkJoin_ReturnEmpty()
  {
      var testScheduler = new TestScheduler();

      // Argument source: one value at 210.
      var other = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(230)
      );

      // Receiver source: no value after tick 150.
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.ForkJoin(other, (left, right) => left + right));

      observer.Messages.AssertEqual(
          OnCompleted<int>(250)
      );
  }
  /// <summary>
  /// Each source yields one value after tick 150 (2 at 210 and 3 at 220); their sum is
  /// expected at 250, followed by completion at the same tick.
  /// </summary>
  [Fact]
  public void ForkJoin_ReturnReturn()
  {
      var testScheduler = new TestScheduler();

      var other = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(230)
      );
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(220, 3),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.ForkJoin(other, (left, right) => left + right));

      observer.Messages.AssertEqual(
          OnNext(250, 2 + 3),
          OnCompleted<int>(250)
      );
  }
  /// <summary>
  /// The receiver source fails at 210 while the argument source yields nothing after
  /// tick 150; the same exception is expected as an error at 210.
  /// </summary>
  [Fact]
  public void ForkJoin_EmptyThrow()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var other = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(230)
      );

      // The completion recorded after the error is part of the original fixture.
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(210, failure),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.ForkJoin(other, (left, right) => left + right));

      observer.Messages.AssertEqual(
          OnError<int>(210, failure)
      );
  }
  /// <summary>
  /// The argument source fails at 210 while the receiver source yields nothing after
  /// tick 150; the same exception is expected as an error at 210.
  /// </summary>
  [Fact]
  public void ForkJoin_ThrowEmpty()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      // The completion recorded after the error is part of the original fixture.
      var other = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(210, failure),
          OnCompleted<int>(230)
      );
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.ForkJoin(other, (left, right) => left + right));

      observer.Messages.AssertEqual(
          OnError<int>(210, failure)
      );
  }
  /// <summary>
  /// The argument source yields a value at 210 and the receiver source fails at 220;
  /// the same exception is expected as an error at 220.
  /// </summary>
  [Fact]
  public void ForkJoin_ReturnThrow()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var other = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(230)
      );

      // The completion recorded after the error is part of the original fixture.
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(220, failure),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.ForkJoin(other, (left, right) => left + right));

      observer.Messages.AssertEqual(
          OnError<int>(220, failure)
      );
  }
  /// <summary>
  /// The receiver source yields a value at 210 and the argument source fails at 220;
  /// the same exception is expected as an error at 220.
  /// </summary>
  [Fact]
  public void ForkJoin_ThrowReturn()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      // The completion recorded after the error is part of the original fixture.
      var other = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(220, failure),
          OnCompleted<int>(230)
      );
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 2),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.ForkJoin(other, (left, right) => left + right));

      observer.Messages.AssertEqual(
          OnError<int>(220, failure)
      );
  }
  /// <summary>
  /// Both sources yield several values; the expectation is a single result built from the
  /// last value of each source (4 and 7), delivered at 250 together with completion.
  /// </summary>
  [Fact]
  public void ForkJoin_Binary()
  {
      var testScheduler = new TestScheduler();

      var other = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 2),
          OnNext(225, 4),
          OnCompleted<int>(230)
      );
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(235, 6),
          OnNext(240, 7),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.ForkJoin(other, (left, right) => left + right));

      observer.Messages.AssertEqual(
          OnNext(250, 4 + 7), // TODO: fix ForkJoin behavior
          OnCompleted<int>(250)
      );
  }
  /// <summary>
  /// Three sources passed as individual arguments: the expectation is one array holding the
  /// last value of each source (4, 7, 5) at 270, followed by completion.
  /// </summary>
  [Fact]
  public void ForkJoin_NaryParams()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 2),
          OnNext(225, 4),
          OnCompleted<int>(230)
      );
      var second = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(235, 6),
          OnNext(240, 7),
          OnCompleted<int>(250)
      );
      var third = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(230, 3),
          OnNext(245, 5),
          OnCompleted<int>(270)
      );

      var observer = testScheduler.Start(() => ObservableEx.ForkJoin(first, second, third));

      observer.Messages.AssertEqual(
          OnNext<int[]>(270, values => values.SequenceEqual(new[] { 4, 7, 5 })), // TODO: fix ForkJoin behavior
          OnCompleted<int[]>(270)
      );
  }
  /// <summary>
  /// Three sources passed as individual arguments, where the last one only completes (at 270)
  /// without a value; only a completion at 270 is expected.
  /// </summary>
  [Fact]
  public void ForkJoin_NaryParamsEmpty()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 2),
          OnNext(225, 4),
          OnCompleted<int>(230)
      );
      var second = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(235, 6),
          OnNext(240, 7),
          OnCompleted<int>(250)
      );

      // Valueless source that finishes last.
      var third = testScheduler.CreateHotObservable(
          OnCompleted<int>(270)
      );

      var observer = testScheduler.Start(() => ObservableEx.ForkJoin(first, second, third));

      observer.Messages.AssertEqual(
          OnCompleted<int[]>(270)
      );
  }
  /// <summary>
  /// Three sources passed as individual arguments, where the middle one completes at 235
  /// without a value while the third is still running; completion is expected at 235.
  /// </summary>
  [Fact]
  public void ForkJoin_NaryParamsEmptyBeforeEnd()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 2),
          OnNext(225, 4),
          OnCompleted<int>(230)
      );

      // Valueless source that finishes before the third source does.
      var second = testScheduler.CreateHotObservable(
          OnCompleted<int>(235)
      );
      var third = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(230, 3),
          OnNext(245, 5),
          OnCompleted<int>(270)
      );

      var observer = testScheduler.Start(() => ObservableEx.ForkJoin(first, second, third));

      observer.Messages.AssertEqual(
          OnCompleted<int[]>(235)
      );
  }
  /// <summary>
  /// Joins two <c>Observable.Return</c> sources without a test scheduler and verifies that
  /// the first emitted array holds both values in argument order.
  /// </summary>
  [Fact]
  public void ForkJoin_Nary_Immediate()
  {
      var joined = ObservableEx.ForkJoin(Observable.Return(1), Observable.Return(2)).First();

      // The comparison result was previously discarded, so the test could never fail on a
      // wrong array; it must be asserted.
      Assert.True(joined.SequenceEqual(new[] { 1, 2 }));
  }
  /// <summary>
  /// Enumerable overload with three scheduler-driven hot sources followed by an
  /// <c>Observable.Return(20)</c>; the expectation is the array 4, 7, 5, 20 at 270,
  /// then completion.
  /// </summary>
  [Fact]
  public void ForkJoin_Nary_Virtual_And_Immediate()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 2),
          OnNext(225, 4),
          OnCompleted<int>(230)
      );
      var second = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(235, 6),
          OnNext(240, 7),
          OnCompleted<int>(250)
      );
      var third = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(230, 3),
          OnNext(245, 5),
          OnCompleted<int>(270)
      );

      // The list (including the Return source) is built inside the factory, as in the original.
      var observer = testScheduler.Start(
          () => ObservableEx.ForkJoin(new List<IObservable<int>> { first, second, third, Observable.Return(20) }));

      observer.Messages.AssertEqual(
          OnNext<int[]>(270, values => values.SequenceEqual(new[] { 4, 7, 5, 20 })),
          OnCompleted<int[]>(270)
      );
  }
  /// <summary>
  /// Enumerable overload with an <c>Observable.Return(20)</c> placed ahead of three
  /// scheduler-driven hot sources; the expectation is the array 20, 4, 7, 5 at 270,
  /// then completion.
  /// </summary>
  [Fact]
  public void ForkJoin_Nary_Immediate_And_Virtual()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 2),
          OnNext(225, 4),
          OnCompleted<int>(230)
      );
      var second = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(235, 6),
          OnNext(240, 7),
          OnCompleted<int>(250)
      );
      var third = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(230, 3),
          OnNext(245, 5),
          OnCompleted<int>(270)
      );

      // The list (including the Return source) is built inside the factory, as in the original.
      var observer = testScheduler.Start(
          () => ObservableEx.ForkJoin(new List<IObservable<int>> { Observable.Return(20), first, second, third }));

      observer.Messages.AssertEqual(
          OnNext<int[]>(270, values => values.SequenceEqual(new[] { 20, 4, 7, 5 })),
          OnCompleted<int[]>(270)
      );
  }
  /// <summary>
  /// Enumerable overload with three scheduler-driven hot sources; the expectation is one
  /// array holding the last value of each source (4, 7, 5) at 270, then completion.
  /// </summary>
  [Fact]
  public void ForkJoin_Nary()
  {
      var testScheduler = new TestScheduler();

      var first = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(215, 2),
          OnNext(225, 4),
          OnCompleted<int>(230)
      );
      var second = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(235, 6),
          OnNext(240, 7),
          OnCompleted<int>(250)
      );
      var third = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(230, 3),
          OnNext(245, 5),
          OnCompleted<int>(270)
      );

      var observer = testScheduler.Start(
          () => ObservableEx.ForkJoin(new List<IObservable<int>> { first, second, third }));

      observer.Messages.AssertEqual(
          OnNext<int[]>(270, values => values.SequenceEqual(new[] { 4, 7, 5 })), // TODO: fix ForkJoin behavior
          OnCompleted<int[]>(270)
      );
  }
  /// <summary>
  /// Regression test for bug 1302: the selector throws and the receiver (left) source is the
  /// last to complete, at 220. The thrown exception is expected as an error at 220, and each
  /// source's subscription is expected to end at its own completion time.
  /// </summary>
  [Fact]
  public void Bug_1302_SelectorThrows_LeftLast()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnCompleted<int>(220)
      );
      var right = testScheduler.CreateHotObservable(
          OnNext(215, 2),
          OnCompleted<int>(217)
      );

      var observer = testScheduler.Start(
          () => left.ForkJoin<int, int, int>(right, (l, r) => { throw failure; }));

      observer.Messages.AssertEqual(
          OnError<int>(220, failure)
      );

      left.Subscriptions.AssertEqual(
          Subscribe(200, 220)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 217)
      );
  }
  /// <summary>
  /// Regression test for bug 1302: the selector throws and the argument (right) source is the
  /// last to complete, at 220. The thrown exception is expected as an error at 220, and each
  /// source's subscription is expected to end at its own completion time.
  /// </summary>
  [Fact]
  public void Bug_1302_SelectorThrows_RightLast()
  {
      var failure = new Exception();
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnCompleted<int>(217)
      );
      var right = testScheduler.CreateHotObservable(
          OnNext(215, 2),
          OnCompleted<int>(220)
      );

      var observer = testScheduler.Start(
          () => left.ForkJoin<int, int, int>(right, (l, r) => { throw failure; }));

      observer.Messages.AssertEqual(
          OnError<int>(220, failure)
      );

      left.Subscriptions.AssertEqual(
          Subscribe(200, 217)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 220)
      );
  }
  /// <summary>
  /// Regression test for bug 1302: the receiver (left) source completes at 217 without a
  /// value and the argument (right) source completes last, at 220. Only a completion at 220
  /// is expected, with each subscription ending at its source's completion time.
  /// </summary>
  [Fact]
  public void Bug_1302_RightLast_NoLeft()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnCompleted<int>(217)
      );
      var right = testScheduler.CreateHotObservable(
          OnNext(215, 2),
          OnCompleted<int>(220)
      );

      var observer = testScheduler.Start(
          () => left.ForkJoin<int, int, int>(right, (l, r) => l + r));

      observer.Messages.AssertEqual(
          OnCompleted<int>(220)
      );

      left.Subscriptions.AssertEqual(
          Subscribe(200, 217)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 220)
      );
  }
  /// <summary>
  /// Regression test for bug 1302: the receiver (left) source yields a value and completes at
  /// 217, while the argument (right) source completes last, at 220, without a value. Only a
  /// completion at 220 is expected, with each subscription ending at its source's completion time.
  /// </summary>
  [Fact]
  public void Bug_1302_RightLast_NoRight()
  {
      var testScheduler = new TestScheduler();

      var left = testScheduler.CreateHotObservable(
          OnNext(215, 2),
          OnCompleted<int>(217)
      );
      var right = testScheduler.CreateHotObservable(
          OnCompleted<int>(220)
      );

      var observer = testScheduler.Start(
          () => left.ForkJoin<int, int, int>(right, (l, r) => l + r));

      observer.Messages.AssertEqual(
          OnCompleted<int>(220)
      );

      left.Subscriptions.AssertEqual(
          Subscribe(200, 217)
      );
      right.Subscriptions.AssertEqual(
          Subscribe(200, 220)
      );
  }
  500. }
  501. }