AggregateTest.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. namespace ReactiveTests.Tests
  17. {
  18. public class AggregateTest : ReactiveTest
  19. {
  /// <summary>
  /// Every Aggregate overload is expected to throw <see cref="ArgumentNullException"/>
  /// when its source, accumulator, or result selector argument is null.
  /// </summary>
  [Fact]
  public void Aggregate_ArgumentChecking()
  {
      var nullSource = default(IObservable<int>);
      var nullAccumulator = default(Func<int, int, int>);
      var nullSelector = default(Func<int, int>);

      // Overload: seed + accumulator.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Aggregate<int, int>(nullSource, 1, (x, y) => x + y));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Aggregate<int, int>(DummyObservable<int>.Instance, 1, nullAccumulator));

      // Overload: accumulator only.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Aggregate<int>(nullSource, (x, y) => x + y));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Aggregate<int>(DummyObservable<int>.Instance, nullAccumulator));

      // Overload: seed + accumulator + result selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Aggregate<int, int, int>(nullSource, 1, (x, y) => x + y, x => x));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Aggregate<int, int, int>(DummyObservable<int>.Instance, 1, nullAccumulator, x => x));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Aggregate<int, int, int>(DummyObservable<int>.Instance, 1, (x, y) => x + y, nullSelector));
  }
  /// <summary>
  /// Seeded Aggregate over a source that completes without producing a value
  /// inside the subscription window: the seed itself is the expected result.
  /// </summary>
  [Fact]
  public void AggregateWithSeed_Empty()
  {
      var testScheduler = new TestScheduler();

      // The value at tick 150 precedes the subscription asserted below (200).
      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.Aggregate(42, (sum, item) => sum + item));

      observer.Messages.AssertEqual(
          OnNext(250, 42),
          OnCompleted<int>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Seeded Aggregate over a source with a single observed value: the expected
  /// result is the seed combined with that value, emitted on completion.
  /// </summary>
  [Fact]
  public void AggregateWithSeed_Return()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 24),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.Aggregate(42, (sum, item) => sum + item));

      // Only the element at tick 210 is folded into the seed.
      observer.Messages.AssertEqual(
          OnNext<int>(250, 42 + 24),
          OnCompleted<int>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Seeded Aggregate over a failing source: the source error is expected to be
  /// forwarded at the tick it occurs, with no aggregated value.
  /// </summary>
  [Fact]
  public void AggregateWithSeed_Throw()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(210, error)
      );

      var observer = testScheduler.Start(() => source.Aggregate(42, (sum, item) => sum + item));

      observer.Messages.AssertEqual(
          OnError<int>(210, error)
      );

      // The subscription is expected to end at the error tick.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Seeded Aggregate over a source that never terminates: no messages are
  /// expected, and the subscription is expected to last from tick 200 to tick 1000.
  /// </summary>
  [Fact]
  public void AggregateWithSeed_Never()
  {
      var scheduler = new TestScheduler();

      // No completion or error is scheduled, so nothing should ever be emitted.
      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var res = scheduler.Start(() =>
          xs.Aggregate(42, (acc, x) => acc + x)
      );

      res.Messages.AssertEqual(
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Seeded Aggregate over the values 0..4: the expected result is the seed plus
  /// the sum of those values, emitted when the source completes.
  /// </summary>
  [Fact]
  public void AggregateWithSeed_Range()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 0),
          OnNext(220, 1),
          OnNext(230, 2),
          OnNext(240, 3),
          OnNext(250, 4),
          OnCompleted<int>(260)
      );

      var res = scheduler.Start(() =>
          xs.Aggregate(42, (acc, x) => acc + x)
      );

      // The element at tick 150 is not part of the expected sum.
      res.Messages.AssertEqual(
          OnNext(260, 42 + Enumerable.Range(0, 5).Sum()),
          OnCompleted<int>(260)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 260)
      );
  }
  /// <summary>
  /// Seeded Aggregate whose accumulator throws on the value 3: the exception is
  /// expected as an error at that value's tick (240), ending the subscription there.
  /// </summary>
  [Fact]
  public void AggregateWithSeed_AccumulatorThrows()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 0),
          OnNext(220, 1),
          OnNext(230, 2),
          OnNext(240, 3),
          OnNext(250, 4),
          OnCompleted<int>(260)
      );

      // Adds values below 3; throws for anything else.
      Func<int, int, int> accumulator = (sum, item) =>
      {
          if (item >= 3)
          {
              throw error;
          }

          return sum + item;
      };

      var observer = testScheduler.Start(() => source.Aggregate(0, accumulator));

      observer.Messages.AssertEqual(
          OnError<int>(240, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 240)
      );
  }
  /// <summary>
  /// Aggregate with seed and result selector over a source that completes without
  /// an observed value: the expected result is the selector applied to the seed.
  /// </summary>
  [Fact]
  public void AggregateWithSeedAndResult_Empty()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() =>
          source.Aggregate(42, (sum, item) => sum + item, total => total * 5));

      observer.Messages.AssertEqual(
          OnNext(250, 42 * 5),
          OnCompleted<int>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Aggregate with seed and result selector over a single observed value: the
  /// expected result is the selector applied to (seed + value).
  /// </summary>
  [Fact]
  public void AggregateWithSeedAndResult_Return()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 24),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() =>
          source.Aggregate(42, (sum, item) => sum + item, total => total * 5));

      observer.Messages.AssertEqual(
          OnNext<int>(250, (42 + 24) * 5),
          OnCompleted<int>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Aggregate with seed and result selector over a failing source: the source
  /// error is expected to be forwarded at its tick, with no result emitted.
  /// </summary>
  [Fact]
  public void AggregateWithSeedAndResult_Throw()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(210, error)
      );

      var observer = testScheduler.Start(() =>
          source.Aggregate(42, (sum, item) => sum + item, total => total * 5));

      observer.Messages.AssertEqual(
          OnError<int>(210, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Aggregate with seed and result selector over a source that never terminates:
  /// no messages are expected, and the subscription is expected to last from
  /// tick 200 to tick 1000.
  /// </summary>
  [Fact]
  public void AggregateWithSeedAndResult_Never()
  {
      var scheduler = new TestScheduler();

      // No completion or error is scheduled, so nothing should ever be emitted.
      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var res = scheduler.Start(() =>
          xs.Aggregate(42, (acc, x) => acc + x, x => x * 5)
      );

      res.Messages.AssertEqual(
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Aggregate with seed and result selector over the values 0..4: the expected
  /// result is the selector applied to (seed + sum of the values).
  /// </summary>
  [Fact]
  public void AggregateWithSeedAndResult_Range()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 0),
          OnNext(220, 1),
          OnNext(230, 2),
          OnNext(240, 3),
          OnNext(250, 4),
          OnCompleted<int>(260)
      );

      var res = scheduler.Start(() =>
          xs.Aggregate(42, (acc, x) => acc + x, x => x * 5)
      );

      // The element at tick 150 is not part of the expected sum.
      res.Messages.AssertEqual(
          OnNext(260, (42 + Enumerable.Range(0, 5).Sum()) * 5),
          OnCompleted<int>(260)
      );

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 260)
      );
  }
  /// <summary>
  /// Aggregate with seed and result selector whose accumulator throws on the
  /// value 3: the exception is expected as an error at tick 240.
  /// </summary>
  [Fact]
  public void AggregateWithSeedAndResult_AccumulatorThrows()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 0),
          OnNext(220, 1),
          OnNext(230, 2),
          OnNext(240, 3),
          OnNext(250, 4),
          OnCompleted<int>(260)
      );

      // Adds values below 3; throws for anything else.
      Func<int, int, int> accumulator = (sum, item) =>
      {
          if (item >= 3)
          {
              throw error;
          }

          return sum + item;
      };

      var observer = testScheduler.Start(() =>
          source.Aggregate(0, accumulator, total => total * 5));

      observer.Messages.AssertEqual(
          OnError<int>(240, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 240)
      );
  }
  /// <summary>
  /// Aggregate with seed and a result selector that always throws: the exception
  /// is expected as an error at the source's completion tick (260).
  /// </summary>
  [Fact]
  public void AggregateWithSeedAndResult_ResultSelectorThrows()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 0),
          OnNext(220, 1),
          OnNext(230, 2),
          OnNext(240, 3),
          OnNext(250, 4),
          OnCompleted<int>(260)
      );

      // The selector never returns a value; it throws unconditionally.
      Func<int, int> failingSelector = total => { throw error; };

      var observer = testScheduler.Start(() =>
          source.Aggregate<int, int, int>(0, (sum, item) => sum + item, failingSelector));

      observer.Messages.AssertEqual(
          OnError<int>(260, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 260)
      );
  }
  /// <summary>
  /// Seedless Aggregate over a source that completes without an observed value:
  /// an <see cref="InvalidOperationException"/> error is expected at completion.
  /// </summary>
  [Fact]
  public void AggregateWithoutSeed_Empty()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.Aggregate((sum, item) => sum + item));

      // Only the exception type is checked, via a predicate.
      observer.Messages.AssertEqual(
          OnError<int>(250, e => e is InvalidOperationException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Seedless Aggregate over a single observed value: that value is expected
  /// back unchanged as the result, emitted on completion.
  /// </summary>
  [Fact]
  public void AggregateWithoutSeed_Return()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 24),
          OnCompleted<int>(250)
      );

      var observer = testScheduler.Start(() => source.Aggregate((sum, item) => sum + item));

      observer.Messages.AssertEqual(
          OnNext<int>(250, 24),
          OnCompleted<int>(250)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 250)
      );
  }
  /// <summary>
  /// Seedless Aggregate over a failing source: the source error is expected to
  /// be forwarded at the tick it occurs.
  /// </summary>
  [Fact]
  public void AggregateWithoutSeed_Throw()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnError<int>(210, error)
      );

      var observer = testScheduler.Start(() => source.Aggregate((sum, item) => sum + item));

      observer.Messages.AssertEqual(
          OnError<int>(210, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Seedless Aggregate over a source that never terminates: no messages are
  /// expected, and the subscription is expected to last from tick 200 to tick 1000.
  /// </summary>
  [Fact]
  public void AggregateWithoutSeed_Never()
  {
      var scheduler = new TestScheduler();

      // No completion or error is scheduled, so nothing should ever be emitted.
      var xs = scheduler.CreateHotObservable(
          OnNext(150, 1)
      );

      var res = scheduler.Start(() =>
          xs.Aggregate((acc, x) => acc + x)
      );

      res.Messages.AssertEqual(
      );

      // Same subscription check as the seeded "Never" tests in this class.
      xs.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Seedless Aggregate over the values 0..4: the expected result is their sum,
  /// emitted when the source completes.
  /// </summary>
  [Fact]
  public void AggregateWithoutSeed_Range()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 0),
          OnNext(220, 1),
          OnNext(230, 2),
          OnNext(240, 3),
          OnNext(250, 4),
          OnCompleted<int>(260)
      );

      var observer = testScheduler.Start(() => source.Aggregate((sum, item) => sum + item));

      var expectedTotal = Enumerable.Range(0, 5).Sum();

      observer.Messages.AssertEqual(
          OnNext(260, expectedTotal),
          OnCompleted<int>(260)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 260)
      );
  }
  /// <summary>
  /// Seedless Aggregate whose accumulator throws on the value 3: the exception
  /// is expected as an error at tick 240, ending the subscription there.
  /// </summary>
  [Fact]
  public void AggregateWithoutSeed_AccumulatorThrows()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(150, 1),
          OnNext(210, 0),
          OnNext(220, 1),
          OnNext(230, 2),
          OnNext(240, 3),
          OnNext(250, 4),
          OnCompleted<int>(260)
      );

      // Adds values below 3; throws for anything else.
      Func<int, int, int> accumulator = (sum, item) =>
      {
          if (item >= 3)
          {
              throw error;
          }

          return sum + item;
      };

      var observer = testScheduler.Start(() => source.Aggregate(accumulator));

      observer.Messages.AssertEqual(
          OnError<int>(240, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 240)
      );
  }
  421. }
  422. }