SubjectTest.cs 20 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Linq;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Reactive.Subjects;
  10. using System.Threading;
  11. using Microsoft.Reactive.Testing;
  12. using Microsoft.VisualStudio.TestTools.UnitTesting;
  13. using Assert = Xunit.Assert;
  14. namespace ReactiveTests.Tests
  15. {
  16. [TestClass]
  17. public partial class SubjectTest : ReactiveTest
  18. {
  19. [TestMethod]
  20. public void Subscribe_ArgumentChecking()
  21. {
  22. ReactiveAssert.Throws<ArgumentNullException>(() => new Subject<int>().Subscribe(null));
  23. }
  24. [TestMethod]
  25. public void OnError_ArgumentChecking()
  26. {
  27. ReactiveAssert.Throws<ArgumentNullException>(() => new Subject<int>().OnError(null));
  28. }
  29. [TestMethod]
  30. public void Infinite()
  31. {
  32. var scheduler = new TestScheduler();
  33. var xs = scheduler.CreateHotObservable(
  34. OnNext(70, 1),
  35. OnNext(110, 2),
  36. OnNext(220, 3),
  37. OnNext(270, 4),
  38. OnNext(340, 5),
  39. OnNext(410, 6),
  40. OnNext(520, 7),
  41. OnNext(630, 8),
  42. OnNext(710, 9),
  43. OnNext(870, 10),
  44. OnNext(940, 11),
  45. OnNext(1020, 12)
  46. );
  47. var s = default(Subject<int>);
  48. var subscription = default(IDisposable);
  49. var results1 = scheduler.CreateObserver<int>();
  50. var subscription1 = default(IDisposable);
  51. var results2 = scheduler.CreateObserver<int>();
  52. var subscription2 = default(IDisposable);
  53. var results3 = scheduler.CreateObserver<int>();
  54. var subscription3 = default(IDisposable);
  55. scheduler.ScheduleAbsolute(100, () => s = new Subject<int>());
  56. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(s));
  57. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  58. scheduler.ScheduleAbsolute(300, () => subscription1 = s.Subscribe(results1));
  59. scheduler.ScheduleAbsolute(400, () => subscription2 = s.Subscribe(results2));
  60. scheduler.ScheduleAbsolute(900, () => subscription3 = s.Subscribe(results3));
  61. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  62. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  63. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  64. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  65. scheduler.Start();
  66. results1.Messages.AssertEqual(
  67. OnNext(340, 5),
  68. OnNext(410, 6),
  69. OnNext(520, 7)
  70. );
  71. results2.Messages.AssertEqual(
  72. OnNext(410, 6),
  73. OnNext(520, 7),
  74. OnNext(630, 8)
  75. );
  76. results3.Messages.AssertEqual(
  77. OnNext(940, 11)
  78. );
  79. }
  80. [TestMethod]
  81. public void Finite()
  82. {
  83. var scheduler = new TestScheduler();
  84. var xs = scheduler.CreateHotObservable(
  85. OnNext(70, 1),
  86. OnNext(110, 2),
  87. OnNext(220, 3),
  88. OnNext(270, 4),
  89. OnNext(340, 5),
  90. OnNext(410, 6),
  91. OnNext(520, 7),
  92. OnCompleted<int>(630),
  93. OnNext(640, 9),
  94. OnCompleted<int>(650),
  95. OnError<int>(660, new Exception())
  96. );
  97. var s = default(Subject<int>);
  98. var subscription = default(IDisposable);
  99. var results1 = scheduler.CreateObserver<int>();
  100. var subscription1 = default(IDisposable);
  101. var results2 = scheduler.CreateObserver<int>();
  102. var subscription2 = default(IDisposable);
  103. var results3 = scheduler.CreateObserver<int>();
  104. var subscription3 = default(IDisposable);
  105. scheduler.ScheduleAbsolute(100, () => s = new Subject<int>());
  106. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(s));
  107. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  108. scheduler.ScheduleAbsolute(300, () => subscription1 = s.Subscribe(results1));
  109. scheduler.ScheduleAbsolute(400, () => subscription2 = s.Subscribe(results2));
  110. scheduler.ScheduleAbsolute(900, () => subscription3 = s.Subscribe(results3));
  111. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  112. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  113. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  114. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  115. scheduler.Start();
  116. results1.Messages.AssertEqual(
  117. OnNext(340, 5),
  118. OnNext(410, 6),
  119. OnNext(520, 7)
  120. );
  121. results2.Messages.AssertEqual(
  122. OnNext(410, 6),
  123. OnNext(520, 7),
  124. OnCompleted<int>(630)
  125. );
  126. results3.Messages.AssertEqual(
  127. OnCompleted<int>(900)
  128. );
  129. }
  130. [TestMethod]
  131. public void Error()
  132. {
  133. var scheduler = new TestScheduler();
  134. var ex = new Exception();
  135. var xs = scheduler.CreateHotObservable(
  136. OnNext(70, 1),
  137. OnNext(110, 2),
  138. OnNext(220, 3),
  139. OnNext(270, 4),
  140. OnNext(340, 5),
  141. OnNext(410, 6),
  142. OnNext(520, 7),
  143. OnError<int>(630, ex),
  144. OnNext(640, 9),
  145. OnCompleted<int>(650),
  146. OnError<int>(660, new Exception())
  147. );
  148. var s = default(Subject<int>);
  149. var subscription = default(IDisposable);
  150. var results1 = scheduler.CreateObserver<int>();
  151. var subscription1 = default(IDisposable);
  152. var results2 = scheduler.CreateObserver<int>();
  153. var subscription2 = default(IDisposable);
  154. var results3 = scheduler.CreateObserver<int>();
  155. var subscription3 = default(IDisposable);
  156. scheduler.ScheduleAbsolute(100, () => s = new Subject<int>());
  157. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(s));
  158. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  159. scheduler.ScheduleAbsolute(300, () => subscription1 = s.Subscribe(results1));
  160. scheduler.ScheduleAbsolute(400, () => subscription2 = s.Subscribe(results2));
  161. scheduler.ScheduleAbsolute(900, () => subscription3 = s.Subscribe(results3));
  162. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  163. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  164. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  165. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  166. scheduler.Start();
  167. results1.Messages.AssertEqual(
  168. OnNext(340, 5),
  169. OnNext(410, 6),
  170. OnNext(520, 7)
  171. );
  172. results2.Messages.AssertEqual(
  173. OnNext(410, 6),
  174. OnNext(520, 7),
  175. OnError<int>(630, ex)
  176. );
  177. results3.Messages.AssertEqual(
  178. OnError<int>(900, ex)
  179. );
  180. }
  181. [TestMethod]
  182. public void Canceled()
  183. {
  184. var scheduler = new TestScheduler();
  185. var xs = scheduler.CreateHotObservable(
  186. OnCompleted<int>(630),
  187. OnNext(640, 9),
  188. OnCompleted<int>(650),
  189. OnError<int>(660, new Exception())
  190. );
  191. var s = default(Subject<int>);
  192. var subscription = default(IDisposable);
  193. var results1 = scheduler.CreateObserver<int>();
  194. var subscription1 = default(IDisposable);
  195. var results2 = scheduler.CreateObserver<int>();
  196. var subscription2 = default(IDisposable);
  197. var results3 = scheduler.CreateObserver<int>();
  198. var subscription3 = default(IDisposable);
  199. scheduler.ScheduleAbsolute(100, () => s = new Subject<int>());
  200. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(s));
  201. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  202. scheduler.ScheduleAbsolute(300, () => subscription1 = s.Subscribe(results1));
  203. scheduler.ScheduleAbsolute(400, () => subscription2 = s.Subscribe(results2));
  204. scheduler.ScheduleAbsolute(900, () => subscription3 = s.Subscribe(results3));
  205. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  206. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  207. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  208. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  209. scheduler.Start();
  210. results1.Messages.AssertEqual(
  211. );
  212. results2.Messages.AssertEqual(
  213. OnCompleted<int>(630)
  214. );
  215. results3.Messages.AssertEqual(
  216. OnCompleted<int>(900)
  217. );
  218. }
  219. [TestMethod]
  220. public void Dispose()
  221. {
  222. var scheduler = new TestScheduler();
  223. var s = new Subject<int>();
  224. scheduler.ScheduleAbsolute(300, () => s.OnNext(1));
  225. scheduler.ScheduleAbsolute(998, () => s.OnNext(2));
  226. scheduler.ScheduleAbsolute(999, () => s.OnNext(3));
  227. scheduler.ScheduleAbsolute(1001, () => s.OnNext(3));
  228. var results = scheduler.Start(() => s);
  229. results.Messages.AssertEqual(
  230. OnNext(300, 1),
  231. OnNext(998, 2),
  232. OnNext(999, 3)
  233. );
  234. }
  235. [TestMethod]
  236. public void PreComplete()
  237. {
  238. var scheduler = new TestScheduler();
  239. var s = new Subject<int>();
  240. scheduler.ScheduleAbsolute(90, () => s.OnCompleted());
  241. var results = scheduler.Start(() => s);
  242. results.Messages.AssertEqual(
  243. OnCompleted<int>(200)
  244. );
  245. }
  246. [TestMethod]
  247. public void SubjectDisposed()
  248. {
  249. var scheduler = new TestScheduler();
  250. var subject = default(Subject<int>);
  251. var results1 = scheduler.CreateObserver<int>();
  252. var subscription1 = default(IDisposable);
  253. var results2 = scheduler.CreateObserver<int>();
  254. var subscription2 = default(IDisposable);
  255. var results3 = scheduler.CreateObserver<int>();
  256. var subscription3 = default(IDisposable);
  257. scheduler.ScheduleAbsolute(100, () => subject = new Subject<int>());
  258. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  259. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  260. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  261. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  262. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  263. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  264. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  265. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  266. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  267. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  268. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  269. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  270. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  271. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  272. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  273. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  274. scheduler.Start();
  275. results1.Messages.AssertEqual(
  276. OnNext(250, 2),
  277. OnNext(350, 3),
  278. OnNext(450, 4)
  279. );
  280. results2.Messages.AssertEqual(
  281. OnNext(350, 3),
  282. OnNext(450, 4),
  283. OnNext(550, 5)
  284. );
  285. results3.Messages.AssertEqual(
  286. OnNext(450, 4),
  287. OnNext(550, 5)
  288. );
  289. }
  290. [TestMethod]
  291. public void Subject_Create_ArgumentChecking()
  292. {
  293. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Create<int, int>(null, Observable.Return(42)));
  294. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Create<int, int>(Observer.Create<int>(x => { }), null));
  295. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Create(null, Observable.Return(42)));
  296. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Create(Observer.Create<int>(x => { }), null));
  297. }
  298. [TestMethod]
  299. public void Subject_Create1()
  300. {
  301. var _x = default(int);
  302. var _ex = default(Exception);
  303. var done = false;
  304. var v = Observer.Create<int>(x => _x = x, ex => _ex = ex, () => done = true);
  305. var o = Observable.Return(42);
  306. var s = Subject.Create<int, int>(v, o);
  307. ReactiveAssert.Throws<ArgumentNullException>(() => s.Subscribe(null));
  308. s.Subscribe(x => _x = x);
  309. Assert.Equal(42, _x);
  310. s.OnNext(21);
  311. Assert.Equal(21, _x);
  312. ReactiveAssert.Throws<ArgumentNullException>(() => s.OnError(null));
  313. var e = new Exception();
  314. s.OnError(e);
  315. Assert.Same(e, _ex);
  316. s.OnCompleted();
  317. Assert.False(done); // already cut off
  318. }
  319. [TestMethod]
  320. public void Subject_Create2()
  321. {
  322. var _x = default(int);
  323. var _ex = default(Exception);
  324. var done = false;
  325. var v = Observer.Create<int>(x => _x = x, ex => _ex = ex, () => done = true);
  326. var o = Observable.Return(42);
  327. var s = Subject.Create<int>(v, o);
  328. ReactiveAssert.Throws<ArgumentNullException>(() => s.Subscribe(null));
  329. s.Subscribe(x => _x = x);
  330. Assert.Equal(42, _x);
  331. s.OnNext(21);
  332. Assert.Equal(21, _x);
  333. ReactiveAssert.Throws<ArgumentNullException>(() => s.OnError(null));
  334. var e = new Exception();
  335. s.OnError(e);
  336. Assert.Same(e, _ex);
  337. s.OnCompleted();
  338. Assert.False(done); // already cut off
  339. }
  340. [TestMethod]
  341. public void Subject_Synchronize_ArgumentChecking()
  342. {
  343. var s = new Subject<int>();
  344. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Synchronize(default(ISubject<int, int>)));
  345. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Synchronize(default(ISubject<int, int>), Scheduler.Immediate));
  346. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Synchronize((ISubject<int, int>)s, null));
  347. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Synchronize(default(ISubject<int>)));
  348. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Synchronize(default(ISubject<int>), Scheduler.Immediate));
  349. ReactiveAssert.Throws<ArgumentNullException>(() => Subject.Synchronize(s, null));
  350. }
  351. [TestMethod]
  352. public void Subject_Synchronize1()
  353. {
  354. var N = 10;
  355. var y = 0;
  356. var o = Observer.Create<int>(x => y += x);
  357. var s = Subject.Synchronize(Subject.Create(o, Observable.Empty<string>()));
  358. var e = new ManualResetEvent(false);
  359. var t = Enumerable.Range(0, N).Select(x => new Thread(() => { e.WaitOne(); s.OnNext(x); })).ToArray();
  360. foreach (var u in t)
  361. {
  362. u.Start();
  363. }
  364. e.Set();
  365. foreach (var u in t)
  366. {
  367. u.Join();
  368. }
  369. Assert.Equal(Enumerable.Range(0, N).Sum(), y);
  370. }
  371. [TestMethod]
  372. public void Subject_Synchronize2()
  373. {
  374. var N = 10;
  375. var s = Subject.Synchronize(new Subject<int>());
  376. var y = 0;
  377. var d = s.Subscribe(x => y += x);
  378. var e = new ManualResetEvent(false);
  379. var t = Enumerable.Range(0, N).Select(x => new Thread(() => { e.WaitOne(); s.OnNext(x); })).ToArray();
  380. foreach (var u in t)
  381. {
  382. u.Start();
  383. }
  384. e.Set();
  385. foreach (var u in t)
  386. {
  387. u.Join();
  388. }
  389. Assert.Equal(Enumerable.Range(0, N).Sum(), y);
  390. }
  391. [TestMethod]
  392. public void HasObservers()
  393. {
  394. var s = new Subject<int>();
  395. Assert.False(s.HasObservers);
  396. var d1 = s.Subscribe(_ => { });
  397. Assert.True(s.HasObservers);
  398. d1.Dispose();
  399. Assert.False(s.HasObservers);
  400. var d2 = s.Subscribe(_ => { });
  401. Assert.True(s.HasObservers);
  402. var d3 = s.Subscribe(_ => { });
  403. Assert.True(s.HasObservers);
  404. d2.Dispose();
  405. Assert.True(s.HasObservers);
  406. d3.Dispose();
  407. Assert.False(s.HasObservers);
  408. }
  409. [TestMethod]
  410. public void HasObservers_Dispose1()
  411. {
  412. var s = new Subject<int>();
  413. Assert.False(s.HasObservers);
  414. Assert.False(s.IsDisposed);
  415. var d = s.Subscribe(_ => { });
  416. Assert.True(s.HasObservers);
  417. Assert.False(s.IsDisposed);
  418. s.Dispose();
  419. Assert.False(s.HasObservers);
  420. Assert.True(s.IsDisposed);
  421. d.Dispose();
  422. Assert.False(s.HasObservers);
  423. Assert.True(s.IsDisposed);
  424. }
  425. [TestMethod]
  426. public void HasObservers_Dispose2()
  427. {
  428. var s = new Subject<int>();
  429. Assert.False(s.HasObservers);
  430. Assert.False(s.IsDisposed);
  431. var d = s.Subscribe(_ => { });
  432. Assert.True(s.HasObservers);
  433. Assert.False(s.IsDisposed);
  434. d.Dispose();
  435. Assert.False(s.HasObservers);
  436. Assert.False(s.IsDisposed);
  437. s.Dispose();
  438. Assert.False(s.HasObservers);
  439. Assert.True(s.IsDisposed);
  440. }
  441. [TestMethod]
  442. public void HasObservers_Dispose3()
  443. {
  444. var s = new Subject<int>();
  445. Assert.False(s.HasObservers);
  446. Assert.False(s.IsDisposed);
  447. s.Dispose();
  448. Assert.False(s.HasObservers);
  449. Assert.True(s.IsDisposed);
  450. }
  451. [TestMethod]
  452. public void HasObservers_OnCompleted()
  453. {
  454. var s = new Subject<int>();
  455. Assert.False(s.HasObservers);
  456. var d = s.Subscribe(_ => { });
  457. Assert.True(s.HasObservers);
  458. s.OnNext(42);
  459. Assert.True(s.HasObservers);
  460. s.OnCompleted();
  461. Assert.False(s.HasObservers);
  462. }
  463. [TestMethod]
  464. public void HasObservers_OnError()
  465. {
  466. var s = new Subject<int>();
  467. Assert.False(s.HasObservers);
  468. var d = s.Subscribe(_ => { }, ex => { });
  469. Assert.True(s.HasObservers);
  470. s.OnNext(42);
  471. Assert.True(s.HasObservers);
  472. s.OnError(new Exception());
  473. Assert.False(s.HasObservers);
  474. }
  475. }
  476. }