AmbTest.cs 17 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive.Linq;
  8. using Microsoft.Reactive.Testing;
  9. using ReactiveTests.Dummies;
  10. using Microsoft.VisualStudio.TestTools.UnitTesting;
  11. using Assert = Xunit.Assert;
  12. namespace ReactiveTests.Tests
  13. {
  14. [TestClass]
  15. public class AmbTest : ReactiveTest
  16. {
  17. [TestMethod]
  18. public void Amb_ArgumentChecking()
  19. {
  20. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Amb((IObservable<int>[])null));
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Amb((IEnumerable<IObservable<int>>)null));
  22. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Amb(null, DummyObservable<int>.Instance));
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Amb(DummyObservable<int>.Instance, null));
  24. }
  25. [TestMethod]
  26. public void Amb_Never2()
  27. {
  28. var scheduler = new TestScheduler();
  29. var l = scheduler.CreateHotObservable(
  30. OnNext(150, 1)
  31. );
  32. var r = scheduler.CreateHotObservable(
  33. OnNext(150, 1)
  34. );
  35. var res = scheduler.Start(() =>
  36. l.Amb(r)
  37. );
  38. res.Messages.AssertEqual(
  39. );
  40. l.Subscriptions.AssertEqual(
  41. Subscribe(200, 1000)
  42. );
  43. r.Subscriptions.AssertEqual(
  44. Subscribe(200, 1000)
  45. );
  46. }
  47. [TestMethod]
  48. public void Amb_Never3()
  49. {
  50. var scheduler = new TestScheduler();
  51. var n1 = scheduler.CreateHotObservable(
  52. OnNext(150, 1)
  53. );
  54. var n2 = scheduler.CreateHotObservable(
  55. OnNext(150, 1)
  56. );
  57. var n3 = scheduler.CreateHotObservable(
  58. OnNext(150, 1)
  59. );
  60. var res = scheduler.Start(() =>
  61. new[] { n1, n2, n3 }.Amb()
  62. );
  63. res.Messages.AssertEqual(
  64. );
  65. n1.Subscriptions.AssertEqual(
  66. Subscribe(200, 1000)
  67. );
  68. n2.Subscriptions.AssertEqual(
  69. Subscribe(200, 1000)
  70. );
  71. n3.Subscriptions.AssertEqual(
  72. Subscribe(200, 1000)
  73. );
  74. }
  75. [TestMethod]
  76. public void Amb_Never3_Params()
  77. {
  78. var scheduler = new TestScheduler();
  79. var n1 = scheduler.CreateHotObservable(
  80. OnNext(150, 1)
  81. );
  82. var n2 = scheduler.CreateHotObservable(
  83. OnNext(150, 1)
  84. );
  85. var n3 = scheduler.CreateHotObservable(
  86. OnNext(150, 1)
  87. );
  88. var res = scheduler.Start(() =>
  89. Observable.Amb(n1, n2, n3)
  90. );
  91. res.Messages.AssertEqual(
  92. );
  93. n1.Subscriptions.AssertEqual(
  94. Subscribe(200, 1000)
  95. );
  96. n2.Subscriptions.AssertEqual(
  97. Subscribe(200, 1000)
  98. );
  99. n3.Subscriptions.AssertEqual(
  100. Subscribe(200, 1000)
  101. );
  102. }
  103. [TestMethod]
  104. public void Amb_NeverEmpty()
  105. {
  106. var scheduler = new TestScheduler();
  107. var n = scheduler.CreateHotObservable(
  108. OnNext(150, 1)
  109. );
  110. var e = scheduler.CreateHotObservable(
  111. OnNext(150, 1),
  112. OnCompleted<int>(225)
  113. );
  114. var res = scheduler.Start(() =>
  115. n.Amb(e)
  116. );
  117. res.Messages.AssertEqual(
  118. OnCompleted<int>(225)
  119. );
  120. n.Subscriptions.AssertEqual(
  121. Subscribe(200, 225)
  122. );
  123. e.Subscriptions.AssertEqual(
  124. Subscribe(200, 225)
  125. );
  126. }
  127. [TestMethod]
  128. public void Amb_EmptyNever()
  129. {
  130. var scheduler = new TestScheduler();
  131. var n = scheduler.CreateHotObservable(
  132. OnNext(150, 1)
  133. );
  134. var e = scheduler.CreateHotObservable(
  135. OnNext(150, 1),
  136. OnCompleted<int>(225)
  137. );
  138. var res = scheduler.Start(() =>
  139. e.Amb(n)
  140. );
  141. res.Messages.AssertEqual(
  142. OnCompleted<int>(225)
  143. );
  144. n.Subscriptions.AssertEqual(
  145. Subscribe(200, 225)
  146. );
  147. e.Subscriptions.AssertEqual(
  148. Subscribe(200, 225)
  149. );
  150. }
  151. [TestMethod]
  152. public void Amb_RegularShouldDisposeLoser()
  153. {
  154. var scheduler = new TestScheduler();
  155. var o1 = scheduler.CreateHotObservable(
  156. OnNext(150, 1),
  157. OnNext(210, 2),
  158. OnCompleted<int>(240)
  159. );
  160. var o2 = scheduler.CreateHotObservable(
  161. OnNext(150, 1),
  162. OnNext(220, 3),
  163. OnCompleted<int>(250)
  164. );
  165. var res = scheduler.Start(() =>
  166. o1.Amb(o2)
  167. );
  168. res.Messages.AssertEqual(
  169. OnNext(210, 2),
  170. OnCompleted<int>(240)
  171. );
  172. o1.Subscriptions.AssertEqual(
  173. Subscribe(200, 240)
  174. );
  175. o2.Subscriptions.AssertEqual(
  176. Subscribe(200, 210)
  177. );
  178. }
  179. [TestMethod]
  180. public void Amb_WinnerThrows()
  181. {
  182. var scheduler = new TestScheduler();
  183. var ex = new Exception();
  184. var o1 = scheduler.CreateHotObservable(
  185. OnNext(150, 1),
  186. OnNext(210, 2),
  187. OnError<int>(220, ex)
  188. );
  189. var o2 = scheduler.CreateHotObservable(
  190. OnNext(150, 1),
  191. OnNext(220, 3),
  192. OnCompleted<int>(250)
  193. );
  194. var res = scheduler.Start(() =>
  195. o1.Amb(o2)
  196. );
  197. res.Messages.AssertEqual(
  198. OnNext(210, 2),
  199. OnError<int>(220, ex)
  200. );
  201. o1.Subscriptions.AssertEqual(
  202. Subscribe(200, 220)
  203. );
  204. o2.Subscriptions.AssertEqual(
  205. Subscribe(200, 210)
  206. );
  207. }
  208. [TestMethod]
  209. public void Amb_LoserThrows()
  210. {
  211. var scheduler = new TestScheduler();
  212. var ex = new Exception();
  213. var o1 = scheduler.CreateHotObservable(
  214. OnNext(150, 1),
  215. OnNext(220, 2),
  216. OnError<int>(230, ex)
  217. );
  218. var o2 = scheduler.CreateHotObservable(
  219. OnNext(150, 1),
  220. OnNext(210, 3),
  221. OnCompleted<int>(250)
  222. );
  223. var res = scheduler.Start(() =>
  224. o1.Amb(o2)
  225. );
  226. res.Messages.AssertEqual(
  227. OnNext(210, 3),
  228. OnCompleted<int>(250)
  229. );
  230. o1.Subscriptions.AssertEqual(
  231. Subscribe(200, 210)
  232. );
  233. o2.Subscriptions.AssertEqual(
  234. Subscribe(200, 250)
  235. );
  236. }
  237. [TestMethod]
  238. public void Amb_ThrowsBeforeElectionLeft()
  239. {
  240. var scheduler = new TestScheduler();
  241. var ex = new Exception();
  242. var o1 = scheduler.CreateHotObservable(
  243. OnNext(150, 1),
  244. OnError<int>(210, ex)
  245. );
  246. var o2 = scheduler.CreateHotObservable(
  247. OnNext(150, 1),
  248. OnNext(220, 3),
  249. OnCompleted<int>(250)
  250. );
  251. var res = scheduler.Start(() =>
  252. o1.Amb(o2)
  253. );
  254. res.Messages.AssertEqual(
  255. OnError<int>(210, ex)
  256. );
  257. o1.Subscriptions.AssertEqual(
  258. Subscribe(200, 210)
  259. );
  260. o2.Subscriptions.AssertEqual(
  261. Subscribe(200, 210)
  262. );
  263. }
  264. [TestMethod]
  265. public void Amb_ThrowsBeforeElectionRight()
  266. {
  267. var scheduler = new TestScheduler();
  268. var ex = new Exception();
  269. var o1 = scheduler.CreateHotObservable(
  270. OnNext(150, 1),
  271. OnNext(220, 3),
  272. OnCompleted<int>(250)
  273. );
  274. var o2 = scheduler.CreateHotObservable(
  275. OnNext(150, 1),
  276. OnError<int>(210, ex)
  277. );
  278. var res = scheduler.Start(() =>
  279. o1.Amb(o2)
  280. );
  281. res.Messages.AssertEqual(
  282. OnError<int>(210, ex)
  283. );
  284. o1.Subscriptions.AssertEqual(
  285. Subscribe(200, 210)
  286. );
  287. o2.Subscriptions.AssertEqual(
  288. Subscribe(200, 210)
  289. );
  290. }
  291. [TestMethod]
  292. public void Amb_Many_Array_OnNext()
  293. {
  294. var scheduler = new TestScheduler();
  295. var ex = new Exception();
  296. var o1 = scheduler.CreateColdObservable(
  297. OnNext(150, 1),
  298. OnNext(220, 3),
  299. OnCompleted<int>(250)
  300. );
  301. var o2 = scheduler.CreateColdObservable(
  302. OnNext(150, 2),
  303. OnError<int>(210, ex)
  304. );
  305. var o3 = scheduler.CreateColdObservable(
  306. OnCompleted<int>(150)
  307. );
  308. var res = scheduler.Start(() =>
  309. Observable.Amb(o1, o2, o3)
  310. );
  311. res.Messages.AssertEqual(
  312. OnNext(350, 1),
  313. OnNext(420, 3),
  314. OnCompleted<int>(450)
  315. );
  316. o1.Subscriptions.AssertEqual(
  317. Subscribe(200, 450)
  318. );
  319. o2.Subscriptions.AssertEqual(
  320. Subscribe(200, 350)
  321. );
  322. o3.Subscriptions.AssertEqual(
  323. Subscribe(200, 350)
  324. );
  325. }
  326. [TestMethod]
  327. public void Amb_Many_Array_OnError()
  328. {
  329. var scheduler = new TestScheduler();
  330. var ex = new Exception();
  331. var o1 = scheduler.CreateColdObservable(
  332. OnError<int>(150, ex)
  333. );
  334. var o2 = scheduler.CreateColdObservable(
  335. OnNext(150, 1),
  336. OnNext(220, 3),
  337. OnCompleted<int>(250)
  338. );
  339. var o3 = scheduler.CreateColdObservable(
  340. OnCompleted<int>(150)
  341. );
  342. var res = scheduler.Start(() =>
  343. Observable.Amb(o1, o2, o3)
  344. );
  345. res.Messages.AssertEqual(
  346. OnError<int>(350, ex)
  347. );
  348. o1.Subscriptions.AssertEqual(
  349. Subscribe(200, 350)
  350. );
  351. o2.Subscriptions.AssertEqual(
  352. Subscribe(200, 350)
  353. );
  354. o3.Subscriptions.AssertEqual(
  355. Subscribe(200, 350)
  356. );
  357. }
  358. [TestMethod]
  359. public void Amb_Many_Array_OnCompleted()
  360. {
  361. var scheduler = new TestScheduler();
  362. var ex = new Exception();
  363. var o1 = scheduler.CreateColdObservable(
  364. OnCompleted<int>(150)
  365. );
  366. var o2 = scheduler.CreateColdObservable(
  367. OnNext(150, 1),
  368. OnNext(220, 3),
  369. OnCompleted<int>(250)
  370. );
  371. var o3 = scheduler.CreateColdObservable(
  372. OnNext(150, 2),
  373. OnError<int>(210, ex)
  374. );
  375. var res = scheduler.Start(() =>
  376. Observable.Amb(o1, o2, o3)
  377. );
  378. res.Messages.AssertEqual(
  379. OnCompleted<int>(350)
  380. );
  381. o1.Subscriptions.AssertEqual(
  382. Subscribe(200, 350)
  383. );
  384. o2.Subscriptions.AssertEqual(
  385. Subscribe(200, 350)
  386. );
  387. o3.Subscriptions.AssertEqual(
  388. Subscribe(200, 350)
  389. );
  390. }
  391. [TestMethod]
  392. public void Amb_Many_Enumerable_OnNext()
  393. {
  394. var scheduler = new TestScheduler();
  395. var ex = new Exception();
  396. var o1 = scheduler.CreateColdObservable(
  397. OnNext(150, 1),
  398. OnNext(220, 3),
  399. OnCompleted<int>(250)
  400. );
  401. var o2 = scheduler.CreateColdObservable(
  402. OnNext(150, 2),
  403. OnError<int>(210, ex)
  404. );
  405. var o3 = scheduler.CreateColdObservable(
  406. OnCompleted<int>(150)
  407. );
  408. var res = scheduler.Start(() =>
  409. new[] { o1, o2, o3 }.Amb()
  410. );
  411. res.Messages.AssertEqual(
  412. OnNext(350, 1),
  413. OnNext(420, 3),
  414. OnCompleted<int>(450)
  415. );
  416. o1.Subscriptions.AssertEqual(
  417. Subscribe(200, 450)
  418. );
  419. o2.Subscriptions.AssertEqual(
  420. Subscribe(200, 350)
  421. );
  422. o3.Subscriptions.AssertEqual(
  423. Subscribe(200, 350)
  424. );
  425. }
  426. [TestMethod]
  427. public void Amb_Many_Enumerable_OnError()
  428. {
  429. var scheduler = new TestScheduler();
  430. var ex = new Exception();
  431. var o1 = scheduler.CreateColdObservable(
  432. OnError<int>(150, ex)
  433. );
  434. var o2 = scheduler.CreateColdObservable(
  435. OnNext(150, 1),
  436. OnNext(220, 3),
  437. OnCompleted<int>(250)
  438. );
  439. var o3 = scheduler.CreateColdObservable(
  440. OnCompleted<int>(150)
  441. );
  442. var res = scheduler.Start(() =>
  443. new[] { o1, o2, o3 }.Amb()
  444. );
  445. res.Messages.AssertEqual(
  446. OnError<int>(350, ex)
  447. );
  448. o1.Subscriptions.AssertEqual(
  449. Subscribe(200, 350)
  450. );
  451. o2.Subscriptions.AssertEqual(
  452. Subscribe(200, 350)
  453. );
  454. o3.Subscriptions.AssertEqual(
  455. Subscribe(200, 350)
  456. );
  457. }
  458. [TestMethod]
  459. public void Amb_Many_Enumerable_OnCompleted()
  460. {
  461. var scheduler = new TestScheduler();
  462. var ex = new Exception();
  463. var o1 = scheduler.CreateColdObservable(
  464. OnCompleted<int>(150)
  465. );
  466. var o2 = scheduler.CreateColdObservable(
  467. OnNext(150, 1),
  468. OnNext(220, 3),
  469. OnCompleted<int>(250)
  470. );
  471. var o3 = scheduler.CreateColdObservable(
  472. OnNext(150, 2),
  473. OnError<int>(210, ex)
  474. );
  475. var res = scheduler.Start(() =>
  476. new[] { o1, o2, o3 }.Amb()
  477. );
  478. res.Messages.AssertEqual(
  479. OnCompleted<int>(350)
  480. );
  481. o1.Subscriptions.AssertEqual(
  482. Subscribe(200, 350)
  483. );
  484. o2.Subscriptions.AssertEqual(
  485. Subscribe(200, 350)
  486. );
  487. o3.Subscriptions.AssertEqual(
  488. Subscribe(200, 350)
  489. );
  490. }
  491. [TestMethod]
  492. public void Amb_Many_Enumerable_Many_Sources()
  493. {
  494. for (var i = 0; i < 32; i++)
  495. {
  496. var sources = new List<IObservable<int>>();
  497. for (var j = 0; j < i; j++)
  498. {
  499. sources.Add(Observable.Return(j));
  500. }
  501. var result = sources.Amb().ToList().First();
  502. if (i == 0)
  503. {
  504. Assert.Equal(0, result.Count);
  505. }
  506. else
  507. {
  508. Assert.Equal(1, result.Count);
  509. Assert.Equal(0, result[0]);
  510. }
  511. }
  512. }
  513. [TestMethod]
  514. public void Amb_Many_Enumerable_Many_Sources_NoStackOverflow()
  515. {
  516. for (var i = 0; i < 100; i++)
  517. {
  518. var sources = new List<IObservable<int>>();
  519. for (var j = 0; j < i; j++)
  520. {
  521. if (j == i - 1)
  522. {
  523. sources.Add(Observable.Return(j));
  524. }
  525. else
  526. {
  527. sources.Add(Observable.Never<int>());
  528. }
  529. }
  530. var result = sources.Amb().ToList().First();
  531. if (i == 0)
  532. {
  533. Assert.Equal(0, result.Count);
  534. }
  535. else
  536. {
  537. Assert.Equal(1, result.Count);
  538. Assert.Equal(i - 1, result[0]);
  539. }
  540. }
  541. }
  542. }
  543. }