MulticastTest.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Microsoft.VisualStudio.TestTools.UnitTesting;
  12. namespace ReactiveTests.Tests
  13. {
  14. [TestClass]
  15. public class MulticastTest : ReactiveTest
  16. {
  17. [TestMethod]
  18. public void Multicast_ArgumentChecking()
  19. {
  20. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast(null, new Subject<int>()));
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int>(DummyObservable<int>.Instance, null));
  22. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast(null, () => new Subject<int>(), xs => xs));
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int, int>(DummyObservable<int>.Instance, null, xs => xs));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int, int>(DummyObservable<int>.Instance, () => new Subject<int>(), null));
  25. }
  26. [TestMethod]
  27. public void Multicast_Hot_1()
  28. {
  29. var scheduler = new TestScheduler();
  30. var s = new Subject<int>();
  31. var xs = scheduler.CreateHotObservable(
  32. OnNext(40, 0),
  33. OnNext(90, 1),
  34. OnNext(150, 2),
  35. OnNext(210, 3),
  36. OnNext(240, 4),
  37. OnNext(270, 5),
  38. OnNext(330, 6),
  39. OnNext(340, 7),
  40. OnCompleted<int>(390)
  41. );
  42. var c = default(IConnectableObservable<int>);
  43. var o = scheduler.CreateObserver<int>();
  44. var d1 = default(IDisposable);
  45. var d2 = default(IDisposable);
  46. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  47. scheduler.ScheduleAbsolute(100, () => d1 = c.Subscribe(o));
  48. scheduler.ScheduleAbsolute(200, () => d2 = c.Connect());
  49. scheduler.ScheduleAbsolute(300, () => d1.Dispose());
  50. scheduler.Start();
  51. o.Messages.AssertEqual(
  52. OnNext(210, 3),
  53. OnNext(240, 4),
  54. OnNext(270, 5)
  55. );
  56. xs.Subscriptions.AssertEqual(
  57. Subscribe(200, 390)
  58. );
  59. }
  60. [TestMethod]
  61. public void Multicast_Hot_2()
  62. {
  63. var scheduler = new TestScheduler();
  64. var s = new Subject<int>();
  65. var xs = scheduler.CreateHotObservable(
  66. OnNext(40, 0),
  67. OnNext(90, 1),
  68. OnNext(150, 2),
  69. OnNext(210, 3),
  70. OnNext(240, 4),
  71. OnNext(270, 5),
  72. OnNext(330, 6),
  73. OnNext(340, 7),
  74. OnCompleted<int>(390)
  75. );
  76. var c = default(IConnectableObservable<int>);
  77. var o = scheduler.CreateObserver<int>();
  78. var d1 = default(IDisposable);
  79. var d2 = default(IDisposable);
  80. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  81. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  82. scheduler.ScheduleAbsolute(200, () => d1 = c.Subscribe(o));
  83. scheduler.ScheduleAbsolute(300, () => d1.Dispose());
  84. scheduler.Start();
  85. o.Messages.AssertEqual(
  86. OnNext(210, 3),
  87. OnNext(240, 4),
  88. OnNext(270, 5)
  89. );
  90. xs.Subscriptions.AssertEqual(
  91. Subscribe(100, 390)
  92. );
  93. }
  94. [TestMethod]
  95. public void Multicast_Hot_3()
  96. {
  97. var scheduler = new TestScheduler();
  98. var s = new Subject<int>();
  99. var xs = scheduler.CreateHotObservable(
  100. OnNext(40, 0),
  101. OnNext(90, 1),
  102. OnNext(150, 2),
  103. OnNext(210, 3),
  104. OnNext(240, 4),
  105. OnNext(270, 5),
  106. OnNext(330, 6),
  107. OnNext(340, 7),
  108. OnCompleted<int>(390)
  109. );
  110. var c = default(IConnectableObservable<int>);
  111. var o = scheduler.CreateObserver<int>();
  112. var d1 = default(IDisposable);
  113. var d2 = default(IDisposable);
  114. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  115. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  116. scheduler.ScheduleAbsolute(200, () => d1 = c.Subscribe(o));
  117. scheduler.ScheduleAbsolute(300, () => d2.Dispose());
  118. scheduler.ScheduleAbsolute(335, () => d2 = c.Connect());
  119. scheduler.Start();
  120. o.Messages.AssertEqual(
  121. OnNext(210, 3),
  122. OnNext(240, 4),
  123. OnNext(270, 5),
  124. OnNext(340, 7),
  125. OnCompleted<int>(390)
  126. );
  127. xs.Subscriptions.AssertEqual(
  128. Subscribe(100, 300),
  129. Subscribe(335, 390)
  130. );
  131. }
  132. [TestMethod]
  133. public void Multicast_Hot_4()
  134. {
  135. var scheduler = new TestScheduler();
  136. var s = new Subject<int>();
  137. var ex = new Exception();
  138. var xs = scheduler.CreateHotObservable(
  139. OnNext(40, 0),
  140. OnNext(90, 1),
  141. OnNext(150, 2),
  142. OnNext(210, 3),
  143. OnNext(240, 4),
  144. OnNext(270, 5),
  145. OnNext(330, 6),
  146. OnNext(340, 7),
  147. OnError<int>(390, ex)
  148. );
  149. var c = default(IConnectableObservable<int>);
  150. var o = scheduler.CreateObserver<int>();
  151. var d1 = default(IDisposable);
  152. var d2 = default(IDisposable);
  153. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  154. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  155. scheduler.ScheduleAbsolute(200, () => d1 = c.Subscribe(o));
  156. scheduler.ScheduleAbsolute(300, () => d2.Dispose());
  157. scheduler.ScheduleAbsolute(335, () => d2 = c.Connect());
  158. scheduler.Start();
  159. o.Messages.AssertEqual(
  160. OnNext(210, 3),
  161. OnNext(240, 4),
  162. OnNext(270, 5),
  163. OnNext(340, 7),
  164. OnError<int>(390, ex)
  165. );
  166. xs.Subscriptions.AssertEqual(
  167. Subscribe(100, 300),
  168. Subscribe(335, 390)
  169. );
  170. }
  171. [TestMethod]
  172. public void Multicast_Hot_5()
  173. {
  174. var scheduler = new TestScheduler();
  175. var s = new Subject<int>();
  176. var ex = new Exception();
  177. var xs = scheduler.CreateHotObservable(
  178. OnNext(40, 0),
  179. OnNext(90, 1),
  180. OnNext(150, 2),
  181. OnNext(210, 3),
  182. OnNext(240, 4),
  183. OnNext(270, 5),
  184. OnNext(330, 6),
  185. OnNext(340, 7),
  186. OnError<int>(390, ex)
  187. );
  188. var c = default(IConnectableObservable<int>);
  189. var o = scheduler.CreateObserver<int>();
  190. var d1 = default(IDisposable);
  191. var d2 = default(IDisposable);
  192. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  193. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  194. scheduler.ScheduleAbsolute(400, () => d1 = c.Subscribe(o));
  195. scheduler.Start();
  196. o.Messages.AssertEqual(
  197. OnError<int>(400, ex)
  198. );
  199. xs.Subscriptions.AssertEqual(
  200. Subscribe(100, 390)
  201. );
  202. }
  203. [TestMethod]
  204. public void Multicast_Hot_6()
  205. {
  206. var scheduler = new TestScheduler();
  207. var s = new Subject<int>();
  208. var xs = scheduler.CreateHotObservable(
  209. OnNext(40, 0),
  210. OnNext(90, 1),
  211. OnNext(150, 2),
  212. OnNext(210, 3),
  213. OnNext(240, 4),
  214. OnNext(270, 5),
  215. OnNext(330, 6),
  216. OnNext(340, 7),
  217. OnCompleted<int>(390)
  218. );
  219. var c = default(IConnectableObservable<int>);
  220. var o = scheduler.CreateObserver<int>();
  221. var d1 = default(IDisposable);
  222. var d2 = default(IDisposable);
  223. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  224. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  225. scheduler.ScheduleAbsolute(400, () => d1 = c.Subscribe(o));
  226. scheduler.Start();
  227. o.Messages.AssertEqual(
  228. OnCompleted<int>(400)
  229. );
  230. xs.Subscriptions.AssertEqual(
  231. Subscribe(100, 390)
  232. );
  233. }
  234. [TestMethod]
  235. public void Multicast_Cold_Completed()
  236. {
  237. var scheduler = new TestScheduler();
  238. var xs = scheduler.CreateHotObservable(
  239. OnNext(40, 0),
  240. OnNext(90, 1),
  241. OnNext(150, 2),
  242. OnNext(210, 3),
  243. OnNext(240, 4),
  244. OnNext(270, 5),
  245. OnNext(330, 6),
  246. OnNext(340, 7),
  247. OnCompleted<int>(390)
  248. );
  249. var res = scheduler.Start(() =>
  250. xs.Multicast(() => new Subject<int>(), ys => ys)
  251. );
  252. res.Messages.AssertEqual(
  253. OnNext(210, 3),
  254. OnNext(240, 4),
  255. OnNext(270, 5),
  256. OnNext(330, 6),
  257. OnNext(340, 7),
  258. OnCompleted<int>(390)
  259. );
  260. xs.Subscriptions.AssertEqual(
  261. Subscribe(200, 390)
  262. );
  263. }
  264. [TestMethod]
  265. public void Multicast_Cold_Error()
  266. {
  267. var scheduler = new TestScheduler();
  268. var ex = new Exception();
  269. var xs = scheduler.CreateHotObservable(
  270. OnNext(40, 0),
  271. OnNext(90, 1),
  272. OnNext(150, 2),
  273. OnNext(210, 3),
  274. OnNext(240, 4),
  275. OnNext(270, 5),
  276. OnNext(330, 6),
  277. OnNext(340, 7),
  278. OnError<int>(390, ex)
  279. );
  280. var res = scheduler.Start(() =>
  281. xs.Multicast(() => new Subject<int>(), ys => ys)
  282. );
  283. res.Messages.AssertEqual(
  284. OnNext(210, 3),
  285. OnNext(240, 4),
  286. OnNext(270, 5),
  287. OnNext(330, 6),
  288. OnNext(340, 7),
  289. OnError<int>(390, ex)
  290. );
  291. xs.Subscriptions.AssertEqual(
  292. Subscribe(200, 390)
  293. );
  294. }
  295. [TestMethod]
  296. public void Multicast_Cold_Dispose()
  297. {
  298. var scheduler = new TestScheduler();
  299. var xs = scheduler.CreateHotObservable(
  300. OnNext(40, 0),
  301. OnNext(90, 1),
  302. OnNext(150, 2),
  303. OnNext(210, 3),
  304. OnNext(240, 4),
  305. OnNext(270, 5),
  306. OnNext(330, 6),
  307. OnNext(340, 7)
  308. );
  309. var res = scheduler.Start(() =>
  310. xs.Multicast(() => new Subject<int>(), ys => ys)
  311. );
  312. res.Messages.AssertEqual(
  313. OnNext(210, 3),
  314. OnNext(240, 4),
  315. OnNext(270, 5),
  316. OnNext(330, 6),
  317. OnNext(340, 7)
  318. );
  319. xs.Subscriptions.AssertEqual(
  320. Subscribe(200, 1000)
  321. );
  322. }
  323. [TestMethod]
  324. public void Multicast_Cold_Zip()
  325. {
  326. var scheduler = new TestScheduler();
  327. var xs = scheduler.CreateHotObservable(
  328. OnNext(40, 0),
  329. OnNext(90, 1),
  330. OnNext(150, 2),
  331. OnNext(210, 3),
  332. OnNext(240, 4),
  333. OnNext(270, 5),
  334. OnNext(330, 6),
  335. OnNext(340, 7),
  336. OnCompleted<int>(390)
  337. );
  338. var res = scheduler.Start(() =>
  339. xs.Multicast(() => new Subject<int>(), ys => ys.Zip(ys, (a, b) => a + b))
  340. );
  341. res.Messages.AssertEqual(
  342. OnNext(210, 6),
  343. OnNext(240, 8),
  344. OnNext(270, 10),
  345. OnNext(330, 12),
  346. OnNext(340, 14),
  347. OnCompleted<int>(390)
  348. );
  349. xs.Subscriptions.AssertEqual(
  350. Subscribe(200, 390)
  351. );
  352. }
  353. [TestMethod]
  354. public void Multicast_SubjectSelectorThrows()
  355. {
  356. var ex = new Exception();
  357. var scheduler = new TestScheduler();
  358. var xs = scheduler.CreateHotObservable(
  359. OnNext(210, 1),
  360. OnNext(240, 2),
  361. OnCompleted<int>(300)
  362. );
  363. var res = scheduler.Start(() =>
  364. xs.Multicast<int, int, int>(() => { throw ex; }, _ => _)
  365. );
  366. res.Messages.AssertEqual(
  367. OnError<int>(200, ex)
  368. );
  369. xs.Subscriptions.AssertEqual(
  370. );
  371. }
  372. [TestMethod]
  373. public void Multicast_SelectorThrows()
  374. {
  375. var ex = new Exception();
  376. var scheduler = new TestScheduler();
  377. var xs = scheduler.CreateHotObservable(
  378. OnNext(210, 1),
  379. OnNext(240, 2),
  380. OnCompleted<int>(300)
  381. );
  382. var res = scheduler.Start(() =>
  383. xs.Multicast<int, int, int>(() => new Subject<int>(), _ => { throw ex; })
  384. );
  385. res.Messages.AssertEqual(
  386. OnError<int>(200, ex)
  387. );
  388. xs.Subscriptions.AssertEqual(
  389. );
  390. }
  391. }
  392. }