ReplaySubjectTest.cs 72 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Subjects;
  10. using System.Threading;
  11. using System.Threading.Tasks;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. namespace ReactiveTests.Tests
  16. {
  17. public partial class ReplaySubjectTest : ReactiveTest
  18. {
  19. [Fact]
  20. public void Subscribe_ArgumentChecking()
  21. {
  22. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>().Subscribe(null));
  23. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(1).Subscribe(null));
  24. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(2).Subscribe(null));
  25. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(DummyScheduler.Instance).Subscribe(null));
  26. }
  27. [Fact]
  28. public void OnError_ArgumentChecking()
  29. {
  30. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>().OnError(null));
  31. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(1).OnError(null));
  32. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(2).OnError(null));
  33. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(DummyScheduler.Instance).OnError(null));
  34. }
  35. [Fact]
  36. public void Constructor_ArgumentChecking()
  37. {
  38. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1));
  39. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, DummyScheduler.Instance));
  40. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, TimeSpan.Zero));
  41. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(-1, TimeSpan.Zero, DummyScheduler.Instance));
  42. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(TimeSpan.FromTicks(-1)));
  43. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  44. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1)));
  45. ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1), DummyScheduler.Instance));
  46. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(null));
  47. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(0, null));
  48. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(TimeSpan.Zero, null));
  49. ReactiveAssert.Throws<ArgumentNullException>(() => new ReplaySubject<int>(0, TimeSpan.Zero, null));
  50. // zero allowed
  51. new ReplaySubject<int>(0);
  52. new ReplaySubject<int>(TimeSpan.Zero);
  53. new ReplaySubject<int>(0, TimeSpan.Zero);
  54. new ReplaySubject<int>(0, DummyScheduler.Instance);
  55. new ReplaySubject<int>(TimeSpan.Zero, DummyScheduler.Instance);
  56. new ReplaySubject<int>(0, TimeSpan.Zero, DummyScheduler.Instance);
  57. }
  58. [Fact]
  59. public void Infinite_ReplayByTime()
  60. {
  61. var scheduler = new TestScheduler();
  62. var xs = scheduler.CreateHotObservable(
  63. OnNext(70, 1),
  64. OnNext(110, 2),
  65. OnNext(220, 3),
  66. OnNext(270, 4),
  67. OnNext(340, 5),
  68. OnNext(410, 6),
  69. OnNext(520, 7),
  70. OnNext(630, 8),
  71. OnNext(710, 9),
  72. OnNext(870, 10),
  73. OnNext(940, 11),
  74. OnNext(1020, 12)
  75. );
  76. var subject = default(ReplaySubject<int>);
  77. var subscription = default(IDisposable);
  78. var results1 = scheduler.CreateObserver<int>();
  79. var subscription1 = default(IDisposable);
  80. var results2 = scheduler.CreateObserver<int>();
  81. var subscription2 = default(IDisposable);
  82. var results3 = scheduler.CreateObserver<int>();
  83. var subscription3 = default(IDisposable);
  84. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  85. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  86. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  87. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  88. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  89. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  90. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  91. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  92. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  93. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  94. scheduler.Start();
  95. results1.Messages.AssertEqual(
  96. OnNext(301, 3),
  97. OnNext(302, 4),
  98. OnNext(341, 5),
  99. OnNext(411, 6),
  100. OnNext(521, 7)
  101. );
  102. results2.Messages.AssertEqual(
  103. OnNext(401, 5),
  104. OnNext(411, 6),
  105. OnNext(521, 7),
  106. OnNext(631, 8)
  107. );
  108. results3.Messages.AssertEqual(
  109. OnNext(901, 10),
  110. OnNext(941, 11)
  111. );
  112. }
  113. [Fact]
  114. public void Infinite_ReplayOne()
  115. {
  116. var scheduler = new TestScheduler();
  117. var xs = scheduler.CreateHotObservable(
  118. OnNext(70, 1),
  119. OnNext(110, 2),
  120. OnNext(220, 3),
  121. OnNext(270, 4),
  122. OnNext(340, 5),
  123. OnNext(410, 6),
  124. OnNext(520, 7),
  125. OnNext(630, 8),
  126. OnNext(710, 9),
  127. OnNext(870, 10),
  128. OnNext(940, 11),
  129. OnNext(1020, 12)
  130. );
  131. var subject = default(ReplaySubject<int>);
  132. var subscription = default(IDisposable);
  133. var results1 = scheduler.CreateObserver<int>();
  134. var subscription1 = default(IDisposable);
  135. var results2 = scheduler.CreateObserver<int>();
  136. var subscription2 = default(IDisposable);
  137. var results3 = scheduler.CreateObserver<int>();
  138. var subscription3 = default(IDisposable);
  139. var results4 = scheduler.CreateObserver<int>();
  140. var subscription4 = default(IDisposable);
  141. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  142. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  143. scheduler.ScheduleAbsolute(1200, () => subscription.Dispose());
  144. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  145. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  146. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  147. scheduler.ScheduleAbsolute(1100, () => subscription4 = subject.Subscribe(results4));
  148. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  149. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  150. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  151. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  152. scheduler.Start();
  153. results1.Messages.AssertEqual(
  154. OnNext(300, 4),
  155. OnNext(340, 5),
  156. OnNext(410, 6),
  157. OnNext(520, 7)
  158. );
  159. results2.Messages.AssertEqual(
  160. OnNext(400, 5),
  161. OnNext(410, 6),
  162. OnNext(520, 7),
  163. OnNext(630, 8)
  164. );
  165. results3.Messages.AssertEqual(
  166. OnNext(900, 10),
  167. OnNext(940, 11)
  168. );
  169. results4.Messages.AssertEqual(
  170. OnNext(1100, 12)
  171. );
  172. }
  173. [Fact]
  174. public void Infinite_ReplayMany()
  175. {
  176. var scheduler = new TestScheduler();
  177. var xs = scheduler.CreateHotObservable(
  178. OnNext(70, 1),
  179. OnNext(110, 2),
  180. OnNext(220, 3),
  181. OnNext(270, 4),
  182. OnNext(340, 5),
  183. OnNext(410, 6),
  184. OnNext(520, 7),
  185. OnNext(630, 8),
  186. OnNext(710, 9),
  187. OnNext(870, 10),
  188. OnNext(940, 11),
  189. OnNext(1020, 12)
  190. );
  191. var subject = default(ReplaySubject<int>);
  192. var subscription = default(IDisposable);
  193. var results1 = scheduler.CreateObserver<int>();
  194. var subscription1 = default(IDisposable);
  195. var results2 = scheduler.CreateObserver<int>();
  196. var subscription2 = default(IDisposable);
  197. var results3 = scheduler.CreateObserver<int>();
  198. var subscription3 = default(IDisposable);
  199. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  200. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  201. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  202. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  203. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  204. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  205. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  206. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  207. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  208. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  209. scheduler.Start();
  210. results1.Messages.AssertEqual(
  211. OnNext(300, 3),
  212. OnNext(300, 4),
  213. OnNext(340, 5),
  214. OnNext(410, 6),
  215. OnNext(520, 7)
  216. );
  217. results2.Messages.AssertEqual(
  218. OnNext(400, 3),
  219. OnNext(400, 4),
  220. OnNext(400, 5),
  221. OnNext(410, 6),
  222. OnNext(520, 7),
  223. OnNext(630, 8)
  224. );
  225. results3.Messages.AssertEqual(
  226. OnNext(900, 8),
  227. OnNext(900, 9),
  228. OnNext(900, 10),
  229. OnNext(940, 11)
  230. );
  231. }
  232. [Fact]
  233. public void Infinite_ReplayAll()
  234. {
  235. var scheduler = new TestScheduler();
  236. var xs = scheduler.CreateHotObservable(
  237. OnNext(70, 1),
  238. OnNext(110, 2),
  239. OnNext(220, 3),
  240. OnNext(270, 4),
  241. OnNext(340, 5),
  242. OnNext(410, 6),
  243. OnNext(520, 7),
  244. OnNext(630, 8),
  245. OnNext(710, 9),
  246. OnNext(870, 10),
  247. OnNext(940, 11),
  248. OnNext(1020, 12)
  249. );
  250. var subject = default(ReplaySubject<int>);
  251. var subscription = default(IDisposable);
  252. var results1 = scheduler.CreateObserver<int>();
  253. var subscription1 = default(IDisposable);
  254. var results2 = scheduler.CreateObserver<int>();
  255. var subscription2 = default(IDisposable);
  256. var results3 = scheduler.CreateObserver<int>();
  257. var subscription3 = default(IDisposable);
  258. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  259. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  260. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  261. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  262. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  263. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  264. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  265. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  266. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  267. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  268. scheduler.Start();
  269. results1.Messages.AssertEqual(
  270. OnNext(300, 3),
  271. OnNext(300, 4),
  272. OnNext(340, 5),
  273. OnNext(410, 6),
  274. OnNext(520, 7)
  275. );
  276. results2.Messages.AssertEqual(
  277. OnNext(400, 3),
  278. OnNext(400, 4),
  279. OnNext(400, 5),
  280. OnNext(410, 6),
  281. OnNext(520, 7),
  282. OnNext(630, 8)
  283. );
  284. results3.Messages.AssertEqual(
  285. OnNext(900, 3),
  286. OnNext(900, 4),
  287. OnNext(900, 5),
  288. OnNext(900, 6),
  289. OnNext(900, 7),
  290. OnNext(900, 8),
  291. OnNext(900, 9),
  292. OnNext(900, 10),
  293. OnNext(940, 11)
  294. );
  295. }
  296. [Fact]
  297. public void Infinite2()
  298. {
  299. var scheduler = new TestScheduler();
  300. var xs = scheduler.CreateHotObservable(
  301. OnNext(70, 1),
  302. OnNext(110, 2),
  303. OnNext(220, 3),
  304. OnNext(270, 4),
  305. OnNext(280, -1),
  306. OnNext(290, -2),
  307. OnNext(340, 5),
  308. OnNext(410, 6),
  309. OnNext(520, 7),
  310. OnNext(630, 8),
  311. OnNext(710, 9),
  312. OnNext(870, 10),
  313. OnNext(940, 11),
  314. OnNext(1020, 12)
  315. );
  316. var subject = default(ReplaySubject<int>);
  317. var subscription = default(IDisposable);
  318. var results1 = scheduler.CreateObserver<int>();
  319. var subscription1 = default(IDisposable);
  320. var results2 = scheduler.CreateObserver<int>();
  321. var subscription2 = default(IDisposable);
  322. var results3 = scheduler.CreateObserver<int>();
  323. var subscription3 = default(IDisposable);
  324. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  325. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  326. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  327. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  328. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  329. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  330. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  331. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  332. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  333. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  334. scheduler.Start();
  335. results1.Messages.AssertEqual(
  336. OnNext(301, 4),
  337. OnNext(302, -1),
  338. OnNext(303, -2),
  339. OnNext(341, 5),
  340. OnNext(411, 6),
  341. OnNext(521, 7)
  342. );
  343. results2.Messages.AssertEqual(
  344. OnNext(401, 5),
  345. OnNext(411, 6),
  346. OnNext(521, 7),
  347. OnNext(631, 8)
  348. );
  349. results3.Messages.AssertEqual(
  350. OnNext(901, 10),
  351. OnNext(941, 11)
  352. );
  353. }
  354. [Fact]
  355. public void Finite_ReplayByTime()
  356. {
  357. var scheduler = new TestScheduler();
  358. var xs = scheduler.CreateHotObservable(
  359. OnNext(70, 1),
  360. OnNext(110, 2),
  361. OnNext(220, 3),
  362. OnNext(270, 4),
  363. OnNext(340, 5),
  364. OnNext(410, 6),
  365. OnNext(520, 7),
  366. OnCompleted<int>(630),
  367. OnNext(640, 9),
  368. OnCompleted<int>(650),
  369. OnError<int>(660, new Exception())
  370. );
  371. var subject = default(ReplaySubject<int>);
  372. var subscription = default(IDisposable);
  373. var results1 = scheduler.CreateObserver<int>();
  374. var subscription1 = default(IDisposable);
  375. var results2 = scheduler.CreateObserver<int>();
  376. var subscription2 = default(IDisposable);
  377. var results3 = scheduler.CreateObserver<int>();
  378. var subscription3 = default(IDisposable);
  379. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  380. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  381. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  382. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  383. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  384. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  385. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  386. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  387. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  388. scheduler.Start();
  389. results1.Messages.AssertEqual(
  390. OnNext(301, 3),
  391. OnNext(302, 4),
  392. OnNext(341, 5),
  393. OnNext(411, 6),
  394. OnNext(521, 7)
  395. );
  396. results2.Messages.AssertEqual(
  397. OnNext(401, 5),
  398. OnNext(411, 6),
  399. OnNext(521, 7),
  400. OnCompleted<int>(631)
  401. );
  402. results3.Messages.AssertEqual(
  403. OnCompleted<int>(901)
  404. );
  405. }
  406. [Fact]
  407. public void Finite_ReplayOne()
  408. {
  409. var scheduler = new TestScheduler();
  410. var xs = scheduler.CreateHotObservable(
  411. OnNext(70, 1),
  412. OnNext(110, 2),
  413. OnNext(220, 3),
  414. OnNext(270, 4),
  415. OnNext(340, 5),
  416. OnNext(410, 6),
  417. OnNext(520, 7),
  418. OnCompleted<int>(630),
  419. OnNext(640, 9),
  420. OnCompleted<int>(650),
  421. OnError<int>(660, new Exception())
  422. );
  423. var subject = default(ReplaySubject<int>);
  424. var subscription = default(IDisposable);
  425. var results1 = scheduler.CreateObserver<int>();
  426. var subscription1 = default(IDisposable);
  427. var results2 = scheduler.CreateObserver<int>();
  428. var subscription2 = default(IDisposable);
  429. var results3 = scheduler.CreateObserver<int>();
  430. var subscription3 = default(IDisposable);
  431. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  432. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  433. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  434. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  435. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  436. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  437. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  438. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  439. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  440. scheduler.Start();
  441. results1.Messages.AssertEqual(
  442. OnNext(300, 4),
  443. OnNext(340, 5),
  444. OnNext(410, 6),
  445. OnNext(520, 7)
  446. );
  447. results2.Messages.AssertEqual(
  448. OnNext(400, 5),
  449. OnNext(410, 6),
  450. OnNext(520, 7),
  451. OnCompleted<int>(630)
  452. );
  453. results3.Messages.AssertEqual(
  454. OnNext(900, 7),
  455. OnCompleted<int>(900)
  456. );
  457. }
  458. [Fact]
  459. public void Finite_ReplayMany()
  460. {
  461. var scheduler = new TestScheduler();
  462. var xs = scheduler.CreateHotObservable(
  463. OnNext(70, 1),
  464. OnNext(110, 2),
  465. OnNext(220, 3),
  466. OnNext(270, 4),
  467. OnNext(340, 5),
  468. OnNext(410, 6),
  469. OnNext(520, 7),
  470. OnCompleted<int>(630),
  471. OnNext(640, 9),
  472. OnCompleted<int>(650),
  473. OnError<int>(660, new Exception())
  474. );
  475. var subject = default(ReplaySubject<int>);
  476. var subscription = default(IDisposable);
  477. var results1 = scheduler.CreateObserver<int>();
  478. var subscription1 = default(IDisposable);
  479. var results2 = scheduler.CreateObserver<int>();
  480. var subscription2 = default(IDisposable);
  481. var results3 = scheduler.CreateObserver<int>();
  482. var subscription3 = default(IDisposable);
  483. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  484. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  485. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  486. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  487. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  488. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  489. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  490. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  491. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  492. scheduler.Start();
  493. results1.Messages.AssertEqual(
  494. OnNext(300, 3),
  495. OnNext(300, 4),
  496. OnNext(340, 5),
  497. OnNext(410, 6),
  498. OnNext(520, 7)
  499. );
  500. results2.Messages.AssertEqual(
  501. OnNext(400, 3),
  502. OnNext(400, 4),
  503. OnNext(400, 5),
  504. OnNext(410, 6),
  505. OnNext(520, 7),
  506. OnCompleted<int>(630)
  507. );
  508. results3.Messages.AssertEqual(
  509. OnNext(900, 5),
  510. OnNext(900, 6),
  511. OnNext(900, 7),
  512. OnCompleted<int>(900)
  513. );
  514. }
  515. [Fact]
  516. public void Finite_ReplayAll()
  517. {
  518. var scheduler = new TestScheduler();
  519. var xs = scheduler.CreateHotObservable(
  520. OnNext(70, 1),
  521. OnNext(110, 2),
  522. OnNext(220, 3),
  523. OnNext(270, 4),
  524. OnNext(340, 5),
  525. OnNext(410, 6),
  526. OnNext(520, 7),
  527. OnCompleted<int>(630),
  528. OnNext(640, 9),
  529. OnCompleted<int>(650),
  530. OnError<int>(660, new Exception())
  531. );
  532. var subject = default(ReplaySubject<int>);
  533. var subscription = default(IDisposable);
  534. var results1 = scheduler.CreateObserver<int>();
  535. var subscription1 = default(IDisposable);
  536. var results2 = scheduler.CreateObserver<int>();
  537. var subscription2 = default(IDisposable);
  538. var results3 = scheduler.CreateObserver<int>();
  539. var subscription3 = default(IDisposable);
  540. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  541. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  542. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  543. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  544. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  545. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  546. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  547. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  548. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  549. scheduler.Start();
  550. results1.Messages.AssertEqual(
  551. OnNext(300, 3),
  552. OnNext(300, 4),
  553. OnNext(340, 5),
  554. OnNext(410, 6),
  555. OnNext(520, 7)
  556. );
  557. results2.Messages.AssertEqual(
  558. OnNext(400, 3),
  559. OnNext(400, 4),
  560. OnNext(400, 5),
  561. OnNext(410, 6),
  562. OnNext(520, 7),
  563. OnCompleted<int>(630)
  564. );
  565. results3.Messages.AssertEqual(
  566. OnNext(900, 3),
  567. OnNext(900, 4),
  568. OnNext(900, 5),
  569. OnNext(900, 6),
  570. OnNext(900, 7),
  571. OnCompleted<int>(900)
  572. );
  573. }
  574. [Fact]
  575. public void Error_ReplayByTime()
  576. {
  577. var scheduler = new TestScheduler();
  578. var ex = new Exception();
  579. var xs = scheduler.CreateHotObservable(
  580. OnNext(70, 1),
  581. OnNext(110, 2),
  582. OnNext(220, 3),
  583. OnNext(270, 4),
  584. OnNext(340, 5),
  585. OnNext(410, 6),
  586. OnNext(520, 7),
  587. OnError<int>(630, ex),
  588. OnNext(640, 9),
  589. OnCompleted<int>(650),
  590. OnError<int>(660, new Exception())
  591. );
  592. var subject = default(ReplaySubject<int>);
  593. var subscription = default(IDisposable);
  594. var results1 = scheduler.CreateObserver<int>();
  595. var subscription1 = default(IDisposable);
  596. var results2 = scheduler.CreateObserver<int>();
  597. var subscription2 = default(IDisposable);
  598. var results3 = scheduler.CreateObserver<int>();
  599. var subscription3 = default(IDisposable);
  600. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  601. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  602. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  603. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  604. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  605. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  606. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  607. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  608. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  609. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  610. scheduler.Start();
  611. results1.Messages.AssertEqual(
  612. OnNext(301, 3),
  613. OnNext(302, 4),
  614. OnNext(341, 5),
  615. OnNext(411, 6),
  616. OnNext(521, 7)
  617. );
  618. results2.Messages.AssertEqual(
  619. OnNext(401, 5),
  620. OnNext(411, 6),
  621. OnNext(521, 7),
  622. OnError<int>(631, ex)
  623. );
  624. results3.Messages.AssertEqual(
  625. OnError<int>(901, ex)
  626. );
  627. }
  628. [Fact]
  629. public void Error_ReplayOne()
  630. {
  631. var scheduler = new TestScheduler();
  632. var ex = new Exception();
  633. var xs = scheduler.CreateHotObservable(
  634. OnNext(70, 1),
  635. OnNext(110, 2),
  636. OnNext(220, 3),
  637. OnNext(270, 4),
  638. OnNext(340, 5),
  639. OnNext(410, 6),
  640. OnNext(520, 7),
  641. OnError<int>(630, ex),
  642. OnNext(640, 9),
  643. OnCompleted<int>(650),
  644. OnError<int>(660, new Exception())
  645. );
  646. var subject = default(ReplaySubject<int>);
  647. var subscription = default(IDisposable);
  648. var results1 = scheduler.CreateObserver<int>();
  649. var subscription1 = default(IDisposable);
  650. var results2 = scheduler.CreateObserver<int>();
  651. var subscription2 = default(IDisposable);
  652. var results3 = scheduler.CreateObserver<int>();
  653. var subscription3 = default(IDisposable);
  654. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  655. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  656. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  657. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  658. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  659. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  660. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  661. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  662. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  663. scheduler.Start();
  664. results1.Messages.AssertEqual(
  665. OnNext(300, 4),
  666. OnNext(340, 5),
  667. OnNext(410, 6),
  668. OnNext(520, 7)
  669. );
  670. results2.Messages.AssertEqual(
  671. OnNext(400, 5),
  672. OnNext(410, 6),
  673. OnNext(520, 7),
  674. OnError<int>(630, ex)
  675. );
  676. results3.Messages.AssertEqual(
  677. OnNext(900, 7),
  678. OnError<int>(900, ex)
  679. );
  680. }
  681. [Fact]
  682. public void Error_ReplayMany()
  683. {
  684. var scheduler = new TestScheduler();
  685. var ex = new Exception();
  686. var xs = scheduler.CreateHotObservable(
  687. OnNext(70, 1),
  688. OnNext(110, 2),
  689. OnNext(220, 3),
  690. OnNext(270, 4),
  691. OnNext(340, 5),
  692. OnNext(410, 6),
  693. OnNext(520, 7),
  694. OnError<int>(630, ex),
  695. OnNext(640, 9),
  696. OnCompleted<int>(650),
  697. OnError<int>(660, new Exception())
  698. );
  699. var subject = default(ReplaySubject<int>);
  700. var subscription = default(IDisposable);
  701. var results1 = scheduler.CreateObserver<int>();
  702. var subscription1 = default(IDisposable);
  703. var results2 = scheduler.CreateObserver<int>();
  704. var subscription2 = default(IDisposable);
  705. var results3 = scheduler.CreateObserver<int>();
  706. var subscription3 = default(IDisposable);
  707. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  708. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  709. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  710. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  711. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  712. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  713. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  714. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  715. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  716. scheduler.Start();
  717. results1.Messages.AssertEqual(
  718. OnNext(300, 3),
  719. OnNext(300, 4),
  720. OnNext(340, 5),
  721. OnNext(410, 6),
  722. OnNext(520, 7)
  723. );
  724. results2.Messages.AssertEqual(
  725. OnNext(400, 3),
  726. OnNext(400, 4),
  727. OnNext(400, 5),
  728. OnNext(410, 6),
  729. OnNext(520, 7),
  730. OnError<int>(630, ex)
  731. );
  732. results3.Messages.AssertEqual(
  733. OnNext(900, 5),
  734. OnNext(900, 6),
  735. OnNext(900, 7),
  736. OnError<int>(900, ex)
  737. );
  738. }
  739. [Fact]
  740. public void Error_ReplayAll()
  741. {
  742. var scheduler = new TestScheduler();
  743. var ex = new Exception();
  744. var xs = scheduler.CreateHotObservable(
  745. OnNext(70, 1),
  746. OnNext(110, 2),
  747. OnNext(220, 3),
  748. OnNext(270, 4),
  749. OnNext(340, 5),
  750. OnNext(410, 6),
  751. OnNext(520, 7),
  752. OnError<int>(630, ex),
  753. OnNext(640, 9),
  754. OnCompleted<int>(650),
  755. OnError<int>(660, new Exception())
  756. );
  757. var subject = default(ReplaySubject<int>);
  758. var subscription = default(IDisposable);
  759. var results1 = scheduler.CreateObserver<int>();
  760. var subscription1 = default(IDisposable);
  761. var results2 = scheduler.CreateObserver<int>();
  762. var subscription2 = default(IDisposable);
  763. var results3 = scheduler.CreateObserver<int>();
  764. var subscription3 = default(IDisposable);
  765. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  766. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  767. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  768. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  769. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  770. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  771. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  772. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  773. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  774. scheduler.Start();
  775. results1.Messages.AssertEqual(
  776. OnNext(300, 3),
  777. OnNext(300, 4),
  778. OnNext(340, 5),
  779. OnNext(410, 6),
  780. OnNext(520, 7)
  781. );
  782. results2.Messages.AssertEqual(
  783. OnNext(400, 3),
  784. OnNext(400, 4),
  785. OnNext(400, 5),
  786. OnNext(410, 6),
  787. OnNext(520, 7),
  788. OnError<int>(630, ex)
  789. );
  790. results3.Messages.AssertEqual(
  791. OnNext(900, 3),
  792. OnNext(900, 4),
  793. OnNext(900, 5),
  794. OnNext(900, 6),
  795. OnNext(900, 7),
  796. OnError<int>(900, ex)
  797. );
  798. }
  799. [Fact]
  800. public void Canceled_ReplayByTime()
  801. {
  802. var scheduler = new TestScheduler();
  803. var xs = scheduler.CreateHotObservable(
  804. OnCompleted<int>(630),
  805. OnNext(640, 9),
  806. OnCompleted<int>(650),
  807. OnError<int>(660, new Exception())
  808. );
  809. var subject = default(ReplaySubject<int>);
  810. var subscription = default(IDisposable);
  811. var results1 = scheduler.CreateObserver<int>();
  812. var subscription1 = default(IDisposable);
  813. var results2 = scheduler.CreateObserver<int>();
  814. var subscription2 = default(IDisposable);
  815. var results3 = scheduler.CreateObserver<int>();
  816. var subscription3 = default(IDisposable);
  817. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), scheduler));
  818. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  819. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  820. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  821. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  822. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  823. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  824. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  825. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  826. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  827. scheduler.Start();
  828. results1.Messages.AssertEqual(
  829. );
  830. results2.Messages.AssertEqual(
  831. OnCompleted<int>(631)
  832. );
  833. results3.Messages.AssertEqual(
  834. OnCompleted<int>(901)
  835. );
  836. }
  837. [Fact]
  838. public void Canceled_ReplayOne()
  839. {
  840. var scheduler = new TestScheduler();
  841. var xs = scheduler.CreateHotObservable(
  842. OnCompleted<int>(630),
  843. OnNext(640, 9),
  844. OnCompleted<int>(650),
  845. OnError<int>(660, new Exception())
  846. );
  847. var subject = default(ReplaySubject<int>);
  848. var subscription = default(IDisposable);
  849. var results1 = scheduler.CreateObserver<int>();
  850. var subscription1 = default(IDisposable);
  851. var results2 = scheduler.CreateObserver<int>();
  852. var subscription2 = default(IDisposable);
  853. var results3 = scheduler.CreateObserver<int>();
  854. var subscription3 = default(IDisposable);
  855. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  856. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  857. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  858. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  859. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  860. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  861. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  862. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  863. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  864. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  865. scheduler.Start();
  866. results1.Messages.AssertEqual(
  867. );
  868. results2.Messages.AssertEqual(
  869. OnCompleted<int>(630)
  870. );
  871. results3.Messages.AssertEqual(
  872. OnCompleted<int>(900)
  873. );
  874. }
  875. [Fact]
  876. public void Canceled_ReplayMany()
  877. {
  878. var scheduler = new TestScheduler();
  879. var xs = scheduler.CreateHotObservable(
  880. OnCompleted<int>(630),
  881. OnNext(640, 9),
  882. OnCompleted<int>(650),
  883. OnError<int>(660, new Exception())
  884. );
  885. var subject = default(ReplaySubject<int>);
  886. var subscription = default(IDisposable);
  887. var results1 = scheduler.CreateObserver<int>();
  888. var subscription1 = default(IDisposable);
  889. var results2 = scheduler.CreateObserver<int>();
  890. var subscription2 = default(IDisposable);
  891. var results3 = scheduler.CreateObserver<int>();
  892. var subscription3 = default(IDisposable);
  893. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  894. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  895. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  896. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  897. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  898. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  899. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  900. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  901. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  902. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  903. scheduler.Start();
  904. results1.Messages.AssertEqual(
  905. );
  906. results2.Messages.AssertEqual(
  907. OnCompleted<int>(630)
  908. );
  909. results3.Messages.AssertEqual(
  910. OnCompleted<int>(900)
  911. );
  912. }
  913. [Fact]
  914. public void Canceled_ReplayAll()
  915. {
  916. var scheduler = new TestScheduler();
  917. var xs = scheduler.CreateHotObservable(
  918. OnCompleted<int>(630),
  919. OnNext(640, 9),
  920. OnCompleted<int>(650),
  921. OnError<int>(660, new Exception())
  922. );
  923. var subject = default(ReplaySubject<int>);
  924. var subscription = default(IDisposable);
  925. var results1 = scheduler.CreateObserver<int>();
  926. var subscription1 = default(IDisposable);
  927. var results2 = scheduler.CreateObserver<int>();
  928. var subscription2 = default(IDisposable);
  929. var results3 = scheduler.CreateObserver<int>();
  930. var subscription3 = default(IDisposable);
  931. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  932. scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
  933. scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
  934. scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
  935. scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
  936. scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
  937. scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
  938. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  939. scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
  940. scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
  941. scheduler.Start();
  942. results1.Messages.AssertEqual(
  943. );
  944. results2.Messages.AssertEqual(
  945. OnCompleted<int>(630)
  946. );
  947. results3.Messages.AssertEqual(
  948. OnCompleted<int>(900)
  949. );
  950. }
  951. [Fact]
  952. public void SubjectDisposed()
  953. {
  954. var scheduler = new TestScheduler();
  955. var subject = default(ReplaySubject<int>);
  956. var results1 = scheduler.CreateObserver<int>();
  957. var subscription1 = default(IDisposable);
  958. var results2 = scheduler.CreateObserver<int>();
  959. var subscription2 = default(IDisposable);
  960. var results3 = scheduler.CreateObserver<int>();
  961. var subscription3 = default(IDisposable);
  962. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(scheduler));
  963. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  964. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  965. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  966. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  967. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  968. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  969. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  970. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  971. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  972. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  973. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  974. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  975. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  976. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  977. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  978. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  979. scheduler.Start();
  980. results1.Messages.AssertEqual(
  981. OnNext(201, 1),
  982. OnNext(251, 2),
  983. OnNext(351, 3),
  984. OnNext(451, 4)
  985. );
  986. results2.Messages.AssertEqual(
  987. OnNext(301, 1),
  988. OnNext(302, 2),
  989. OnNext(351, 3),
  990. OnNext(451, 4),
  991. OnNext(551, 5)
  992. );
  993. results3.Messages.AssertEqual(
  994. OnNext(401, 1),
  995. OnNext(402, 2),
  996. OnNext(403, 3),
  997. OnNext(451, 4),
  998. OnNext(551, 5)
  999. );
  1000. }
  1001. [Fact]
  1002. public void SubjectDisposed_ReplayOne()
  1003. {
  1004. var scheduler = new TestScheduler();
  1005. var subject = default(ReplaySubject<int>);
  1006. var results1 = scheduler.CreateObserver<int>();
  1007. var subscription1 = default(IDisposable);
  1008. var results2 = scheduler.CreateObserver<int>();
  1009. var subscription2 = default(IDisposable);
  1010. var results3 = scheduler.CreateObserver<int>();
  1011. var subscription3 = default(IDisposable);
  1012. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(1));
  1013. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1014. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1015. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1016. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1017. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1018. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1019. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1020. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1021. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1022. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1023. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1024. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1025. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1026. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1027. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1028. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1029. scheduler.Start();
  1030. results1.Messages.AssertEqual(
  1031. OnNext(200, 1),
  1032. OnNext(250, 2),
  1033. OnNext(350, 3),
  1034. OnNext(450, 4)
  1035. );
  1036. results2.Messages.AssertEqual(
  1037. OnNext(300, 2),
  1038. OnNext(350, 3),
  1039. OnNext(450, 4),
  1040. OnNext(550, 5)
  1041. );
  1042. results3.Messages.AssertEqual(
  1043. OnNext(400, 3),
  1044. OnNext(450, 4),
  1045. OnNext(550, 5)
  1046. );
  1047. }
  1048. [Fact]
  1049. public void SubjectDisposed_ReplayMany()
  1050. {
  1051. var scheduler = new TestScheduler();
  1052. var subject = default(ReplaySubject<int>);
  1053. var results1 = scheduler.CreateObserver<int>();
  1054. var subscription1 = default(IDisposable);
  1055. var results2 = scheduler.CreateObserver<int>();
  1056. var subscription2 = default(IDisposable);
  1057. var results3 = scheduler.CreateObserver<int>();
  1058. var subscription3 = default(IDisposable);
  1059. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(3));
  1060. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1061. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1062. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1063. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1064. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1065. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1066. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1067. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1068. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1069. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1070. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1071. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1072. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1073. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1074. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1075. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1076. scheduler.Start();
  1077. results1.Messages.AssertEqual(
  1078. OnNext(200, 1),
  1079. OnNext(250, 2),
  1080. OnNext(350, 3),
  1081. OnNext(450, 4)
  1082. );
  1083. results2.Messages.AssertEqual(
  1084. OnNext(300, 1),
  1085. OnNext(300, 2),
  1086. OnNext(350, 3),
  1087. OnNext(450, 4),
  1088. OnNext(550, 5)
  1089. );
  1090. results3.Messages.AssertEqual(
  1091. OnNext(400, 1),
  1092. OnNext(400, 2),
  1093. OnNext(400, 3),
  1094. OnNext(450, 4),
  1095. OnNext(550, 5)
  1096. );
  1097. }
  1098. [Fact]
  1099. public void SubjectDisposed_ReplayAll()
  1100. {
  1101. var scheduler = new TestScheduler();
  1102. var subject = default(ReplaySubject<int>);
  1103. var results1 = scheduler.CreateObserver<int>();
  1104. var subscription1 = default(IDisposable);
  1105. var results2 = scheduler.CreateObserver<int>();
  1106. var subscription2 = default(IDisposable);
  1107. var results3 = scheduler.CreateObserver<int>();
  1108. var subscription3 = default(IDisposable);
  1109. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>());
  1110. scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
  1111. scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
  1112. scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
  1113. scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
  1114. scheduler.ScheduleAbsolute(600, () => subject.Dispose());
  1115. scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
  1116. scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
  1117. scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
  1118. scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
  1119. scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
  1120. scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
  1121. scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
  1122. scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnNext(6)));
  1123. scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnCompleted()));
  1124. scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.OnError(new Exception())));
  1125. scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => subject.Subscribe()));
  1126. scheduler.Start();
  1127. results1.Messages.AssertEqual(
  1128. OnNext(200, 1),
  1129. OnNext(250, 2),
  1130. OnNext(350, 3),
  1131. OnNext(450, 4)
  1132. );
  1133. results2.Messages.AssertEqual(
  1134. OnNext(300, 1),
  1135. OnNext(300, 2),
  1136. OnNext(350, 3),
  1137. OnNext(450, 4),
  1138. OnNext(550, 5)
  1139. );
  1140. results3.Messages.AssertEqual(
  1141. OnNext(400, 1),
  1142. OnNext(400, 2),
  1143. OnNext(400, 3),
  1144. OnNext(450, 4),
  1145. OnNext(550, 5)
  1146. );
  1147. }
  1148. //
  1149. // TODO: Create a failing test for this for the other implementations (ReplayOne/Many/All).
  1150. // I don't understand the behavior.
  1151. // I think it may have to do with calling Trim() on Subscription (as well as in the OnNext calls). -LC
  1152. //
  1153. [Fact]
  1154. public void ReplaySubjectDiesOut()
  1155. {
  1156. //
  1157. // Tests v1.x behavior as documented in ReplaySubject.cs (Subscribe method).
  1158. //
  1159. var scheduler = new TestScheduler();
  1160. var xs = scheduler.CreateHotObservable(
  1161. OnNext(70, 1),
  1162. OnNext(110, 2),
  1163. OnNext(220, 3),
  1164. OnNext(270, 4),
  1165. OnNext(340, 5),
  1166. OnNext(410, 6),
  1167. OnNext(520, 7),
  1168. OnCompleted<int>(580)
  1169. );
  1170. var subject = default(ReplaySubject<int>);
  1171. var results1 = scheduler.CreateObserver<int>();
  1172. var results2 = scheduler.CreateObserver<int>();
  1173. var results3 = scheduler.CreateObserver<int>();
  1174. var results4 = scheduler.CreateObserver<int>();
  1175. scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject<int>(int.MaxValue, TimeSpan.FromTicks(100), scheduler));
  1176. scheduler.ScheduleAbsolute(200, () => xs.Subscribe(subject));
  1177. scheduler.ScheduleAbsolute(300, () => subject.Subscribe(results1));
  1178. scheduler.ScheduleAbsolute(400, () => subject.Subscribe(results2));
  1179. scheduler.ScheduleAbsolute(600, () => subject.Subscribe(results3));
  1180. scheduler.ScheduleAbsolute(900, () => subject.Subscribe(results4));
  1181. scheduler.Start();
  1182. results1.Messages.AssertEqual(
  1183. OnNext(301, 3),
  1184. OnNext(302, 4),
  1185. OnNext(341, 5),
  1186. OnNext(411, 6),
  1187. OnNext(521, 7),
  1188. OnCompleted<int>(581)
  1189. );
  1190. results2.Messages.AssertEqual(
  1191. OnNext(401, 5),
  1192. OnNext(411, 6),
  1193. OnNext(521, 7),
  1194. OnCompleted<int>(581)
  1195. );
  1196. results3.Messages.AssertEqual(
  1197. OnNext(601, 7),
  1198. OnCompleted<int>(602)
  1199. );
  1200. results4.Messages.AssertEqual(
  1201. OnCompleted<int>(901)
  1202. );
  1203. }
  1204. [Fact]
  1205. public void HasObservers()
  1206. {
  1207. HasObserversImpl(new ReplaySubject<int>());
  1208. HasObserversImpl(new ReplaySubject<int>(1));
  1209. HasObserversImpl(new ReplaySubject<int>(3));
  1210. HasObserversImpl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1211. }
  1212. private static void HasObserversImpl(ReplaySubject<int> s)
  1213. {
  1214. Assert.False(s.HasObservers);
  1215. var d1 = s.Subscribe(_ => { });
  1216. Assert.True(s.HasObservers);
  1217. d1.Dispose();
  1218. Assert.False(s.HasObservers);
  1219. var d2 = s.Subscribe(_ => { });
  1220. Assert.True(s.HasObservers);
  1221. var d3 = s.Subscribe(_ => { });
  1222. Assert.True(s.HasObservers);
  1223. d2.Dispose();
  1224. Assert.True(s.HasObservers);
  1225. d3.Dispose();
  1226. Assert.False(s.HasObservers);
  1227. }
  1228. [Fact]
  1229. public void HasObservers_Dispose1()
  1230. {
  1231. HasObservers_Dispose1Impl(new ReplaySubject<int>());
  1232. HasObservers_Dispose1Impl(new ReplaySubject<int>(1));
  1233. HasObservers_Dispose1Impl(new ReplaySubject<int>(3));
  1234. HasObservers_Dispose1Impl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1235. }
  1236. private static void HasObservers_Dispose1Impl(ReplaySubject<int> s)
  1237. {
  1238. Assert.False(s.HasObservers);
  1239. Assert.False(s.IsDisposed);
  1240. var d = s.Subscribe(_ => { });
  1241. Assert.True(s.HasObservers);
  1242. Assert.False(s.IsDisposed);
  1243. s.Dispose();
  1244. Assert.False(s.HasObservers);
  1245. Assert.True(s.IsDisposed);
  1246. d.Dispose();
  1247. Assert.False(s.HasObservers);
  1248. Assert.True(s.IsDisposed);
  1249. }
  1250. [Fact]
  1251. public void HasObservers_Dispose2()
  1252. {
  1253. HasObservers_Dispose2Impl(new ReplaySubject<int>());
  1254. HasObservers_Dispose2Impl(new ReplaySubject<int>(1));
  1255. HasObservers_Dispose2Impl(new ReplaySubject<int>(3));
  1256. HasObservers_Dispose2Impl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1257. }
  1258. private static void HasObservers_Dispose2Impl(ReplaySubject<int> s)
  1259. {
  1260. Assert.False(s.HasObservers);
  1261. Assert.False(s.IsDisposed);
  1262. var d = s.Subscribe(_ => { });
  1263. Assert.True(s.HasObservers);
  1264. Assert.False(s.IsDisposed);
  1265. d.Dispose();
  1266. Assert.False(s.HasObservers);
  1267. Assert.False(s.IsDisposed);
  1268. s.Dispose();
  1269. Assert.False(s.HasObservers);
  1270. Assert.True(s.IsDisposed);
  1271. }
  1272. [Fact]
  1273. public void HasObservers_Dispose3()
  1274. {
  1275. HasObservers_Dispose3Impl(new ReplaySubject<int>());
  1276. HasObservers_Dispose3Impl(new ReplaySubject<int>(1));
  1277. HasObservers_Dispose3Impl(new ReplaySubject<int>(3));
  1278. HasObservers_Dispose3Impl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1279. }
  1280. private static void HasObservers_Dispose3Impl(ReplaySubject<int> s)
  1281. {
  1282. Assert.False(s.HasObservers);
  1283. Assert.False(s.IsDisposed);
  1284. s.Dispose();
  1285. Assert.False(s.HasObservers);
  1286. Assert.True(s.IsDisposed);
  1287. }
  1288. [Fact]
  1289. public void HasObservers_OnCompleted()
  1290. {
  1291. HasObservers_OnCompletedImpl(new ReplaySubject<int>());
  1292. HasObservers_OnCompletedImpl(new ReplaySubject<int>(1));
  1293. HasObservers_OnCompletedImpl(new ReplaySubject<int>(3));
  1294. HasObservers_OnCompletedImpl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1295. }
  1296. private static void HasObservers_OnCompletedImpl(ReplaySubject<int> s)
  1297. {
  1298. Assert.False(s.HasObservers);
  1299. var d = s.Subscribe(_ => { });
  1300. Assert.True(s.HasObservers);
  1301. s.OnNext(42);
  1302. Assert.True(s.HasObservers);
  1303. s.OnCompleted();
  1304. Assert.False(s.HasObservers);
  1305. }
  1306. [Fact]
  1307. public void HasObservers_OnError()
  1308. {
  1309. HasObservers_OnErrorImpl(new ReplaySubject<int>());
  1310. HasObservers_OnErrorImpl(new ReplaySubject<int>(1));
  1311. HasObservers_OnErrorImpl(new ReplaySubject<int>(3));
  1312. HasObservers_OnErrorImpl(new ReplaySubject<int>(TimeSpan.FromSeconds(1)));
  1313. }
  1314. private static void HasObservers_OnErrorImpl(ReplaySubject<int> s)
  1315. {
  1316. Assert.False(s.HasObservers);
  1317. var d = s.Subscribe(_ => { }, ex => { });
  1318. Assert.True(s.HasObservers);
  1319. s.OnNext(42);
  1320. Assert.True(s.HasObservers);
  1321. s.OnError(new Exception());
  1322. Assert.False(s.HasObservers);
  1323. }
  1324. [Fact]
  1325. public void Completed_to_late_subscriber_ReplayAll()
  1326. {
  1327. var s = new ReplaySubject<int>();
  1328. s.OnNext(1);
  1329. s.OnNext(2);
  1330. s.OnCompleted();
  1331. var scheduler = new TestScheduler();
  1332. var observer = scheduler.CreateObserver<int>();
  1333. s.Subscribe(observer);
  1334. Assert.Equal(3, observer.Messages.Count);
  1335. Assert.Equal(1, observer.Messages[0].Value.Value);
  1336. Assert.Equal(2, observer.Messages[1].Value.Value);
  1337. Assert.Equal(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind);
  1338. }
  1339. [Fact]
  1340. public void Completed_to_late_subscriber_ReplayOne()
  1341. {
  1342. var s = new ReplaySubject<int>(1);
  1343. s.OnNext(1);
  1344. s.OnNext(2);
  1345. s.OnCompleted();
  1346. var scheduler = new TestScheduler();
  1347. var observer = scheduler.CreateObserver<int>();
  1348. s.Subscribe(observer);
  1349. Assert.Equal(2, observer.Messages.Count);
  1350. Assert.Equal(2, observer.Messages[0].Value.Value);
  1351. Assert.Equal(NotificationKind.OnCompleted, observer.Messages[1].Value.Kind);
  1352. }
  1353. [Fact]
  1354. public void Completed_to_late_subscriber_ReplayMany()
  1355. {
  1356. var s = new ReplaySubject<int>(2);
  1357. s.OnNext(1);
  1358. s.OnNext(2);
  1359. s.OnNext(3);
  1360. s.OnCompleted();
  1361. var scheduler = new TestScheduler();
  1362. var observer = scheduler.CreateObserver<int>();
  1363. s.Subscribe(observer);
  1364. Assert.Equal(3, observer.Messages.Count);
  1365. Assert.Equal(2, observer.Messages[0].Value.Value);
  1366. Assert.Equal(3, observer.Messages[1].Value.Value);
  1367. Assert.Equal(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind);
  1368. }
  1369. [Fact]
  1370. public void Completed_to_late_subscriber_ReplayByTime()
  1371. {
  1372. var s = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
  1373. s.OnNext(1);
  1374. s.OnNext(2);
  1375. s.OnNext(3);
  1376. s.OnCompleted();
  1377. var scheduler = new TestScheduler();
  1378. var observer = scheduler.CreateObserver<int>();
  1379. s.Subscribe(observer);
  1380. Assert.Equal(4, observer.Messages.Count);
  1381. Assert.Equal(1, observer.Messages[0].Value.Value);
  1382. Assert.Equal(2, observer.Messages[1].Value.Value);
  1383. Assert.Equal(3, observer.Messages[2].Value.Value);
  1384. Assert.Equal(NotificationKind.OnCompleted, observer.Messages[3].Value.Kind);
  1385. }
  1386. [Fact]
  1387. public void Errored_to_late_subscriber_ReplayAll()
  1388. {
  1389. var expectedException = new Exception("Test");
  1390. var s = new ReplaySubject<int>();
  1391. s.OnNext(1);
  1392. s.OnNext(2);
  1393. s.OnError(expectedException);
  1394. var scheduler = new TestScheduler();
  1395. var observer = scheduler.CreateObserver<int>();
  1396. s.Subscribe(observer);
  1397. Assert.Equal(3, observer.Messages.Count);
  1398. Assert.Equal(1, observer.Messages[0].Value.Value);
  1399. Assert.Equal(2, observer.Messages[1].Value.Value);
  1400. Assert.Equal(NotificationKind.OnError, observer.Messages[2].Value.Kind);
  1401. Assert.Equal(expectedException, observer.Messages[2].Value.Exception);
  1402. }
  1403. [Fact]
  1404. public void Errored_to_late_subscriber_ReplayOne()
  1405. {
  1406. var expectedException = new Exception("Test");
  1407. var s = new ReplaySubject<int>(1);
  1408. s.OnNext(1);
  1409. s.OnNext(2);
  1410. s.OnError(expectedException);
  1411. var scheduler = new TestScheduler();
  1412. var observer = scheduler.CreateObserver<int>();
  1413. s.Subscribe(observer);
  1414. Assert.Equal(2, observer.Messages.Count);
  1415. Assert.Equal(2, observer.Messages[0].Value.Value);
  1416. Assert.Equal(NotificationKind.OnError, observer.Messages[1].Value.Kind);
  1417. Assert.Equal(expectedException, observer.Messages[1].Value.Exception);
  1418. }
  1419. [Fact]
  1420. public void Errored_to_late_subscriber_ReplayMany()
  1421. {
  1422. var expectedException = new Exception("Test");
  1423. var s = new ReplaySubject<int>(2);
  1424. s.OnNext(1);
  1425. s.OnNext(2);
  1426. s.OnNext(3);
  1427. s.OnError(expectedException);
  1428. var scheduler = new TestScheduler();
  1429. var observer = scheduler.CreateObserver<int>();
  1430. s.Subscribe(observer);
  1431. Assert.Equal(3, observer.Messages.Count);
  1432. Assert.Equal(2, observer.Messages[0].Value.Value);
  1433. Assert.Equal(3, observer.Messages[1].Value.Value);
  1434. Assert.Equal(NotificationKind.OnError, observer.Messages[2].Value.Kind);
  1435. Assert.Equal(expectedException, observer.Messages[2].Value.Exception);
  1436. }
  1437. [Fact]
  1438. public void Errored_to_late_subscriber_ReplayByTime()
  1439. {
  1440. var expectedException = new Exception("Test");
  1441. var s = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
  1442. s.OnNext(1);
  1443. s.OnNext(2);
  1444. s.OnNext(3);
  1445. s.OnError(expectedException);
  1446. var scheduler = new TestScheduler();
  1447. var observer = scheduler.CreateObserver<int>();
  1448. s.Subscribe(observer);
  1449. Assert.Equal(4, observer.Messages.Count);
  1450. Assert.Equal(1, observer.Messages[0].Value.Value);
  1451. Assert.Equal(2, observer.Messages[1].Value.Value);
  1452. Assert.Equal(3, observer.Messages[2].Value.Value);
  1453. Assert.Equal(NotificationKind.OnError, observer.Messages[3].Value.Kind);
  1454. Assert.Equal(expectedException, observer.Messages[3].Value.Exception);
  1455. }
  1456. [Fact]
  1457. public void ReplaySubject_Reentrant()
  1458. {
  1459. var r = new ReplaySubject<int>(4);
  1460. r.OnNext(0);
  1461. r.OnNext(1);
  1462. r.OnNext(2);
  1463. r.OnNext(3);
  1464. r.OnNext(4);
  1465. var xs = new List<int>();
  1466. var i = 0;
  1467. r.Subscribe(x =>
  1468. {
  1469. xs.Add(x);
  1470. if (++i <= 10)
  1471. {
  1472. r.OnNext(x);
  1473. }
  1474. });
  1475. r.OnNext(5);
  1476. Assert.True(xs.SequenceEqual(new[]
  1477. {
  1478. 1, 2, 3, 4, // original
  1479. 1, 2, 3, 4, // reentrant (+ fed back)
  1480. 1, 2, 3, 4, // reentrant (+ first two fed back)
  1481. 1, 2, // reentrant
  1482. 5 // tune in
  1483. }));
  1484. }
  1485. #if !NO_INTERNALSTEST
  1486. [Fact]
  1487. public void FastImmediateObserver_Simple1()
  1488. {
  1489. var res = FastImmediateObserverTest(fio =>
  1490. {
  1491. fio.OnNext(1);
  1492. fio.OnNext(2);
  1493. fio.OnNext(3);
  1494. fio.OnCompleted();
  1495. fio.EnsureActive(4);
  1496. });
  1497. res.AssertEqual(
  1498. OnNext(0, 1),
  1499. OnNext(1, 2),
  1500. OnNext(2, 3),
  1501. OnCompleted<int>(3)
  1502. );
  1503. }
  1504. [Fact]
  1505. public void FastImmediateObserver_Simple2()
  1506. {
  1507. var ex = new Exception();
  1508. var res = FastImmediateObserverTest(fio =>
  1509. {
  1510. fio.OnNext(1);
  1511. fio.OnNext(2);
  1512. fio.OnNext(3);
  1513. fio.OnError(ex);
  1514. fio.EnsureActive(4);
  1515. });
  1516. res.AssertEqual(
  1517. OnNext(0, 1),
  1518. OnNext(1, 2),
  1519. OnNext(2, 3),
  1520. OnError<int>(3, ex)
  1521. );
  1522. }
  1523. [Fact]
  1524. public void FastImmediateObserver_Simple3()
  1525. {
  1526. var res = FastImmediateObserverTest(fio =>
  1527. {
  1528. fio.OnNext(1);
  1529. fio.EnsureActive();
  1530. fio.OnNext(2);
  1531. fio.EnsureActive();
  1532. fio.OnNext(3);
  1533. fio.EnsureActive();
  1534. fio.OnCompleted();
  1535. fio.EnsureActive();
  1536. });
  1537. res.AssertEqual(
  1538. OnNext(0, 1),
  1539. OnNext(1, 2),
  1540. OnNext(2, 3),
  1541. OnCompleted<int>(3)
  1542. );
  1543. }
  1544. [Fact]
  1545. public void FastImmediateObserver_Fault()
  1546. {
  1547. var xs = new List<int>();
  1548. var o = Observer.Create<int>(
  1549. x => { xs.Add(x); if (x == 2) throw new Exception(); },
  1550. ex => { },
  1551. () => { }
  1552. );
  1553. var fio = new FastImmediateObserver<int>(o);
  1554. fio.OnNext(1);
  1555. fio.OnNext(2);
  1556. fio.OnNext(3);
  1557. ReactiveAssert.Throws<Exception>(() => fio.EnsureActive());
  1558. fio.OnNext(4);
  1559. fio.EnsureActive();
  1560. fio.OnNext(2);
  1561. fio.EnsureActive();
  1562. Assert.True(xs.Count == 2);
  1563. }
  1564. [Fact]
  1565. public void FastImmediateObserver_Ownership1()
  1566. {
  1567. var xs = new List<int>();
  1568. var o = Observer.Create<int>(
  1569. xs.Add,
  1570. ex => { },
  1571. () => { }
  1572. );
  1573. var fio = new FastImmediateObserver<int>(o);
  1574. var ts = new Task[16];
  1575. var N = 100;
  1576. for (var i = 0; i < ts.Length; i++)
  1577. {
  1578. var j = i;
  1579. ts[i] = Task.Factory.StartNew(() =>
  1580. {
  1581. for (var k = 0; k < N; k++)
  1582. {
  1583. fio.OnNext(j * N + k);
  1584. }
  1585. fio.EnsureActive(N);
  1586. });
  1587. }
  1588. Task.WaitAll(ts);
  1589. Assert.True(xs.Count == ts.Length * N);
  1590. }
  1591. [Fact]
  1592. public void FastImmediateObserver_Ownership2()
  1593. {
  1594. var cd = new CountdownEvent(3);
  1595. var w = new ManualResetEvent(false);
  1596. var e = new ManualResetEvent(false);
  1597. var xs = new List<int>();
  1598. var o = Observer.Create<int>(
  1599. x => { xs.Add(x); w.Set(); e.WaitOne(); cd.Signal(); },
  1600. ex => { },
  1601. () => { }
  1602. );
  1603. var fio = new FastImmediateObserver<int>(o);
  1604. fio.OnNext(1);
  1605. var t = Task.Factory.StartNew(() =>
  1606. {
  1607. fio.EnsureActive();
  1608. });
  1609. w.WaitOne();
  1610. fio.OnNext(2);
  1611. fio.OnNext(3);
  1612. fio.EnsureActive(2);
  1613. e.Set();
  1614. cd.Wait();
  1615. Assert.True(xs.Count == 3);
  1616. }
  1617. private IEnumerable<Recorded<Notification<int>>> FastImmediateObserverTest(Action<IScheduledObserver<int>> f)
  1618. {
  1619. var ns = new List<Recorded<Notification<int>>>();
  1620. var l = 0L;
  1621. var o = Observer.Create<int>(
  1622. x => { ns.Add(OnNext<int>(l++, x)); },
  1623. ex => { ns.Add(OnError<int>(l++, ex)); },
  1624. () => { ns.Add(OnCompleted<int>(l++)); }
  1625. );
  1626. var fio = new FastImmediateObserver<int>(o);
  1627. f(fio);
  1628. return ns;
  1629. }
  1630. #endif
  1631. }
  1632. }