// CollectTest.cs
  // Licensed to the .NET Foundation under one or more agreements.
  // The .NET Foundation licenses this file to you under the MIT License.
  // See the LICENSE file in the project root for more information.
  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Reactive.Concurrency;
  using System.Reactive.Linq;
  using Microsoft.Reactive.Testing;
  using Microsoft.VisualStudio.TestTools.UnitTesting;
  // The tests are discovered through MSTest attributes but assert through xUnit's Assert.
  using Assert = Xunit.Assert;
  namespace ReactiveTests.Tests
  {
      /// <summary>
      /// Tests for <c>Observable.Collect</c>. Each test feeds a hot observable created on a
      /// <c>TestScheduler</c> into <c>Collect</c> and pulls from the resulting enumerator at
      /// scheduled virtual times.
      /// </summary>
      [TestClass]
      public class CollectTest : ReactiveTest
      {
          /// <summary>
          /// Expects an <see cref="ArgumentNullException"/> when any single argument of the
          /// two-function or three-function <c>Collect</c> overload is null.
          /// </summary>
          [TestMethod]
          public void Collect_ArgumentChecking()
          {
              var someObservable = Observable.Empty<int>();

              // Overload taking (source, collector factory, merge).
              ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Collect(default(IObservable<int>), () => 0, (x, y) => x));
              ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Collect(someObservable, default(Func<int>), (x, y) => x));
              ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Collect(someObservable, () => 0, default));

              // Overload taking (source, initial collector factory, merge, new collector factory).
              ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Collect(default(IObservable<int>), () => 0, (x, y) => x, x => x));
              ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Collect(someObservable, default(Func<int>), (x, y) => x, x => x));
              ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Collect(someObservable, () => 0, default, x => x));
              ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Collect(someObservable, () => 0, (x, y) => x, default));
          }

          /// <summary>
          /// Pulls seven times at irregular virtual times and expects each pull to yield the sum of
          /// the elements that arrived since the previous pull (zero when none arrived), followed by
          /// a final <c>MoveNext</c> that returns false after the source has completed.
          /// </summary>
          [TestMethod]
          public void Collect_Regular1()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(100, 1),
                  OnNext(200, 2),
                  OnNext(300, 3),
                  OnNext(400, 4),
                  OnNext(500, 5),
                  OnNext(600, 6),
                  OnNext(700, 7),
                  OnNext(800, 8),
                  OnCompleted<int>(900)
              );

              var ys = xs.Collect(() => 0, (x, y) => x + y);

              var e = default(IEnumerator<int>);
              var res = new List<int>();

              // Advances the enumerator (which must succeed) and records the value it produced.
              var log = new Action(() =>
              {
                  Assert.True(e.MoveNext());
                  res.Add(e.Current);
              });

              // The enumerator is obtained at 250, so the elements at 100 and 200 are never collected.
              scheduler.ScheduleAbsolute(250, () => { e = ys.GetEnumerator(); });
              scheduler.ScheduleAbsolute(270, log);
              scheduler.ScheduleAbsolute(310, log);
              scheduler.ScheduleAbsolute(450, log);
              scheduler.ScheduleAbsolute(470, log);
              scheduler.ScheduleAbsolute(750, log);
              scheduler.ScheduleAbsolute(850, log);
              scheduler.ScheduleAbsolute(950, log);
              scheduler.ScheduleAbsolute(980, () => Assert.False(e.MoveNext()));

              scheduler.Start();

              xs.Subscriptions.AssertEqual(
                  Subscribe(250, 900)
              );

              // xUnit's Assert.Equal takes (expected, actual).
              Assert.Equal(7, res.Count);
              Assert.Equal(new int[] { }.Sum(), res[0]);          // pull at 270
              Assert.Equal(new int[] { 3 }.Sum(), res[1]);        // pull at 310
              Assert.Equal(new int[] { 4 }.Sum(), res[2]);        // pull at 450
              Assert.Equal(new int[] { }.Sum(), res[3]);          // pull at 470
              Assert.Equal(new int[] { 5, 6, 7 }.Sum(), res[4]);  // pull at 750
              Assert.Equal(new int[] { 8 }.Sum(), res[5]);        // pull at 850
              Assert.Equal(new int[] { }.Sum(), res[6]);          // pull at 950, after completion at 900
          }

          /// <summary>
          /// Pulls only twice (at 550 and 950) and expects each pull to yield the sum of three
          /// accumulated elements, followed by a final <c>MoveNext</c> that returns false.
          /// </summary>
          [TestMethod]
          public void Collect_Regular2()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(100, 1),
                  OnNext(200, 2),
                  OnNext(300, 3),
                  OnNext(400, 4),
                  OnNext(500, 5),
                  OnNext(600, 6),
                  OnNext(700, 7),
                  OnNext(800, 8),
                  OnCompleted<int>(900)
              );

              var ys = xs.Collect(() => 0, (x, y) => x + y);

              var e = default(IEnumerator<int>);
              var res = new List<int>();

              // Advances the enumerator (which must succeed) and records the value it produced.
              var log = new Action(() =>
              {
                  Assert.True(e.MoveNext());
                  res.Add(e.Current);
              });

              scheduler.ScheduleAbsolute(250, () => { e = ys.GetEnumerator(); });
              scheduler.ScheduleAbsolute(550, log);
              scheduler.ScheduleAbsolute(950, log);
              scheduler.ScheduleAbsolute(980, () => Assert.False(e.MoveNext()));

              scheduler.Start();

              xs.Subscriptions.AssertEqual(
                  Subscribe(250, 900)
              );

              // xUnit's Assert.Equal takes (expected, actual).
              Assert.Equal(2, res.Count);
              Assert.Equal(new int[] { 3, 4, 5 }.Sum(), res[0]);  // pull at 550
              Assert.Equal(new int[] { 6, 7, 8 }.Sum(), res[1]);  // pull at 950
          }

          /// <summary>
          /// Uses a collector factory that always throws and expects the same exception instance to
          /// escape <c>GetEnumerator</c>, with no subscription to the source recorded.
          /// </summary>
          [TestMethod]
          public void Collect_InitialCollectorThrows()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(100, 1),
                  OnNext(200, 2),
                  OnNext(300, 3),
                  OnNext(400, 4),
                  OnCompleted<int>(500)
              );

              var ex = new Exception();
              var ys = xs.Collect<int, int>(() => { throw ex; }, (x, y) => x + y);

              var ex_ = default(Exception);

              // Capture the exception inside the scheduled action so it can be asserted after the run.
              scheduler.ScheduleAbsolute(250, () =>
              {
                  try
                  {
                      ys.GetEnumerator();
                  }
                  catch (Exception err)
                  {
                      ex_ = err;
                  }
              });

              scheduler.Start();

              // No subscription is expected.
              xs.Subscriptions.AssertEqual();

              // xUnit's Assert.Same takes (expected, actual).
              Assert.Same(ex, ex_);
          }

          /// <summary>
          /// Uses a collector factory that returns 0 on its first invocation and throws on every
          /// later one. Expects the exception to surface from the <c>MoveNext</c> call at 350 and
          /// the source subscription to span 250 to 350.
          /// </summary>
          [TestMethod]
          public void Collect_SecondCollectorThrows()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(100, 1),
                  OnNext(200, 2),
                  OnNext(300, 3),
                  OnNext(400, 4),
                  OnCompleted<int>(500)
              );

              var ex = new Exception();
              var n = 0;

              var ys = xs.Collect(() =>
              {
                  // Only the very first invocation succeeds.
                  if (n++ == 0)
                  {
                      return 0;
                  }
                  else
                  {
                      throw ex;
                  }
              }, (x, y) => x + y);

              var e = default(IEnumerator<int>);
              var ex_ = default(Exception);

              scheduler.ScheduleAbsolute(250, () => e = ys.GetEnumerator());
              scheduler.ScheduleAbsolute(350, () =>
              {
                  try
                  {
                      e.MoveNext();
                  }
                  catch (Exception err)
                  {
                      ex_ = err;
                  }
              });

              scheduler.Start();

              xs.Subscriptions.AssertEqual(
                  Subscribe(250, 350)
              );

              // xUnit's Assert.Same takes (expected, actual).
              Assert.Same(ex, ex_);
          }

          /// <summary>
          /// Uses the three-function overload with a new-collector function that always throws.
          /// Expects the exception to surface from the <c>MoveNext</c> call at 350 and the source
          /// subscription to span 250 to 350.
          /// </summary>
          [TestMethod]
          public void Collect_NewCollectorThrows()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(100, 1),
                  OnNext(200, 2),
                  OnNext(300, 3),
                  OnNext(400, 4),
                  OnCompleted<int>(500)
              );

              var ex = new Exception();
              var ys = xs.Collect(() => 0, (x, y) => x + y, x => { throw ex; });

              var e = default(IEnumerator<int>);
              var ex_ = default(Exception);

              scheduler.ScheduleAbsolute(250, () => e = ys.GetEnumerator());
              scheduler.ScheduleAbsolute(350, () =>
              {
                  try
                  {
                      e.MoveNext();
                  }
                  catch (Exception err)
                  {
                      ex_ = err;
                  }
              });

              scheduler.Start();

              xs.Subscriptions.AssertEqual(
                  Subscribe(250, 350)
              );

              // xUnit's Assert.Same takes (expected, actual).
              Assert.Same(ex, ex_);
          }

          /// <summary>
          /// Uses a merge function that always throws. Expects the source subscription to end at
          /// 300, the time of the first element after subscribing at 250, and the exception to be
          /// observed by the <c>MoveNext</c> call at 350.
          /// </summary>
          [TestMethod]
          public void Collect_MergeThrows()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(100, 1),
                  OnNext(200, 2),
                  OnNext(300, 3),
                  OnNext(400, 4),
                  OnCompleted<int>(500)
              );

              var ex = new Exception();
              var ys = xs.Collect(() => 0, (x, y) => { throw ex; });

              var e = default(IEnumerator<int>);
              var ex_ = default(Exception);

              scheduler.ScheduleAbsolute(250, () => { e = ys.GetEnumerator(); });
              scheduler.ScheduleAbsolute(350, () =>
              {
                  try
                  {
                      e.MoveNext();
                  }
                  catch (Exception err)
                  {
                      ex_ = err;
                  }
              });

              scheduler.Start();

              xs.Subscriptions.AssertEqual(
                  Subscribe(250, 300)
              );

              // xUnit's Assert.Same takes (expected, actual).
              Assert.Same(ex, ex_);
          }
      }
  }