RefCountTest.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Reactive;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Disposables;
  9. using System.Reactive.Linq;
  10. using System.Reactive.Subjects;
  11. using Microsoft.Reactive.Testing;
  12. using Xunit;
  13. namespace ReactiveTests.Tests
  14. {
  15. public class RefCountTest : ReactiveTest
  16. {
  /// <summary>
  /// Test helper that exposes a connectable sequence of <see cref="Notification{T}"/> values
  /// as a connectable sequence of <typeparamref name="T"/>: subscribers receive the
  /// dematerialized stream, while <see cref="Connect"/> is forwarded to the wrapped source.
  /// </summary>
  private sealed class DematerializingConnectableObservable<T> : IConnectableObservable<T>
  {
      private readonly IConnectableObservable<Notification<T>> _notifications;

      /// <summary>Wraps the given connectable sequence of notifications.</summary>
      public DematerializingConnectableObservable(IConnectableObservable<Notification<T>> subject)
          => _notifications = subject;

      /// <summary>Subscribes <paramref name="observer"/> to the dematerialized view of the wrapped sequence.</summary>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          var dematerialized = _notifications.Dematerialize();
          return dematerialized.Subscribe(observer);
      }

      /// <summary>Connects the wrapped sequence and returns its connection handle.</summary>
      public IDisposable Connect() => _notifications.Connect();
  }
  /// <summary>
  /// Checks that <c>Observable.RefCount</c> rejects a null source with an <see cref="ArgumentNullException"/>.
  /// </summary>
  [Fact]
  public void RefCount_ArgumentChecking()
  {
      Action callWithNullSource = () => Observable.RefCount<int>(null);

      ReactiveAssert.Throws<ArgumentNullException>(callWithNullSource);
  }
  /// <summary>
  /// Checks that subscribing through <c>RefCount()</c> relays every notification of the hot
  /// source and that the subject reports itself as disposed once the run has finished.
  /// </summary>
  [Fact]
  public void RefCount_ConnectsOnFirst()
  {
      var testScheduler = new TestScheduler();

      // The same recorded notifications serve as the source script and as the expected output.
      var script = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      };

      var source = testScheduler.CreateHotObservable(script);
      var trackingSubject = new MySubject();
      var connectable = new ConnectableObservable<int>(source, trackingSubject);

      var observed = testScheduler.Start(() => connectable.RefCount());

      observed.Messages.AssertEqual(script);
      Assert.True(trackingSubject.Disposed);
  }
  /// <summary>
  /// Walks a ref-counted connectable through two connection cycles and checks, after each
  /// subscribe/dispose step, how often the deferred source was created, how often the subject
  /// was subscribed to, and whether the source subscription has been torn down.
  /// </summary>
  [Fact]
  public void RefCount_NotConnected()
  {
      var tornDown = false;
      var sourceCreations = 0;

      // Each connection builds a fresh silent source whose disposal raises the tornDown flag.
      var source = Observable.Defer(() =>
      {
          sourceCreations++;
          return Observable.Create<int>(_ => () => tornDown = true);
      });

      var trackingSubject = new MySubject();
      var connectable = new ConnectableObservable<int>(source, trackingSubject);
      var shared = connectable.RefCount();

      // First subscriber: source created once, still connected.
      var first = shared.Subscribe();
      Assert.Equal(1, sourceCreations);
      Assert.Equal(1, trackingSubject.SubscribeCount);
      Assert.False(tornDown);

      // Second subscriber: no additional source creation.
      var second = shared.Subscribe();
      Assert.Equal(1, sourceCreations);
      Assert.Equal(2, trackingSubject.SubscribeCount);
      Assert.False(tornDown);

      // Only disposing the last remaining subscriber tears the source down.
      first.Dispose();
      Assert.False(tornDown);
      second.Dispose();
      Assert.True(tornDown);

      // A later subscriber creates the source a second time.
      tornDown = false;
      var third = shared.Subscribe();
      Assert.Equal(2, sourceCreations);
      Assert.Equal(3, trackingSubject.SubscribeCount);
      Assert.False(tornDown);

      third.Dispose();
      Assert.True(tornDown);
  }
  /// <summary>
  /// Checks that both a first and a second subscriber to a published, ref-counted sequence
  /// that fails immediately receive the original exception instance and nothing else.
  /// </summary>
  [Fact]
  public void RefCount_OnError()
  {
      var ex = new Exception();
      var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
      var res = xs.Publish().RefCount();

      // Count error deliveries so the test cannot pass vacuously when no OnError arrives at all.
      var errors = 0;

      Action<int> onNext = _ => Assert.True(false, "OnNext should not be called.");
      Action<Exception> onError = ex_ =>
      {
          Assert.Same(ex, ex_);
          errors++;
      };
      Action onCompleted = () => Assert.True(false, "OnCompleted should not be called.");

      res.Subscribe(onNext, onError, onCompleted);
      Assert.Equal(1, errors);

      res.Subscribe(onNext, onError, onCompleted);
      Assert.Equal(2, errors);
  }
  /// <summary>
  /// Subscribes four observers to a published, ref-counted hot sequence over overlapping and
  /// disjoint virtual-time windows, then checks what each observer received and over which
  /// intervals the source itself was subscribed to.
  /// </summary>
  [Fact]
  public void RefCount_Publish()
  {
      var testScheduler = new TestScheduler();
      var source = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );
      var shared = source.Publish().RefCount();

      // Creates an observer that is subscribed at one virtual time and unsubscribed at another.
      ITestableObserver<int> Observe(long subscribeAt, long disposeAt)
      {
          var observer = testScheduler.CreateObserver<int>();
          var subscription = default(IDisposable);
          testScheduler.ScheduleAbsolute(subscribeAt, () => { subscription = shared.Subscribe(observer); });
          testScheduler.ScheduleAbsolute(disposeAt, () => { subscription.Dispose(); });
          return observer;
      }

      var first = Observe(215, 235);
      var second = Observe(225, 275);
      var third = Observe(255, 265);
      var fourth = Observe(285, 320);

      testScheduler.Start();

      first.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(230, 3)
      );
      second.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7)
      );
      third.Messages.AssertEqual(
          OnNext(260, 6)
      );
      fourth.Messages.AssertEqual(
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      // Expected: one source subscription spanning 215-275 and a second one from 285 to 300.
      source.Subscriptions.AssertEqual(
          Subscribe(215, 275),
          Subscribe(285, 300)
      );
  }
  /// <summary>
  /// Subscribes three times in a row to a ref-counted view over a replaying subject of
  /// notifications, pushing a value, then a completion, then a value again, and checks what
  /// each short-lived subscription observed.
  /// </summary>
  [Fact]
  public void RefCount_can_connect_again_if_previous_subscription_terminated_synchronously()
  {
      var lastValue = 0;
      var completed = false;

      var replay = new ReplaySubject<Notification<int>>(1);
      var connectable = new DematerializingConnectableObservable<int>(replay.Publish());
      var shared = connectable.RefCount();

      // Every round subscribes with the same pair of recording handlers.
      IDisposable Observe() => shared.Subscribe(value => lastValue = value, () => completed = true);

      // Round 1: a value is pending, so the subscriber sees it.
      replay.OnNext(Notification.CreateOnNext(36));
      using (Observe())
      {
          Assert.Equal(36, lastValue);
      }

      // Round 2: a completion is pending, so the subscriber completes without a value.
      lastValue = 0;
      completed = false;
      replay.OnNext(Notification.CreateOnCompleted<int>());
      using (Observe())
      {
          Assert.Equal(0, lastValue);
          Assert.True(completed);
      }

      // Round 3: a value is pending again, so the subscriber sees it and is not completed.
      lastValue = 0;
      completed = false;
      replay.OnNext(Notification.CreateOnNext(36));
      using (Observe())
      {
          Assert.Equal(36, lastValue);
          Assert.False(completed);
      }
  }
  /// <summary>
  /// Checks that the <c>RefCount</c> overload taking a disconnect delay rejects a null source
  /// with an <see cref="ArgumentNullException"/>.
  /// </summary>
  [Fact]
  public void LazyRefCount_ArgumentChecking()
  {
      Action callWithNullSource = () => Observable.RefCount<int>(null, TimeSpan.FromSeconds(2));

      ReactiveAssert.Throws<ArgumentNullException>(callWithNullSource);
  }
  /// <summary>
  /// Checks that subscribing through <c>RefCount(TimeSpan)</c> relays every notification of the
  /// hot source and that the subject reports itself as disposed once the run has finished.
  /// </summary>
  [Fact]
  public void LazyRefCount_ConnectsOnFirst()
  {
      var testScheduler = new TestScheduler();

      // The same recorded notifications serve as the source script and as the expected output.
      var script = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      };

      var source = testScheduler.CreateHotObservable(script);
      var trackingSubject = new MySubject();
      var connectable = new ConnectableObservable<int>(source, trackingSubject);

      var observed = testScheduler.Start(() => connectable.RefCount(TimeSpan.FromSeconds(2)));

      observed.Messages.AssertEqual(script);
      Assert.True(trackingSubject.Disposed);
  }
  /// <summary>
  /// Walks a ref-counted connectable with a 20-tick disconnect delay through two connection
  /// cycles, checking after each step how often the deferred source was created, how often the
  /// subject was subscribed to, and at which virtual time the source subscription is torn down.
  /// </summary>
  [Fact]
  public void LazyRefCount_NotConnected()
  {
      var testScheduler = new TestScheduler();
      var tornDown = false;
      var sourceCreations = 0;

      // Each connection builds a fresh silent source whose disposal raises the tornDown flag.
      var source = Observable.Defer(() =>
      {
          sourceCreations++;
          return Observable.Create<int>(_ => () => tornDown = true);
      });

      var trackingSubject = new MySubject();
      var connectable = new ConnectableObservable<int>(source, trackingSubject);
      var shared = connectable.RefCount(TimeSpan.FromTicks(20), testScheduler);

      // First subscriber: source created once, still connected.
      var first = shared.Subscribe();
      Assert.Equal(1, sourceCreations);
      Assert.Equal(1, trackingSubject.SubscribeCount);
      Assert.False(tornDown);

      // Second subscriber: no additional source creation.
      var second = shared.Subscribe();
      Assert.Equal(1, sourceCreations);
      Assert.Equal(2, trackingSubject.SubscribeCount);
      Assert.False(tornDown);

      // Disposing both subscribers does not tear down immediately...
      first.Dispose();
      Assert.False(tornDown);
      second.Dispose();
      Assert.False(tornDown);

      // ...nor after 19 ticks, but it does once the 20th tick has elapsed.
      testScheduler.AdvanceBy(19);
      Assert.False(tornDown);
      testScheduler.AdvanceBy(1);
      Assert.True(tornDown);

      // A later subscriber creates the source a second time.
      tornDown = false;
      var third = shared.Subscribe();
      Assert.Equal(2, sourceCreations);
      Assert.Equal(3, trackingSubject.SubscribeCount);
      Assert.False(tornDown);

      third.Dispose();
      testScheduler.AdvanceBy(20);
      Assert.True(tornDown);
  }
  /// <summary>
  /// Checks that both a first and a second subscriber to a published sequence that fails
  /// immediately, ref-counted with a two-second disconnect delay, receive the original
  /// exception instance and nothing else.
  /// </summary>
  [Fact]
  public void LazyRefCount_OnError()
  {
      var ex = new Exception();
      var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
      var res = xs.Publish().RefCount(TimeSpan.FromSeconds(2));

      // Count error deliveries so the test cannot pass vacuously when no OnError arrives at all.
      var errors = 0;

      Action<int> onNext = _ => throw new InvalidOperationException("OnNext should not be called.");
      Action<Exception> onError = ex_ =>
      {
          Assert.Same(ex, ex_);
          errors++;
      };
      Action onCompleted = () => throw new InvalidOperationException("OnCompleted should not be called.");

      res.Subscribe(onNext, onError, onCompleted);
      Assert.Equal(1, errors);

      res.Subscribe(onNext, onError, onCompleted);
      Assert.Equal(2, errors);
  }
  /// <summary>
  /// Subscribes four observers to a published hot sequence that is ref-counted with a 9-tick
  /// disconnect delay, over overlapping and disjoint virtual-time windows, then checks what each
  /// observer received and over which intervals the source itself was subscribed to.
  /// </summary>
  [Fact]
  public void LazyRefCount_Publish()
  {
      var testScheduler = new TestScheduler();
      var source = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );
      var shared = source.Publish().RefCount(TimeSpan.FromTicks(9), testScheduler);

      // Creates an observer that is subscribed at one virtual time and unsubscribed at another.
      ITestableObserver<int> Observe(long subscribeAt, long disposeAt)
      {
          var observer = testScheduler.CreateObserver<int>();
          var subscription = default(IDisposable);
          testScheduler.ScheduleAbsolute(subscribeAt, () => { subscription = shared.Subscribe(observer); });
          testScheduler.ScheduleAbsolute(disposeAt, () => { subscription.Dispose(); });
          return observer;
      }

      var first = Observe(215, 235);
      var second = Observe(225, 275);
      var third = Observe(255, 265);
      var fourth = Observe(285, 320);

      testScheduler.Start();

      first.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(230, 3)
      );
      second.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7)
      );
      third.Messages.AssertEqual(
          OnNext(260, 6)
      );
      fourth.Messages.AssertEqual(
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      // Expected: the first source subscription ends at 284 (9 ticks after the last dispose at 275),
      // and a second one runs from 285 until completion at 300.
      source.Subscriptions.AssertEqual(
          Subscribe(215, 284),
          Subscribe(285, 300)
      );
  }
  /// <summary>
  /// Checks that a source which completes during subscription is subscribed to and
  /// unsubscribed from exactly once, even when the ref-counted sequence is subscribed to twice.
  /// </summary>
  [Fact]
  public void RefCount_source_already_completed_synchronously()
  {
      var subscribeCalls = 0;
      var disposeCalls = 0;

      // Source that completes inside its subscribe callback and counts both ends of its lifetime.
      var source = Observable.Create<string>(observer =>
      {
          subscribeCalls++;
          observer.OnCompleted();
          return Disposable.Create(() => disposeCalls++);
      });
      var shared = source.Publish().RefCount();

      shared.Subscribe();
      Assert.Equal(1, subscribeCalls);
      Assert.Equal(1, disposeCalls);

      // A second subscriber must not trigger another source subscription.
      shared.Subscribe();
      Assert.Equal(1, subscribeCalls);
      Assert.Equal(1, disposeCalls);
  }
  /// <summary>
  /// Checks that <c>RefCount(2)</c> does not create the deferred source while only one
  /// observer is subscribed.
  /// </summary>
  [Fact]
  public void RefCount_minObservers_not_connected_Eager()
  {
      var connections = 0;
      var deferred = Observable.Defer(() =>
      {
          connections++;
          return Observable.Never<int>();
      });
      var shared = deferred.Publish().RefCount(2);

      Assert.Equal(0, connections);

      // One observer is below the minimum of two, so nothing is connected.
      shared.Subscribe();
      Assert.Equal(0, connections);
  }
  /// <summary>
  /// Checks that <c>RefCount(2)</c> creates the deferred source only when the second observer
  /// subscribes, and that both observers then receive the values 1 through 5.
  /// </summary>
  [Fact]
  public void RefCount_minObservers_connected_Eager()
  {
      var connections = 0;
      var deferred = Observable.Defer(() =>
      {
          connections++;
          return Observable.Range(1, 5);
      });
      var shared = deferred.Publish().RefCount(2);

      Assert.Equal(0, connections);

      // First observer: still below the minimum, so nothing is connected or received.
      var firstReceived = new List<int>();
      shared.Subscribe(firstReceived.Add);
      Assert.Equal(0, connections);
      Assert.Empty(firstReceived);

      // Second observer reaches the minimum and triggers a single connection.
      var secondReceived = new List<int>();
      shared.Subscribe(secondReceived.Add);
      Assert.Equal(1, connections);

      var expected = new List<int> { 1, 2, 3, 4, 5 };
      Assert.Equal(expected, firstReceived);
      Assert.Equal(expected, secondReceived);
  }
  /// <summary>
  /// Checks that <c>RefCount(2, TimeSpan)</c> does not create the deferred source while only
  /// one observer is subscribed.
  /// </summary>
  [Fact]
  public void RefCount_minObservers_not_connected_Lazy()
  {
      var connections = 0;
      var deferred = Observable.Defer(() =>
      {
          connections++;
          return Observable.Never<int>();
      });
      var shared = deferred.Publish().RefCount(2, TimeSpan.FromMinutes(1));

      Assert.Equal(0, connections);

      // One observer is below the minimum of two, so nothing is connected.
      shared.Subscribe();
      Assert.Equal(0, connections);
  }
  /// <summary>
  /// Checks that <c>RefCount(2, TimeSpan)</c> creates the deferred source only when the second
  /// observer subscribes, and that both observers then receive the values 1 through 5.
  /// </summary>
  [Fact]
  public void RefCount_minObservers_connected_Lazy()
  {
      var connections = 0;
      var deferred = Observable.Defer(() =>
      {
          connections++;
          return Observable.Range(1, 5);
      });
      var shared = deferred.Publish().RefCount(2, TimeSpan.FromMinutes(1));

      Assert.Equal(0, connections);

      // First observer: still below the minimum, so nothing is connected or received.
      var firstReceived = new List<int>();
      shared.Subscribe(firstReceived.Add);
      Assert.Equal(0, connections);
      Assert.Empty(firstReceived);

      // Second observer reaches the minimum and triggers a single connection.
      var secondReceived = new List<int>();
      shared.Subscribe(secondReceived.Add);
      Assert.Equal(1, connections);

      var expected = new List<int> { 1, 2, 3, 4, 5 };
      Assert.Equal(expected, firstReceived);
      Assert.Equal(expected, secondReceived);
  }
  431. }
  432. }