1
0

RefCountTest.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Linq;
  9. using System.Reactive.Subjects;
  10. using Microsoft.Reactive.Testing;
  11. using Xunit;
  12. namespace ReactiveTests.Tests
  13. {
  14. public class RefCountTest : ReactiveTest
  15. {
  /// <summary>
  /// Test helper that presents a connectable sequence of <see cref="Notification{T}"/>
  /// values as a connectable sequence of <typeparamref name="T"/>. Subscribers observe the
  /// dematerialized sequence; <see cref="Connect"/> is forwarded to the wrapped source.
  /// </summary>
  private sealed class DematerializingConnectableObservable<T> : IConnectableObservable<T>
  {
      // The wrapped connectable source of materialized notifications.
      private readonly IConnectableObservable<Notification<T>> _notifications;

      /// <summary>Wraps the given connectable sequence of notifications.</summary>
      public DematerializingConnectableObservable(IConnectableObservable<Notification<T>> subject) => _notifications = subject;

      /// <summary>Subscribes the observer to the dematerialized view of the wrapped source.</summary>
      public IDisposable Subscribe(IObserver<T> observer) => _notifications.Dematerialize().Subscribe(observer);

      /// <summary>Connects the wrapped source and returns its connection handle.</summary>
      public IDisposable Connect() => _notifications.Connect();
  }
  /// <summary>
  /// Expects <c>RefCount</c> to throw <see cref="ArgumentNullException"/> for a null source.
  /// </summary>
  [Fact]
  public void RefCount_ArgumentChecking()
  {
      Action refCountOnNullSource = () => Observable.RefCount<int>(null);

      ReactiveAssert.Throws<ArgumentNullException>(refCountOnNullSource);
  }
  /// <summary>
  /// Subscribes once through <c>RefCount</c> over a hot source and expects every source
  /// message to be observed, and the subject to report <c>Disposed</c> afterwards.
  /// </summary>
  [Fact]
  public void RefCount_ConnectsOnFirst()
  {
      var testScheduler = new TestScheduler();

      var emitted = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      };
      var source = testScheduler.CreateHotObservable(emitted);

      var spy = new MySubject();
      var connectable = new ConnectableObservable<int>(source, spy);

      var observer = testScheduler.Start(() => connectable.RefCount());

      // The subscriber is expected to see exactly the messages the source produced.
      var expected = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      };
      observer.Messages.AssertEqual(expected);

      Assert.True(spy.Disposed);
  }
  /// <summary>
  /// Tracks how often the deferred source is created and when it is disposed while
  /// subscribers come and go: the source is expected to be created on the first subscriber,
  /// disposed when the last one leaves, and created again on the next subscriber.
  /// </summary>
  [Fact]
  public void RefCount_NotConnected()
  {
      var sourceDisposed = false;
      var factoryCalls = 0;

      // Each run of the Defer factory bumps the counter; disposing the created
      // sequence flips the flag.
      var source = Observable.Defer(() =>
      {
          factoryCalls++;
          return Observable.Create<int>(observer => () => { sourceDisposed = true; });
      });

      var spy = new MySubject();
      var connectable = new ConnectableObservable<int>(source, spy);
      var shared = connectable.RefCount();

      // First subscriber.
      var first = shared.Subscribe();
      Assert.Equal(1, factoryCalls);
      Assert.Equal(1, spy.SubscribeCount);
      Assert.False(sourceDisposed);

      // Second subscriber: the factory must not have run again.
      var second = shared.Subscribe();
      Assert.Equal(1, factoryCalls);
      Assert.Equal(2, spy.SubscribeCount);
      Assert.False(sourceDisposed);

      // One subscriber remains after the first leaves.
      first.Dispose();
      Assert.False(sourceDisposed);

      // The last subscriber leaves.
      second.Dispose();
      Assert.True(sourceDisposed);

      sourceDisposed = false;

      // A fresh subscriber is expected to trigger a second factory run.
      var third = shared.Subscribe();
      Assert.Equal(2, factoryCalls);
      Assert.Equal(3, spy.SubscribeCount);
      Assert.False(sourceDisposed);

      third.Dispose();
      Assert.True(sourceDisposed);
  }
  /// <summary>
  /// Subscribes twice to a published, ref-counted sequence that fails immediately and
  /// expects each subscriber to receive the original exception through <c>OnError</c>,
  /// never <c>OnNext</c> or <c>OnCompleted</c>.
  /// </summary>
  [Fact]
  public void RefCount_OnError()
  {
      var ex = new Exception();
      var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
      var res = xs.Publish().RefCount();

      // Counts OnError deliveries so the test cannot pass vacuously when no
      // notification reaches the observer at all.
      var errors = 0;

      res.Subscribe(_ => { Assert.True(false); }, ex_ => { errors++; Assert.Same(ex, ex_); }, () => { Assert.True(false); });
      Assert.Equal(1, errors);

      res.Subscribe(_ => { Assert.True(false); }, ex_ => { errors++; Assert.Same(ex, ex_); }, () => { Assert.True(false); });
      Assert.Equal(2, errors);
  }
  /// <summary>
  /// Four observers subscribe to and unsubscribe from a published, ref-counted hot source
  /// at different virtual times. Checks the messages each observer receives and the
  /// subscription windows recorded on the source.
  /// </summary>
  [Fact]
  public void RefCount_Publish()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      var shared = source.Publish().RefCount();

      var first = testScheduler.CreateObserver<int>();
      var second = testScheduler.CreateObserver<int>();
      var third = testScheduler.CreateObserver<int>();
      var fourth = testScheduler.CreateObserver<int>();

      IDisposable firstSub = null;
      IDisposable secondSub = null;
      IDisposable thirdSub = null;
      IDisposable fourthSub = null;

      // Subscriptions, in chronological order.
      testScheduler.ScheduleAbsolute(215, () => { firstSub = shared.Subscribe(first); });
      testScheduler.ScheduleAbsolute(225, () => { secondSub = shared.Subscribe(second); });
      testScheduler.ScheduleAbsolute(255, () => { thirdSub = shared.Subscribe(third); });
      testScheduler.ScheduleAbsolute(285, () => { fourthSub = shared.Subscribe(fourth); });

      // Disposals, in chronological order.
      testScheduler.ScheduleAbsolute(235, () => { firstSub.Dispose(); });
      testScheduler.ScheduleAbsolute(265, () => { thirdSub.Dispose(); });
      testScheduler.ScheduleAbsolute(275, () => { secondSub.Dispose(); });
      testScheduler.ScheduleAbsolute(320, () => { fourthSub.Dispose(); });

      testScheduler.Start();

      first.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(230, 3)
      );

      second.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7)
      );

      third.Messages.AssertEqual(
          OnNext(260, 6)
      );

      fourth.Messages.AssertEqual(
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      // Expected source windows: the first ends at 275, when the last of the early
      // subscribers is disposed; the second starts at 285 and ends with completion at 300.
      source.Subscriptions.AssertEqual(
          Subscribe(215, 275),
          Subscribe(285, 300)
      );
  }
  /// <summary>
  /// Drives a ref-counted source from a replay subject of notifications and subscribes
  /// three times in a row: after a value, after a completion, and after another value.
  /// The third subscription is expected to see the new value without a completion.
  /// </summary>
  [Fact]
  public void RefCount_can_connect_again_if_previous_subscription_terminated_synchronously()
  {
      var lastValue = 0;
      var completed = false;

      // Handlers shared by all three subscriptions.
      Action<int> onNext = value => lastValue = value;
      Action onCompleted = () => completed = true;

      var notifications = new ReplaySubject<Notification<int>>(1);
      var connectable = new DematerializingConnectableObservable<int>(notifications.Publish());
      var shared = connectable.RefCount();

      // Round 1: a value is waiting in the replay buffer.
      notifications.OnNext(Notification.CreateOnNext(36));
      using (shared.Subscribe(onNext, onCompleted))
      {
          Assert.Equal(36, lastValue);
      }

      // Round 2: the replay buffer now holds a completion.
      lastValue = 0;
      completed = false;
      notifications.OnNext(Notification.CreateOnCompleted<int>());
      using (shared.Subscribe(onNext, onCompleted))
      {
          Assert.Equal(0, lastValue);
          Assert.True(completed);
      }

      // Round 3: a value again, after the previous subscription completed.
      lastValue = 0;
      completed = false;
      notifications.OnNext(Notification.CreateOnNext(36));
      using (shared.Subscribe(onNext, onCompleted))
      {
          Assert.Equal(36, lastValue);
          Assert.False(completed);
      }
  }
  /// <summary>
  /// Expects the <c>RefCount</c> overload taking a disconnect delay to throw
  /// <see cref="ArgumentNullException"/> for a null source.
  /// </summary>
  [Fact]
  public void LazyRefCount_ArgumentChecking()
  {
      var disconnectDelay = TimeSpan.FromSeconds(2);

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RefCount<int>(null, disconnectDelay));
  }
  /// <summary>
  /// Subscribes once through the delayed-disconnect <c>RefCount</c> overload over a hot
  /// source and expects every source message to be observed, and the subject to report
  /// <c>Disposed</c> afterwards.
  /// </summary>
  [Fact]
  public void LazyRefCount_ConnectsOnFirst()
  {
      var testScheduler = new TestScheduler();

      var emitted = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      };
      var source = testScheduler.CreateHotObservable(emitted);

      var spy = new MySubject();
      var connectable = new ConnectableObservable<int>(source, spy);

      var observer = testScheduler.Start(() => connectable.RefCount(TimeSpan.FromSeconds(2)));

      // The subscriber is expected to see exactly the messages the source produced.
      var expected = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      };
      observer.Messages.AssertEqual(expected);

      Assert.True(spy.Disposed);
  }
  /// <summary>
  /// Same scenario as the eager not-connected test, but with a 20-tick disconnect delay
  /// on a test scheduler: after the last subscriber leaves, the source is expected to
  /// stay alive until 20 ticks of virtual time have elapsed.
  /// </summary>
  [Fact]
  public void LazyRefCount_NotConnected()
  {
      var testScheduler = new TestScheduler();
      var sourceDisposed = false;
      var factoryCalls = 0;

      // Each run of the Defer factory bumps the counter; disposing the created
      // sequence flips the flag.
      var source = Observable.Defer(() =>
      {
          factoryCalls++;
          return Observable.Create<int>(observer => () => { sourceDisposed = true; });
      });

      var spy = new MySubject();
      var connectable = new ConnectableObservable<int>(source, spy);
      var shared = connectable.RefCount(TimeSpan.FromTicks(20), testScheduler);

      // First subscriber.
      var first = shared.Subscribe();
      Assert.Equal(1, factoryCalls);
      Assert.Equal(1, spy.SubscribeCount);
      Assert.False(sourceDisposed);

      // Second subscriber: the factory must not have run again.
      var second = shared.Subscribe();
      Assert.Equal(1, factoryCalls);
      Assert.Equal(2, spy.SubscribeCount);
      Assert.False(sourceDisposed);

      first.Dispose();
      Assert.False(sourceDisposed);

      // The last subscriber leaves; no disposal is expected until the delay elapses.
      second.Dispose();
      Assert.False(sourceDisposed);

      testScheduler.AdvanceBy(19);
      Assert.False(sourceDisposed);

      // The 20th tick completes the delay.
      testScheduler.AdvanceBy(1);
      Assert.True(sourceDisposed);

      sourceDisposed = false;

      // A fresh subscriber is expected to trigger a second factory run.
      var third = shared.Subscribe();
      Assert.Equal(2, factoryCalls);
      Assert.Equal(3, spy.SubscribeCount);
      Assert.False(sourceDisposed);

      third.Dispose();
      testScheduler.AdvanceBy(20);
      Assert.True(sourceDisposed);
  }
  /// <summary>
  /// Subscribes twice to a published sequence that fails immediately, shared through the
  /// delayed-disconnect <c>RefCount</c> overload, and expects each subscriber to receive
  /// the original exception through <c>OnError</c>, never <c>OnNext</c> or <c>OnCompleted</c>.
  /// </summary>
  [Fact]
  public void LazyRefCount_OnError()
  {
      var ex = new Exception();
      var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
      var res = xs.Publish().RefCount(TimeSpan.FromSeconds(2));

      // Counts OnError deliveries so the test cannot pass vacuously when no
      // notification reaches the observer at all.
      var errors = 0;

      res.Subscribe(_ => throw new Exception(), ex_ => { errors++; Assert.Same(ex, ex_); }, () => throw new Exception());
      Assert.Equal(1, errors);

      res.Subscribe(_ => throw new Exception(), ex_ => { errors++; Assert.Same(ex, ex_); }, () => throw new Exception());
      Assert.Equal(2, errors);
  }
  /// <summary>
  /// Four observers subscribe to and unsubscribe from a published hot source shared through
  /// <c>RefCount</c> with a 9-tick disconnect delay on the test scheduler. Checks the
  /// messages each observer receives and the subscription windows recorded on the source.
  /// </summary>
  [Fact]
  public void LazyRefCount_Publish()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      var shared = source.Publish().RefCount(TimeSpan.FromTicks(9), testScheduler);

      var first = testScheduler.CreateObserver<int>();
      var second = testScheduler.CreateObserver<int>();
      var third = testScheduler.CreateObserver<int>();
      var fourth = testScheduler.CreateObserver<int>();

      IDisposable firstSub = null;
      IDisposable secondSub = null;
      IDisposable thirdSub = null;
      IDisposable fourthSub = null;

      // Subscriptions, in chronological order.
      testScheduler.ScheduleAbsolute(215, () => { firstSub = shared.Subscribe(first); });
      testScheduler.ScheduleAbsolute(225, () => { secondSub = shared.Subscribe(second); });
      testScheduler.ScheduleAbsolute(255, () => { thirdSub = shared.Subscribe(third); });
      testScheduler.ScheduleAbsolute(285, () => { fourthSub = shared.Subscribe(fourth); });

      // Disposals, in chronological order.
      testScheduler.ScheduleAbsolute(235, () => { firstSub.Dispose(); });
      testScheduler.ScheduleAbsolute(265, () => { thirdSub.Dispose(); });
      testScheduler.ScheduleAbsolute(275, () => { secondSub.Dispose(); });
      testScheduler.ScheduleAbsolute(320, () => { fourthSub.Dispose(); });

      testScheduler.Start();

      first.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(230, 3)
      );

      second.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7)
      );

      third.Messages.AssertEqual(
          OnNext(260, 6)
      );

      fourth.Messages.AssertEqual(
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      // Expected source windows: the first ends at 284, i.e. 9 ticks after the last
      // early subscriber is disposed at 275; the second starts at 285 and ends at 300.
      source.Subscriptions.AssertEqual(
          Subscribe(215, 284),
          Subscribe(285, 300)
      );
  }
  /// <summary>
  /// Uses a source that completes inside its subscribe function and expects it to have
  /// been subscribed and disposed exactly once, both after the first and after the
  /// second subscription to the published, ref-counted sequence.
  /// </summary>
  [Fact]
  public void RefCount_source_already_completed_synchronously()
  {
      var subscribeCalls = 0;
      var disposeCalls = 0;

      // Completes synchronously on subscribe and counts subscriptions and disposals.
      var completesImmediately = Observable.Create<string>(observer =>
      {
          subscribeCalls++;
          observer.OnCompleted();
          return Disposable.Create(() => disposeCalls++);
      });

      var shared = completesImmediately.Publish().RefCount();

      shared.Subscribe();
      Assert.Equal(1, subscribeCalls);
      Assert.Equal(1, disposeCalls);

      // A second subscriber must not change either counter.
      shared.Subscribe();
      Assert.Equal(1, subscribeCalls);
      Assert.Equal(1, disposeCalls);
  }
  354. }
  355. }