MulticastTest.cs 14 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using Microsoft.Reactive.Testing;
  10. using ReactiveTests.Dummies;
  11. using Xunit;
  12. namespace ReactiveTests.Tests
  13. {
  14. public class MulticastTest : ReactiveTest
  15. {
  16. [Fact]
  17. public void Multicast_ArgumentChecking()
  18. {
  19. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int>(null, new Subject<int>()));
  20. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int>(DummyObservable<int>.Instance, null));
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int, int>(null, () => new Subject<int>(), xs => xs));
  22. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int, int>(DummyObservable<int>.Instance, null, xs => xs));
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Multicast<int, int, int>(DummyObservable<int>.Instance, () => new Subject<int>(), null));
  24. }
  25. [Fact]
  26. public void Multicast_Hot_1()
  27. {
  28. var scheduler = new TestScheduler();
  29. var s = new Subject<int>();
  30. var xs = scheduler.CreateHotObservable(
  31. OnNext(40, 0),
  32. OnNext(90, 1),
  33. OnNext(150, 2),
  34. OnNext(210, 3),
  35. OnNext(240, 4),
  36. OnNext(270, 5),
  37. OnNext(330, 6),
  38. OnNext(340, 7),
  39. OnCompleted<int>(390)
  40. );
  41. var c = default(IConnectableObservable<int>);
  42. var o = scheduler.CreateObserver<int>();
  43. var d1 = default(IDisposable);
  44. var d2 = default(IDisposable);
  45. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  46. scheduler.ScheduleAbsolute(100, () => d1 = c.Subscribe(o));
  47. scheduler.ScheduleAbsolute(200, () => d2 = c.Connect());
  48. scheduler.ScheduleAbsolute(300, () => d1.Dispose());
  49. scheduler.Start();
  50. o.Messages.AssertEqual(
  51. OnNext(210, 3),
  52. OnNext(240, 4),
  53. OnNext(270, 5)
  54. );
  55. xs.Subscriptions.AssertEqual(
  56. Subscribe(200, 390)
  57. );
  58. }
  59. [Fact]
  60. public void Multicast_Hot_2()
  61. {
  62. var scheduler = new TestScheduler();
  63. var s = new Subject<int>();
  64. var xs = scheduler.CreateHotObservable(
  65. OnNext(40, 0),
  66. OnNext(90, 1),
  67. OnNext(150, 2),
  68. OnNext(210, 3),
  69. OnNext(240, 4),
  70. OnNext(270, 5),
  71. OnNext(330, 6),
  72. OnNext(340, 7),
  73. OnCompleted<int>(390)
  74. );
  75. var c = default(IConnectableObservable<int>);
  76. var o = scheduler.CreateObserver<int>();
  77. var d1 = default(IDisposable);
  78. var d2 = default(IDisposable);
  79. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  80. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  81. scheduler.ScheduleAbsolute(200, () => d1 = c.Subscribe(o));
  82. scheduler.ScheduleAbsolute(300, () => d1.Dispose());
  83. scheduler.Start();
  84. o.Messages.AssertEqual(
  85. OnNext(210, 3),
  86. OnNext(240, 4),
  87. OnNext(270, 5)
  88. );
  89. xs.Subscriptions.AssertEqual(
  90. Subscribe(100, 390)
  91. );
  92. }
  93. [Fact]
  94. public void Multicast_Hot_3()
  95. {
  96. var scheduler = new TestScheduler();
  97. var s = new Subject<int>();
  98. var xs = scheduler.CreateHotObservable(
  99. OnNext(40, 0),
  100. OnNext(90, 1),
  101. OnNext(150, 2),
  102. OnNext(210, 3),
  103. OnNext(240, 4),
  104. OnNext(270, 5),
  105. OnNext(330, 6),
  106. OnNext(340, 7),
  107. OnCompleted<int>(390)
  108. );
  109. var c = default(IConnectableObservable<int>);
  110. var o = scheduler.CreateObserver<int>();
  111. var d1 = default(IDisposable);
  112. var d2 = default(IDisposable);
  113. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  114. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  115. scheduler.ScheduleAbsolute(200, () => d1 = c.Subscribe(o));
  116. scheduler.ScheduleAbsolute(300, () => d2.Dispose());
  117. scheduler.ScheduleAbsolute(335, () => d2 = c.Connect());
  118. scheduler.Start();
  119. o.Messages.AssertEqual(
  120. OnNext(210, 3),
  121. OnNext(240, 4),
  122. OnNext(270, 5),
  123. OnNext(340, 7),
  124. OnCompleted<int>(390)
  125. );
  126. xs.Subscriptions.AssertEqual(
  127. Subscribe(100, 300),
  128. Subscribe(335, 390)
  129. );
  130. }
  131. [Fact]
  132. public void Multicast_Hot_4()
  133. {
  134. var scheduler = new TestScheduler();
  135. var s = new Subject<int>();
  136. var ex = new Exception();
  137. var xs = scheduler.CreateHotObservable(
  138. OnNext(40, 0),
  139. OnNext(90, 1),
  140. OnNext(150, 2),
  141. OnNext(210, 3),
  142. OnNext(240, 4),
  143. OnNext(270, 5),
  144. OnNext(330, 6),
  145. OnNext(340, 7),
  146. OnError<int>(390, ex)
  147. );
  148. var c = default(IConnectableObservable<int>);
  149. var o = scheduler.CreateObserver<int>();
  150. var d1 = default(IDisposable);
  151. var d2 = default(IDisposable);
  152. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  153. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  154. scheduler.ScheduleAbsolute(200, () => d1 = c.Subscribe(o));
  155. scheduler.ScheduleAbsolute(300, () => d2.Dispose());
  156. scheduler.ScheduleAbsolute(335, () => d2 = c.Connect());
  157. scheduler.Start();
  158. o.Messages.AssertEqual(
  159. OnNext(210, 3),
  160. OnNext(240, 4),
  161. OnNext(270, 5),
  162. OnNext(340, 7),
  163. OnError<int>(390, ex)
  164. );
  165. xs.Subscriptions.AssertEqual(
  166. Subscribe(100, 300),
  167. Subscribe(335, 390)
  168. );
  169. }
  170. [Fact]
  171. public void Multicast_Hot_5()
  172. {
  173. var scheduler = new TestScheduler();
  174. var s = new Subject<int>();
  175. var ex = new Exception();
  176. var xs = scheduler.CreateHotObservable(
  177. OnNext(40, 0),
  178. OnNext(90, 1),
  179. OnNext(150, 2),
  180. OnNext(210, 3),
  181. OnNext(240, 4),
  182. OnNext(270, 5),
  183. OnNext(330, 6),
  184. OnNext(340, 7),
  185. OnError<int>(390, ex)
  186. );
  187. var c = default(IConnectableObservable<int>);
  188. var o = scheduler.CreateObserver<int>();
  189. var d1 = default(IDisposable);
  190. var d2 = default(IDisposable);
  191. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  192. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  193. scheduler.ScheduleAbsolute(400, () => d1 = c.Subscribe(o));
  194. scheduler.Start();
  195. o.Messages.AssertEqual(
  196. OnError<int>(400, ex)
  197. );
  198. xs.Subscriptions.AssertEqual(
  199. Subscribe(100, 390)
  200. );
  201. }
  202. [Fact]
  203. public void Multicast_Hot_6()
  204. {
  205. var scheduler = new TestScheduler();
  206. var s = new Subject<int>();
  207. var xs = scheduler.CreateHotObservable(
  208. OnNext(40, 0),
  209. OnNext(90, 1),
  210. OnNext(150, 2),
  211. OnNext(210, 3),
  212. OnNext(240, 4),
  213. OnNext(270, 5),
  214. OnNext(330, 6),
  215. OnNext(340, 7),
  216. OnCompleted<int>(390)
  217. );
  218. var c = default(IConnectableObservable<int>);
  219. var o = scheduler.CreateObserver<int>();
  220. var d1 = default(IDisposable);
  221. var d2 = default(IDisposable);
  222. scheduler.ScheduleAbsolute(50, () => c = xs.Multicast(s));
  223. scheduler.ScheduleAbsolute(100, () => d2 = c.Connect());
  224. scheduler.ScheduleAbsolute(400, () => d1 = c.Subscribe(o));
  225. scheduler.Start();
  226. o.Messages.AssertEqual(
  227. OnCompleted<int>(400)
  228. );
  229. xs.Subscriptions.AssertEqual(
  230. Subscribe(100, 390)
  231. );
  232. }
  233. [Fact]
  234. public void Multicast_Cold_Completed()
  235. {
  236. var scheduler = new TestScheduler();
  237. var xs = scheduler.CreateHotObservable(
  238. OnNext(40, 0),
  239. OnNext(90, 1),
  240. OnNext(150, 2),
  241. OnNext(210, 3),
  242. OnNext(240, 4),
  243. OnNext(270, 5),
  244. OnNext(330, 6),
  245. OnNext(340, 7),
  246. OnCompleted<int>(390)
  247. );
  248. var res = scheduler.Start(() =>
  249. xs.Multicast(() => new Subject<int>(), ys => ys)
  250. );
  251. res.Messages.AssertEqual(
  252. OnNext(210, 3),
  253. OnNext(240, 4),
  254. OnNext(270, 5),
  255. OnNext(330, 6),
  256. OnNext(340, 7),
  257. OnCompleted<int>(390)
  258. );
  259. xs.Subscriptions.AssertEqual(
  260. Subscribe(200, 390)
  261. );
  262. }
  263. [Fact]
  264. public void Multicast_Cold_Error()
  265. {
  266. var scheduler = new TestScheduler();
  267. var ex = new Exception();
  268. var xs = scheduler.CreateHotObservable(
  269. OnNext(40, 0),
  270. OnNext(90, 1),
  271. OnNext(150, 2),
  272. OnNext(210, 3),
  273. OnNext(240, 4),
  274. OnNext(270, 5),
  275. OnNext(330, 6),
  276. OnNext(340, 7),
  277. OnError<int>(390, ex)
  278. );
  279. var res = scheduler.Start(() =>
  280. xs.Multicast(() => new Subject<int>(), ys => ys)
  281. );
  282. res.Messages.AssertEqual(
  283. OnNext(210, 3),
  284. OnNext(240, 4),
  285. OnNext(270, 5),
  286. OnNext(330, 6),
  287. OnNext(340, 7),
  288. OnError<int>(390, ex)
  289. );
  290. xs.Subscriptions.AssertEqual(
  291. Subscribe(200, 390)
  292. );
  293. }
  294. [Fact]
  295. public void Multicast_Cold_Dispose()
  296. {
  297. var scheduler = new TestScheduler();
  298. var xs = scheduler.CreateHotObservable(
  299. OnNext(40, 0),
  300. OnNext(90, 1),
  301. OnNext(150, 2),
  302. OnNext(210, 3),
  303. OnNext(240, 4),
  304. OnNext(270, 5),
  305. OnNext(330, 6),
  306. OnNext(340, 7)
  307. );
  308. var res = scheduler.Start(() =>
  309. xs.Multicast(() => new Subject<int>(), ys => ys)
  310. );
  311. res.Messages.AssertEqual(
  312. OnNext(210, 3),
  313. OnNext(240, 4),
  314. OnNext(270, 5),
  315. OnNext(330, 6),
  316. OnNext(340, 7)
  317. );
  318. xs.Subscriptions.AssertEqual(
  319. Subscribe(200, 1000)
  320. );
  321. }
  322. [Fact]
  323. public void Multicast_Cold_Zip()
  324. {
  325. var scheduler = new TestScheduler();
  326. var xs = scheduler.CreateHotObservable(
  327. OnNext(40, 0),
  328. OnNext(90, 1),
  329. OnNext(150, 2),
  330. OnNext(210, 3),
  331. OnNext(240, 4),
  332. OnNext(270, 5),
  333. OnNext(330, 6),
  334. OnNext(340, 7),
  335. OnCompleted<int>(390)
  336. );
  337. var res = scheduler.Start(() =>
  338. xs.Multicast(() => new Subject<int>(), ys => ys.Zip(ys, (a, b) => a + b))
  339. );
  340. res.Messages.AssertEqual(
  341. OnNext(210, 6),
  342. OnNext(240, 8),
  343. OnNext(270, 10),
  344. OnNext(330, 12),
  345. OnNext(340, 14),
  346. OnCompleted<int>(390)
  347. );
  348. xs.Subscriptions.AssertEqual(
  349. Subscribe(200, 390)
  350. );
  351. }
  352. [Fact]
  353. public void Multicast_SubjectSelectorThrows()
  354. {
  355. var ex = new Exception();
  356. var scheduler = new TestScheduler();
  357. var xs = scheduler.CreateHotObservable(
  358. OnNext(210, 1),
  359. OnNext(240, 2),
  360. OnCompleted<int>(300)
  361. );
  362. var res = scheduler.Start(() =>
  363. xs.Multicast<int, int, int>(() => { throw ex; }, _ => _)
  364. );
  365. res.Messages.AssertEqual(
  366. OnError<int>(200, ex)
  367. );
  368. xs.Subscriptions.AssertEqual(
  369. );
  370. }
  371. [Fact]
  372. public void Multicast_SelectorThrows()
  373. {
  374. var ex = new Exception();
  375. var scheduler = new TestScheduler();
  376. var xs = scheduler.CreateHotObservable(
  377. OnNext(210, 1),
  378. OnNext(240, 2),
  379. OnCompleted<int>(300)
  380. );
  381. var res = scheduler.Start(() =>
  382. xs.Multicast<int, int, int>(() => new Subject<int>(), _ => { throw ex; })
  383. );
  384. res.Messages.AssertEqual(
  385. OnError<int>(200, ex)
  386. );
  387. xs.Subscriptions.AssertEqual(
  388. );
  389. }
  390. }
  391. }