RefCountTest.cs 18 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Disposables;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using Microsoft.Reactive.Testing;
  12. using Xunit;
  13. namespace ReactiveTests.Tests
  14. {
  15. public class RefCountTest : ReactiveTest
  16. {
  17. private sealed class DematerializingConnectableObservable<T> : IConnectableObservable<T>
  18. {
  19. private readonly IConnectableObservable<Notification<T>> _subject;
  20. public DematerializingConnectableObservable(IConnectableObservable<Notification<T>> subject)
  21. {
  22. _subject = subject;
  23. }
  24. public IDisposable Subscribe(IObserver<T> observer)
  25. {
  26. return _subject.Dematerialize().Subscribe(observer);
  27. }
  28. public IDisposable Connect()
  29. {
  30. return _subject.Connect();
  31. }
  32. }
  33. [Fact]
  34. public void RefCount_ArgumentChecking()
  35. {
  36. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null));
  37. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, 2));
  38. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 0));
  39. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -1));
  40. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -2));
  41. }
  42. [Fact]
  43. public void RefCount_ConnectsOnFirst()
  44. {
  45. var scheduler = new TestScheduler();
  46. var xs = scheduler.CreateHotObservable(
  47. OnNext(210, 1),
  48. OnNext(220, 2),
  49. OnNext(230, 3),
  50. OnNext(240, 4),
  51. OnCompleted<int>(250)
  52. );
  53. var subject = new MySubject();
  54. var conn = new ConnectableObservable<int>(xs, subject);
  55. var res = scheduler.Start(() =>
  56. conn.RefCount()
  57. );
  58. res.Messages.AssertEqual(
  59. OnNext(210, 1),
  60. OnNext(220, 2),
  61. OnNext(230, 3),
  62. OnNext(240, 4),
  63. OnCompleted<int>(250)
  64. );
  65. Assert.True(subject.Disposed);
  66. }
  67. [Fact]
  68. public void RefCount_NotConnected()
  69. {
  70. var disconnected = false;
  71. var count = 0;
  72. var xs = Observable.Defer(() =>
  73. {
  74. count++;
  75. return Observable.Create<int>(obs =>
  76. {
  77. return () => { disconnected = true; };
  78. });
  79. });
  80. var subject = new MySubject();
  81. var conn = new ConnectableObservable<int>(xs, subject);
  82. var refd = conn.RefCount();
  83. var dis1 = refd.Subscribe();
  84. Assert.Equal(1, count);
  85. Assert.Equal(1, subject.SubscribeCount);
  86. Assert.False(disconnected);
  87. var dis2 = refd.Subscribe();
  88. Assert.Equal(1, count);
  89. Assert.Equal(2, subject.SubscribeCount);
  90. Assert.False(disconnected);
  91. dis1.Dispose();
  92. Assert.False(disconnected);
  93. dis2.Dispose();
  94. Assert.True(disconnected);
  95. disconnected = false;
  96. var dis3 = refd.Subscribe();
  97. Assert.Equal(2, count);
  98. Assert.Equal(3, subject.SubscribeCount);
  99. Assert.False(disconnected);
  100. dis3.Dispose();
  101. Assert.True(disconnected);
  102. }
  103. [Fact]
  104. public void RefCount_OnError()
  105. {
  106. var ex = new Exception();
  107. var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
  108. var res = xs.Publish().RefCount();
  109. res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); });
  110. res.Subscribe(_ => { Assert.True(false); }, ex_ => { Assert.Same(ex, ex_); }, () => { Assert.True(false); });
  111. }
  112. [Fact]
  113. public void RefCount_Publish()
  114. {
  115. var scheduler = new TestScheduler();
  116. var xs = scheduler.CreateHotObservable(
  117. OnNext(210, 1),
  118. OnNext(220, 2),
  119. OnNext(230, 3),
  120. OnNext(240, 4),
  121. OnNext(250, 5),
  122. OnNext(260, 6),
  123. OnNext(270, 7),
  124. OnNext(280, 8),
  125. OnNext(290, 9),
  126. OnCompleted<int>(300)
  127. );
  128. var res = xs.Publish().RefCount();
  129. var d1 = default(IDisposable);
  130. var o1 = scheduler.CreateObserver<int>();
  131. scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
  132. scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });
  133. var d2 = default(IDisposable);
  134. var o2 = scheduler.CreateObserver<int>();
  135. scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
  136. scheduler.ScheduleAbsolute(275, () => { d2.Dispose(); });
  137. var d3 = default(IDisposable);
  138. var o3 = scheduler.CreateObserver<int>();
  139. scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
  140. scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });
  141. var d4 = default(IDisposable);
  142. var o4 = scheduler.CreateObserver<int>();
  143. scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
  144. scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });
  145. scheduler.Start();
  146. o1.Messages.AssertEqual(
  147. OnNext(220, 2),
  148. OnNext(230, 3)
  149. );
  150. o2.Messages.AssertEqual(
  151. OnNext(230, 3),
  152. OnNext(240, 4),
  153. OnNext(250, 5),
  154. OnNext(260, 6),
  155. OnNext(270, 7)
  156. );
  157. o3.Messages.AssertEqual(
  158. OnNext(260, 6)
  159. );
  160. o4.Messages.AssertEqual(
  161. OnNext(290, 9),
  162. OnCompleted<int>(300)
  163. );
  164. xs.Subscriptions.AssertEqual(
  165. Subscribe(215, 275),
  166. Subscribe(285, 300)
  167. );
  168. }
  169. [Fact]
  170. public void RefCount_can_connect_again_if_previous_subscription_terminated_synchronously()
  171. {
  172. var seen = 0;
  173. var terminated = false;
  174. var subject = new ReplaySubject<Notification<int>>(1);
  175. var connectable = new DematerializingConnectableObservable<int>(subject.Publish());
  176. var refCount = connectable.RefCount();
  177. subject.OnNext(Notification.CreateOnNext(36));
  178. using (refCount.Subscribe(value => seen = value, () => terminated = true))
  179. {
  180. Assert.Equal(36, seen);
  181. }
  182. seen = 0;
  183. terminated = false;
  184. subject.OnNext(Notification.CreateOnCompleted<int>());
  185. using (refCount.Subscribe(value => seen = value, () => terminated = true))
  186. {
  187. Assert.Equal(0, seen);
  188. Assert.True(terminated);
  189. }
  190. seen = 0;
  191. terminated = false;
  192. subject.OnNext(Notification.CreateOnNext(36));
  193. using (refCount.Subscribe(value => seen = value, () => terminated = true))
  194. {
  195. Assert.Equal(36, seen);
  196. Assert.False(terminated);
  197. }
  198. }
  199. [Fact]
  200. public void LazyRefCount_ArgumentChecking()
  201. {
  202. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, TimeSpan.FromSeconds(2)));
  203. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, TimeSpan.FromSeconds(2), Scheduler.Default));
  204. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, 2, TimeSpan.FromSeconds(2)));
  205. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, 2, TimeSpan.FromSeconds(2)));
  206. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount(Observable.Never<int>().Publish(), TimeSpan.FromSeconds(2), null));
  207. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 0, TimeSpan.FromSeconds(2)));
  208. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -1, TimeSpan.FromSeconds(2)));
  209. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 2, TimeSpan.FromSeconds(2), null));
  210. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), 0, TimeSpan.FromSeconds(2), Scheduler.Default));
  211. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.RefCount(Observable.Never<int>().Publish(), -1, TimeSpan.FromSeconds(2), Scheduler.Default));
  212. }
  213. [Fact]
  214. public void LazyRefCount_ConnectsOnFirst()
  215. {
  216. var scheduler = new TestScheduler();
  217. var xs = scheduler.CreateHotObservable(
  218. OnNext(210, 1),
  219. OnNext(220, 2),
  220. OnNext(230, 3),
  221. OnNext(240, 4),
  222. OnCompleted<int>(250)
  223. );
  224. var subject = new MySubject();
  225. var conn = new ConnectableObservable<int>(xs, subject);
  226. var res = scheduler.Start(() =>
  227. conn.RefCount(TimeSpan.FromSeconds(2))
  228. );
  229. res.Messages.AssertEqual(
  230. OnNext(210, 1),
  231. OnNext(220, 2),
  232. OnNext(230, 3),
  233. OnNext(240, 4),
  234. OnCompleted<int>(250)
  235. );
  236. Assert.True(subject.Disposed);
  237. }
  238. [Fact]
  239. public void LazyRefCount_NotConnected()
  240. {
  241. var scheduler = new TestScheduler();
  242. var disconnected = false;
  243. var count = 0;
  244. var xs = Observable.Defer(() =>
  245. {
  246. count++;
  247. return Observable.Create<int>(obs =>
  248. {
  249. return () => { disconnected = true; };
  250. });
  251. });
  252. var subject = new MySubject();
  253. var conn = new ConnectableObservable<int>(xs, subject);
  254. var refd = conn.RefCount(TimeSpan.FromTicks(20), scheduler);
  255. var dis1 = refd.Subscribe();
  256. Assert.Equal(1, count);
  257. Assert.Equal(1, subject.SubscribeCount);
  258. Assert.False(disconnected);
  259. var dis2 = refd.Subscribe();
  260. Assert.Equal(1, count);
  261. Assert.Equal(2, subject.SubscribeCount);
  262. Assert.False(disconnected);
  263. dis1.Dispose();
  264. Assert.False(disconnected);
  265. dis2.Dispose();
  266. Assert.False(disconnected);
  267. scheduler.AdvanceBy(19);
  268. Assert.False(disconnected);
  269. scheduler.AdvanceBy(1);
  270. Assert.True(disconnected);
  271. disconnected = false;
  272. var dis3 = refd.Subscribe();
  273. Assert.Equal(2, count);
  274. Assert.Equal(3, subject.SubscribeCount);
  275. Assert.False(disconnected);
  276. dis3.Dispose();
  277. scheduler.AdvanceBy(20);
  278. Assert.True(disconnected);
  279. }
  280. [Fact]
  281. public void LazyRefCount_OnError()
  282. {
  283. var ex = new Exception();
  284. var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
  285. var res = xs.Publish().RefCount(TimeSpan.FromSeconds(2));
  286. res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception());
  287. res.Subscribe(_ => throw new Exception(), ex_ => { Assert.Same(ex, ex_); }, () => throw new Exception());
  288. }
  289. [Fact]
  290. public void LazyRefCount_Publish()
  291. {
  292. var scheduler = new TestScheduler();
  293. var xs = scheduler.CreateHotObservable(
  294. OnNext(210, 1),
  295. OnNext(220, 2),
  296. OnNext(230, 3),
  297. OnNext(240, 4),
  298. OnNext(250, 5),
  299. OnNext(260, 6),
  300. OnNext(270, 7),
  301. OnNext(280, 8),
  302. OnNext(290, 9),
  303. OnCompleted<int>(300)
  304. );
  305. var res = xs.Publish().RefCount(TimeSpan.FromTicks(9), scheduler);
  306. var d1 = default(IDisposable);
  307. var o1 = scheduler.CreateObserver<int>();
  308. scheduler.ScheduleAbsolute(215, () => { d1 = res.Subscribe(o1); });
  309. scheduler.ScheduleAbsolute(235, () => { d1.Dispose(); });
  310. var d2 = default(IDisposable);
  311. var o2 = scheduler.CreateObserver<int>();
  312. scheduler.ScheduleAbsolute(225, () => { d2 = res.Subscribe(o2); });
  313. scheduler.ScheduleAbsolute(275, () =>
  314. {
  315. d2.Dispose();
  316. });
  317. var d3 = default(IDisposable);
  318. var o3 = scheduler.CreateObserver<int>();
  319. scheduler.ScheduleAbsolute(255, () => { d3 = res.Subscribe(o3); });
  320. scheduler.ScheduleAbsolute(265, () => { d3.Dispose(); });
  321. var d4 = default(IDisposable);
  322. var o4 = scheduler.CreateObserver<int>();
  323. scheduler.ScheduleAbsolute(285, () => { d4 = res.Subscribe(o4); });
  324. scheduler.ScheduleAbsolute(320, () => { d4.Dispose(); });
  325. scheduler.Start();
  326. o1.Messages.AssertEqual(
  327. OnNext(220, 2),
  328. OnNext(230, 3)
  329. );
  330. o2.Messages.AssertEqual(
  331. OnNext(230, 3),
  332. OnNext(240, 4),
  333. OnNext(250, 5),
  334. OnNext(260, 6),
  335. OnNext(270, 7)
  336. );
  337. o3.Messages.AssertEqual(
  338. OnNext(260, 6)
  339. );
  340. o4.Messages.AssertEqual(
  341. OnNext(290, 9),
  342. OnCompleted<int>(300)
  343. );
  344. xs.Subscriptions.AssertEqual(
  345. Subscribe(215, 284),
  346. Subscribe(285, 300)
  347. );
  348. }
  349. [Fact]
  350. public void RefCount_source_already_completed_synchronously()
  351. {
  352. var subscribed = 0;
  353. var unsubscribed = 0;
  354. var o1 = Observable.Create<string>(observer =>
  355. {
  356. subscribed++;
  357. observer.OnCompleted();
  358. return Disposable.Create(() => unsubscribed++);
  359. });
  360. var o2 = o1.Publish().RefCount();
  361. var s1 = o2.Subscribe();
  362. Assert.Equal(1, subscribed);
  363. Assert.Equal(1, unsubscribed);
  364. var s2 = o2.Subscribe();
  365. Assert.Equal(1, subscribed);
  366. Assert.Equal(1, unsubscribed);
  367. }
  368. [Fact]
  369. public void RefCount_minObservers_not_connected_Eager()
  370. {
  371. int connected = 0;
  372. var source = Observable.Defer(() =>
  373. {
  374. connected++;
  375. return Observable.Never<int>();
  376. })
  377. .Publish()
  378. .RefCount(2);
  379. Assert.Equal(0, connected);
  380. source.Subscribe();
  381. Assert.Equal(0, connected);
  382. }
  383. [Fact]
  384. public void RefCount_minObservers_connected_Eager()
  385. {
  386. var connected = 0;
  387. var source = Observable.Defer(() =>
  388. {
  389. connected++;
  390. return Observable.Range(1, 5);
  391. })
  392. .Publish()
  393. .RefCount(2);
  394. Assert.Equal(0, connected);
  395. var list1 = new List<int>();
  396. source.Subscribe(list1.Add);
  397. Assert.Equal(0, connected);
  398. Assert.Empty(list1);
  399. var list2 = new List<int>();
  400. source.Subscribe(list2.Add);
  401. Assert.Equal(1, connected);
  402. var expected = new List<int>(new[] { 1, 2, 3, 4, 5 });
  403. Assert.Equal(expected, list1);
  404. Assert.Equal(expected, list2);
  405. }
  406. [Fact]
  407. public void RefCount_minObservers_not_connected_Lazy()
  408. {
  409. int connected = 0;
  410. var source = Observable.Defer(() =>
  411. {
  412. connected++;
  413. return Observable.Never<int>();
  414. })
  415. .Publish()
  416. .RefCount(2, TimeSpan.FromMinutes(1));
  417. Assert.Equal(0, connected);
  418. source.Subscribe();
  419. Assert.Equal(0, connected);
  420. }
  421. [Fact]
  422. public void RefCount_minObservers_connected_Lazy()
  423. {
  424. var connected = 0;
  425. var source = Observable.Defer(() =>
  426. {
  427. connected++;
  428. return Observable.Range(1, 5);
  429. })
  430. .Publish()
  431. .RefCount(2, TimeSpan.FromMinutes(1));
  432. Assert.Equal(0, connected);
  433. var list1 = new List<int>();
  434. source.Subscribe(list1.Add);
  435. Assert.Equal(0, connected);
  436. Assert.Empty(list1);
  437. var list2 = new List<int>();
  438. source.Subscribe(list2.Add);
  439. Assert.Equal(1, connected);
  440. var expected = new List<int>(new[] { 1, 2, 3, 4, 5 });
  441. Assert.Equal(expected, list1);
  442. Assert.Equal(expected, list2);
  443. }
  444. }
  445. }