1
0

ReplaySubjectTest.cs 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Reactive;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Subjects;
  6. using Microsoft.Reactive.Testing;
  7. using Microsoft.VisualStudio.TestTools.UnitTesting;
  8. using ReactiveTests.Dummies;
  9. namespace ReactiveTests.Tests
  10. {
  11. [TestClass]
  12. public partial class ReplaySubjectTest : ReactiveTest
  13. {
  /// <summary>
  /// Checks that calling <c>Subscribe</c> with a null observer raises
  /// <see cref="ArgumentNullException"/> on subjects built through four constructor overloads.
  /// </summary>
  [TestMethod]
  public void Subscribe_ArgumentChecking()
  {
      // Each factory is invoked inside the asserted delegate, so construction
      // happens within the assertion just like a direct inline call would.
      var factories = new Func<ReplaySubject<int>>[]
      {
          () => new ReplaySubject<int>(),
          () => new ReplaySubject<int>(1),
          () => new ReplaySubject<int>(2),
          () => new ReplaySubject<int>(DummyScheduler.Instance),
      };

      foreach (var create in factories)
      {
          ReactiveAssert.Throws<ArgumentNullException>(() => create().Subscribe(null));
      }
  }
  /// <summary>
  /// Checks that calling <c>OnError</c> with a null exception raises
  /// <see cref="ArgumentNullException"/> on subjects built through four constructor overloads.
  /// </summary>
  [TestMethod]
  public void OnError_ArgumentChecking()
  {
      // Each factory is invoked inside the asserted delegate, so construction
      // happens within the assertion just like a direct inline call would.
      var factories = new Func<ReplaySubject<int>>[]
      {
          () => new ReplaySubject<int>(),
          () => new ReplaySubject<int>(1),
          () => new ReplaySubject<int>(2),
          () => new ReplaySubject<int>(DummyScheduler.Instance),
      };

      foreach (var create in factories)
      {
          ReactiveAssert.Throws<ArgumentNullException>(() => create().OnError(null));
      }
  }
  /// <summary>
  /// Exercises the <c>ReplaySubject</c> constructors: a buffer size of -1 or a window of -1 ticks
  /// must raise <see cref="ArgumentOutOfRangeException"/>, a null scheduler must raise
  /// <see cref="ArgumentNullException"/>, and zero-valued sizes and windows must construct without throwing.
  /// </summary>
  [TestMethod]
  public void Constructor_ArgumentChecking()
  {
      // Negative buffer size or negative window.
      Action[] outOfRangeCases =
      {
          () => new ReplaySubject<int>(-1),
          () => new ReplaySubject<int>(-1, DummyScheduler.Instance),
          () => new ReplaySubject<int>(-1, TimeSpan.Zero),
          () => new ReplaySubject<int>(-1, TimeSpan.Zero, DummyScheduler.Instance),
          () => new ReplaySubject<int>(TimeSpan.FromTicks(-1)),
          () => new ReplaySubject<int>(TimeSpan.FromTicks(-1), DummyScheduler.Instance),
          () => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1)),
          () => new ReplaySubject<int>(0, TimeSpan.FromTicks(-1), DummyScheduler.Instance),
      };

      foreach (var construct in outOfRangeCases)
      {
          ReactiveAssert.Throws<ArgumentOutOfRangeException>(construct);
      }

      // Null scheduler argument.
      Action[] nullSchedulerCases =
      {
          () => new ReplaySubject<int>(null),
          () => new ReplaySubject<int>(0, null),
          () => new ReplaySubject<int>(TimeSpan.Zero, null),
          () => new ReplaySubject<int>(0, TimeSpan.Zero, null),
      };

      foreach (var construct in nullSchedulerCases)
      {
          ReactiveAssert.Throws<ArgumentNullException>(construct);
      }

      // zero allowed
      new ReplaySubject<int>(0);
      new ReplaySubject<int>(TimeSpan.Zero);
      new ReplaySubject<int>(0, TimeSpan.Zero);
      new ReplaySubject<int>(0, DummyScheduler.Instance);
      new ReplaySubject<int>(TimeSpan.Zero, DummyScheduler.Instance);
      new ReplaySubject<int>(0, TimeSpan.Zero, DummyScheduler.Instance);
  }
  /// <summary>
  /// Feeds a non-terminating hot sequence into a <c>ReplaySubject</c> created with buffer size 3,
  /// a 100-tick window and the test scheduler, then asserts what three observers
  /// (subscribing at 300, 400 and 900) record.
  /// </summary>
  [TestMethod]
  public void Infinite_ReplayByTime()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnNext(630, 8),
          OnNext(710, 9),
          OnNext(870, 10),
          OnNext(940, 11),
          OnNext(1020, 12)
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), testScheduler); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      // The first subscription is disposed a second time here.
      testScheduler.ScheduleAbsolute(800, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(301, 3),
          OnNext(302, 4),
          OnNext(341, 5),
          OnNext(411, 6),
          OnNext(521, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(401, 5),
          OnNext(411, 6),
          OnNext(521, 7),
          OnNext(631, 8)
      );

      observer3.Messages.AssertEqual(
          OnNext(901, 10),
          OnNext(941, 11)
      );
  }
  /// <summary>
  /// Feeds a non-terminating hot sequence into a <c>ReplaySubject</c> created with buffer size 1
  /// and asserts what four observers (subscribing at 300, 400, 900 and 1100) record.
  /// </summary>
  [TestMethod]
  public void Infinite_ReplayOne()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnNext(630, 8),
          OnNext(710, 9),
          OnNext(870, 10),
          OnNext(940, 11),
          OnNext(1020, 12)
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      var observer4 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;
      IDisposable handle4 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(1); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      // The first subscription is disposed a second time here.
      testScheduler.ScheduleAbsolute(800, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      // The fourth subscription is never disposed by this test.
      testScheduler.ScheduleAbsolute(1100, () => { handle4 = replay.Subscribe(observer4); });
      testScheduler.ScheduleAbsolute(1200, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(300, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(400, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnNext(630, 8)
      );

      observer3.Messages.AssertEqual(
          OnNext(900, 10),
          OnNext(940, 11)
      );

      observer4.Messages.AssertEqual(
          OnNext(1100, 12)
      );
  }
  /// <summary>
  /// Feeds a non-terminating hot sequence into a <c>ReplaySubject</c> created with buffer size 3
  /// and asserts what three observers (subscribing at 300, 400 and 900) record.
  /// </summary>
  [TestMethod]
  public void Infinite_ReplayMany()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnNext(630, 8),
          OnNext(710, 9),
          OnNext(870, 10),
          OnNext(940, 11),
          OnNext(1020, 12)
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(3); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      // The first subscription is disposed a second time here.
      testScheduler.ScheduleAbsolute(800, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(300, 3),
          OnNext(300, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(400, 3),
          OnNext(400, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnNext(630, 8)
      );

      observer3.Messages.AssertEqual(
          OnNext(900, 8),
          OnNext(900, 9),
          OnNext(900, 10),
          OnNext(940, 11)
      );
  }
  /// <summary>
  /// Feeds a non-terminating hot sequence into a <c>ReplaySubject</c> created with the
  /// parameterless constructor and asserts what three observers (subscribing at 300, 400 and 900) record.
  /// </summary>
  [TestMethod]
  public void Infinite_ReplayAll()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnNext(630, 8),
          OnNext(710, 9),
          OnNext(870, 10),
          OnNext(940, 11),
          OnNext(1020, 12)
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      // The first subscription is disposed a second time here.
      testScheduler.ScheduleAbsolute(800, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(300, 3),
          OnNext(300, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(400, 3),
          OnNext(400, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnNext(630, 8)
      );

      observer3.Messages.AssertEqual(
          OnNext(900, 3),
          OnNext(900, 4),
          OnNext(900, 5),
          OnNext(900, 6),
          OnNext(900, 7),
          OnNext(900, 8),
          OnNext(900, 9),
          OnNext(900, 10),
          OnNext(940, 11)
      );
  }
  /// <summary>
  /// Variant of the time-windowed infinite test whose source carries two extra values
  /// (-1 at 280 and -2 at 290). The subject is created with buffer size 3, a 100-tick window
  /// and the test scheduler; three observers subscribe at 300, 400 and 900.
  /// </summary>
  [TestMethod]
  public void Infinite2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(280, -1),
          OnNext(290, -2),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnNext(630, 8),
          OnNext(710, 9),
          OnNext(870, 10),
          OnNext(940, 11),
          OnNext(1020, 12)
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), testScheduler); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      // The first subscription is disposed a second time here.
      testScheduler.ScheduleAbsolute(800, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(301, 4),
          OnNext(302, -1),
          OnNext(303, -2),
          OnNext(341, 5),
          OnNext(411, 6),
          OnNext(521, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(401, 5),
          OnNext(411, 6),
          OnNext(521, 7),
          OnNext(631, 8)
      );

      observer3.Messages.AssertEqual(
          OnNext(901, 10),
          OnNext(941, 11)
      );
  }
  /// <summary>
  /// Feeds a hot sequence that completes at 630 (followed by further notifications) into a
  /// <c>ReplaySubject</c> created with buffer size 3, a 100-tick window and the test scheduler,
  /// then asserts what three observers (subscribing at 300, 400 and 900) record.
  /// </summary>
  [TestMethod]
  public void Finite_ReplayByTime()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnCompleted<int>(630),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), testScheduler); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(301, 3),
          OnNext(302, 4),
          OnNext(341, 5),
          OnNext(411, 6),
          OnNext(521, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(401, 5),
          OnNext(411, 6),
          OnNext(521, 7),
          OnCompleted<int>(631)
      );

      observer3.Messages.AssertEqual(
          OnCompleted<int>(901)
      );
  }
  /// <summary>
  /// Feeds a hot sequence that completes at 630 (followed by further notifications) into a
  /// <c>ReplaySubject</c> created with buffer size 1, then asserts what three observers
  /// (subscribing at 300, 400 and 900) record.
  /// </summary>
  [TestMethod]
  public void Finite_ReplayOne()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnCompleted<int>(630),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(1); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(300, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(400, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnCompleted<int>(630)
      );

      observer3.Messages.AssertEqual(
          OnNext(900, 7),
          OnCompleted<int>(900)
      );
  }
  /// <summary>
  /// Feeds a hot sequence that completes at 630 (followed by further notifications) into a
  /// <c>ReplaySubject</c> created with buffer size 3, then asserts what three observers
  /// (subscribing at 300, 400 and 900) record.
  /// </summary>
  [TestMethod]
  public void Finite_ReplayMany()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnCompleted<int>(630),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(3); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(300, 3),
          OnNext(300, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(400, 3),
          OnNext(400, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnCompleted<int>(630)
      );

      observer3.Messages.AssertEqual(
          OnNext(900, 5),
          OnNext(900, 6),
          OnNext(900, 7),
          OnCompleted<int>(900)
      );
  }
  /// <summary>
  /// Feeds a hot sequence that completes at 630 (followed by further notifications) into a
  /// <c>ReplaySubject</c> created with the parameterless constructor, then asserts what three
  /// observers (subscribing at 300, 400 and 900) record.
  /// </summary>
  [TestMethod]
  public void Finite_ReplayAll()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnCompleted<int>(630),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(300, 3),
          OnNext(300, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(400, 3),
          OnNext(400, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnCompleted<int>(630)
      );

      observer3.Messages.AssertEqual(
          OnNext(900, 3),
          OnNext(900, 4),
          OnNext(900, 5),
          OnNext(900, 6),
          OnNext(900, 7),
          OnCompleted<int>(900)
      );
  }
  /// <summary>
  /// Feeds a hot sequence that fails at 630 (followed by further notifications) into a
  /// <c>ReplaySubject</c> created with buffer size 3, a 100-tick window and the test scheduler,
  /// then asserts what three observers (subscribing at 300, 400 and 900) record.
  /// </summary>
  [TestMethod]
  public void Error_ReplayByTime()
  {
      var testScheduler = new TestScheduler();

      // The same exception instance is used in the source and in the expected messages.
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnError<int>(630, failure),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), testScheduler); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      // The first subscription is disposed a second time here.
      testScheduler.ScheduleAbsolute(800, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(301, 3),
          OnNext(302, 4),
          OnNext(341, 5),
          OnNext(411, 6),
          OnNext(521, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(401, 5),
          OnNext(411, 6),
          OnNext(521, 7),
          OnError<int>(631, failure)
      );

      observer3.Messages.AssertEqual(
          OnError<int>(901, failure)
      );
  }
  /// <summary>
  /// Feeds a hot sequence that fails at 630 (followed by further notifications) into a
  /// <c>ReplaySubject</c> created with buffer size 1, then asserts what three observers
  /// (subscribing at 300, 400 and 900) record.
  /// </summary>
  [TestMethod]
  public void Error_ReplayOne()
  {
      var testScheduler = new TestScheduler();

      // The same exception instance is used in the source and in the expected messages.
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnError<int>(630, failure),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      ReplaySubject<int> replay = null;
      IDisposable upstream = null;

      var observer1 = testScheduler.CreateObserver<int>();
      var observer2 = testScheduler.CreateObserver<int>();
      var observer3 = testScheduler.CreateObserver<int>();
      IDisposable handle1 = null;
      IDisposable handle2 = null;
      IDisposable handle3 = null;

      // Timeline, listed chronologically (all due times are distinct).
      testScheduler.ScheduleAbsolute(100, () => { replay = new ReplaySubject<int>(1); });
      testScheduler.ScheduleAbsolute(200, () => { upstream = source.Subscribe(replay); });
      testScheduler.ScheduleAbsolute(300, () => { handle1 = replay.Subscribe(observer1); });
      testScheduler.ScheduleAbsolute(400, () => { handle2 = replay.Subscribe(observer2); });
      testScheduler.ScheduleAbsolute(600, () => { handle1.Dispose(); });
      testScheduler.ScheduleAbsolute(700, () => { handle2.Dispose(); });
      testScheduler.ScheduleAbsolute(900, () => { handle3 = replay.Subscribe(observer3); });
      testScheduler.ScheduleAbsolute(950, () => { handle3.Dispose(); });
      testScheduler.ScheduleAbsolute(1000, () => { upstream.Dispose(); });

      testScheduler.Start();

      observer1.Messages.AssertEqual(
          OnNext(300, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7)
      );

      observer2.Messages.AssertEqual(
          OnNext(400, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnError<int>(630, failure)
      );

      observer3.Messages.AssertEqual(
          OnNext(900, 7),
          OnError<int>(900, failure)
      );
  }
  // Three-item replay buffer fed by a hot source that faults at tick 630.
  // Observers join at 300, 400 and 900 and are expected to receive up to three
  // buffered items on subscription; the last one also receives the stored error.
  [TestMethod]
  public void Error_ReplayMany()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnError<int>(630, failure),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      var replay = default(ReplaySubject<int>);
      var upstream = default(IDisposable);

      var early = clock.CreateObserver<int>();
      var middle = clock.CreateObserver<int>();
      var late = clock.CreateObserver<int>();

      var earlyLink = default(IDisposable);
      var middleLink = default(IDisposable);
      var lateLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>(3));
      clock.ScheduleAbsolute(200, () => upstream = source.Subscribe(replay));
      clock.ScheduleAbsolute(300, () => earlyLink = replay.Subscribe(early));
      clock.ScheduleAbsolute(400, () => middleLink = replay.Subscribe(middle));
      clock.ScheduleAbsolute(600, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(700, () => middleLink.Dispose());
      clock.ScheduleAbsolute(900, () => lateLink = replay.Subscribe(late));
      clock.ScheduleAbsolute(950, () => lateLink.Dispose());
      clock.ScheduleAbsolute(1000, () => upstream.Dispose());

      clock.Start();

      // Only two items had reached the subject by tick 300.
      early.Messages.AssertEqual(
          OnNext(300, 3),
          OnNext(300, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7)
      );

      middle.Messages.AssertEqual(
          OnNext(400, 3),
          OnNext(400, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnError<int>(630, failure)
      );

      late.Messages.AssertEqual(
          OnNext(900, 5),
          OnNext(900, 6),
          OnNext(900, 7),
          OnError<int>(900, failure)
      );
  }
  // Replay subject built with the parameterless constructor, fed by a hot source
  // that faults at tick 630. The observer joining at 900 is expected to receive
  // every item the subject saw, followed by the error.
  [TestMethod]
  public void Error_ReplayAll()
  {
      var clock = new TestScheduler();
      var failure = new Exception();

      var source = clock.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnError<int>(630, failure),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      var replay = default(ReplaySubject<int>);
      var upstream = default(IDisposable);

      var early = clock.CreateObserver<int>();
      var middle = clock.CreateObserver<int>();
      var late = clock.CreateObserver<int>();

      var earlyLink = default(IDisposable);
      var middleLink = default(IDisposable);
      var lateLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>());
      clock.ScheduleAbsolute(200, () => upstream = source.Subscribe(replay));
      clock.ScheduleAbsolute(300, () => earlyLink = replay.Subscribe(early));
      clock.ScheduleAbsolute(400, () => middleLink = replay.Subscribe(middle));
      clock.ScheduleAbsolute(600, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(700, () => middleLink.Dispose());
      clock.ScheduleAbsolute(900, () => lateLink = replay.Subscribe(late));
      clock.ScheduleAbsolute(950, () => lateLink.Dispose());
      clock.ScheduleAbsolute(1000, () => upstream.Dispose());

      clock.Start();

      early.Messages.AssertEqual(
          OnNext(300, 3),
          OnNext(300, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7)
      );

      middle.Messages.AssertEqual(
          OnNext(400, 3),
          OnNext(400, 4),
          OnNext(400, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnError<int>(630, failure)
      );

      late.Messages.AssertEqual(
          OnNext(900, 3),
          OnNext(900, 4),
          OnNext(900, 5),
          OnNext(900, 6),
          OnNext(900, 7),
          OnError<int>(900, failure)
      );
  }
  // The source completes at tick 630 without emitting a value. The subject is
  // created with a buffer size of 3, a 100-tick window and the test scheduler.
  // The observer disposed at 600 records nothing; the other two record only the
  // completion, at 631 and 901.
  [TestMethod]
  public void Canceled_ReplayByTime()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnCompleted<int>(630),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      var replay = default(ReplaySubject<int>);
      var upstream = default(IDisposable);

      var early = clock.CreateObserver<int>();
      var middle = clock.CreateObserver<int>();
      var late = clock.CreateObserver<int>();

      var earlyLink = default(IDisposable);
      var middleLink = default(IDisposable);
      var lateLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>(3, TimeSpan.FromTicks(100), clock));
      clock.ScheduleAbsolute(200, () => upstream = source.Subscribe(replay));
      clock.ScheduleAbsolute(300, () => earlyLink = replay.Subscribe(early));
      clock.ScheduleAbsolute(400, () => middleLink = replay.Subscribe(middle));
      clock.ScheduleAbsolute(600, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(700, () => middleLink.Dispose());
      // The first subscription is disposed a second time here.
      clock.ScheduleAbsolute(800, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(900, () => lateLink = replay.Subscribe(late));
      clock.ScheduleAbsolute(950, () => lateLink.Dispose());
      clock.ScheduleAbsolute(1000, () => upstream.Dispose());

      clock.Start();

      early.Messages.AssertEqual();

      middle.Messages.AssertEqual(
          OnCompleted<int>(631)
      );

      late.Messages.AssertEqual(
          OnCompleted<int>(901)
      );
  }
  // The source completes at tick 630 without emitting a value; the subject has a
  // one-item buffer. The observer disposed at 600 records nothing; the other two
  // record only the completion, at 630 and 900.
  [TestMethod]
  public void Canceled_ReplayOne()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnCompleted<int>(630),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      var replay = default(ReplaySubject<int>);
      var upstream = default(IDisposable);

      var early = clock.CreateObserver<int>();
      var middle = clock.CreateObserver<int>();
      var late = clock.CreateObserver<int>();

      var earlyLink = default(IDisposable);
      var middleLink = default(IDisposable);
      var lateLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>(1));
      clock.ScheduleAbsolute(200, () => upstream = source.Subscribe(replay));
      clock.ScheduleAbsolute(300, () => earlyLink = replay.Subscribe(early));
      clock.ScheduleAbsolute(400, () => middleLink = replay.Subscribe(middle));
      clock.ScheduleAbsolute(600, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(700, () => middleLink.Dispose());
      // The first subscription is disposed a second time here.
      clock.ScheduleAbsolute(800, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(900, () => lateLink = replay.Subscribe(late));
      clock.ScheduleAbsolute(950, () => lateLink.Dispose());
      clock.ScheduleAbsolute(1000, () => upstream.Dispose());

      clock.Start();

      early.Messages.AssertEqual();

      middle.Messages.AssertEqual(
          OnCompleted<int>(630)
      );

      late.Messages.AssertEqual(
          OnCompleted<int>(900)
      );
  }
  // The source completes at tick 630 without emitting a value; the subject has a
  // three-item buffer. The observer disposed at 600 records nothing; the other
  // two record only the completion, at 630 and 900.
  [TestMethod]
  public void Canceled_ReplayMany()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnCompleted<int>(630),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      var replay = default(ReplaySubject<int>);
      var upstream = default(IDisposable);

      var early = clock.CreateObserver<int>();
      var middle = clock.CreateObserver<int>();
      var late = clock.CreateObserver<int>();

      var earlyLink = default(IDisposable);
      var middleLink = default(IDisposable);
      var lateLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>(3));
      clock.ScheduleAbsolute(200, () => upstream = source.Subscribe(replay));
      clock.ScheduleAbsolute(300, () => earlyLink = replay.Subscribe(early));
      clock.ScheduleAbsolute(400, () => middleLink = replay.Subscribe(middle));
      clock.ScheduleAbsolute(600, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(700, () => middleLink.Dispose());
      // The first subscription is disposed a second time here.
      clock.ScheduleAbsolute(800, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(900, () => lateLink = replay.Subscribe(late));
      clock.ScheduleAbsolute(950, () => lateLink.Dispose());
      clock.ScheduleAbsolute(1000, () => upstream.Dispose());

      clock.Start();

      early.Messages.AssertEqual();

      middle.Messages.AssertEqual(
          OnCompleted<int>(630)
      );

      late.Messages.AssertEqual(
          OnCompleted<int>(900)
      );
  }
  // The source completes at tick 630 without emitting a value; the subject is
  // built with the parameterless constructor. The observer disposed at 600
  // records nothing; the other two record only the completion, at 630 and 900.
  [TestMethod]
  public void Canceled_ReplayAll()
  {
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnCompleted<int>(630),
          OnNext(640, 9),
          OnCompleted<int>(650),
          OnError<int>(660, new Exception())
      );

      var replay = default(ReplaySubject<int>);
      var upstream = default(IDisposable);

      var early = clock.CreateObserver<int>();
      var middle = clock.CreateObserver<int>();
      var late = clock.CreateObserver<int>();

      var earlyLink = default(IDisposable);
      var middleLink = default(IDisposable);
      var lateLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>());
      clock.ScheduleAbsolute(200, () => upstream = source.Subscribe(replay));
      clock.ScheduleAbsolute(300, () => earlyLink = replay.Subscribe(early));
      clock.ScheduleAbsolute(400, () => middleLink = replay.Subscribe(middle));
      clock.ScheduleAbsolute(600, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(700, () => middleLink.Dispose());
      // The first subscription is disposed a second time here.
      clock.ScheduleAbsolute(800, () => earlyLink.Dispose());
      clock.ScheduleAbsolute(900, () => lateLink = replay.Subscribe(late));
      clock.ScheduleAbsolute(950, () => lateLink.Dispose());
      clock.ScheduleAbsolute(1000, () => upstream.Dispose());

      clock.Start();

      early.Messages.AssertEqual();

      middle.Messages.AssertEqual(
          OnCompleted<int>(630)
      );

      late.Messages.AssertEqual(
          OnCompleted<int>(900)
      );
  }
  // Drives a scheduler-backed replay subject by hand: values are pushed at
  // 150..550, observers join at 200/300/400, and the subject is disposed at 600.
  // Every call made on the subject after that is asserted to throw
  // ObjectDisposedException.
  [TestMethod]
  public void SubjectDisposed()
  {
      var clock = new TestScheduler();
      var replay = default(ReplaySubject<int>);

      var first = clock.CreateObserver<int>();
      var second = clock.CreateObserver<int>();
      var third = clock.CreateObserver<int>();

      var firstLink = default(IDisposable);
      var secondLink = default(IDisposable);
      var thirdLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>(clock));
      clock.ScheduleAbsolute(150, () => replay.OnNext(1));
      clock.ScheduleAbsolute(200, () => firstLink = replay.Subscribe(first));
      clock.ScheduleAbsolute(250, () => replay.OnNext(2));
      clock.ScheduleAbsolute(300, () => secondLink = replay.Subscribe(second));
      clock.ScheduleAbsolute(350, () => replay.OnNext(3));
      clock.ScheduleAbsolute(400, () => thirdLink = replay.Subscribe(third));
      clock.ScheduleAbsolute(450, () => replay.OnNext(4));
      clock.ScheduleAbsolute(500, () => firstLink.Dispose());
      clock.ScheduleAbsolute(550, () => replay.OnNext(5));
      clock.ScheduleAbsolute(600, () => replay.Dispose());
      clock.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnNext(6)));
      clock.ScheduleAbsolute(700, () => secondLink.Dispose());
      clock.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnCompleted()));
      clock.ScheduleAbsolute(800, () => thirdLink.Dispose());
      clock.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnError(new Exception())));
      clock.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.Subscribe()));

      clock.Start();

      first.Messages.AssertEqual(
          OnNext(201, 1),
          OnNext(251, 2),
          OnNext(351, 3),
          OnNext(451, 4)
      );

      second.Messages.AssertEqual(
          OnNext(301, 1),
          OnNext(302, 2),
          OnNext(351, 3),
          OnNext(451, 4),
          OnNext(551, 5)
      );

      third.Messages.AssertEqual(
          OnNext(401, 1),
          OnNext(402, 2),
          OnNext(403, 3),
          OnNext(451, 4),
          OnNext(551, 5)
      );
  }
  // Same hand-driven scenario as the other SubjectDisposed tests, using a
  // one-item buffer: each joining observer is expected to start with only the
  // most recent value, and calls after Dispose (tick 600) must throw.
  [TestMethod]
  public void SubjectDisposed_ReplayOne()
  {
      var clock = new TestScheduler();
      var replay = default(ReplaySubject<int>);

      var first = clock.CreateObserver<int>();
      var second = clock.CreateObserver<int>();
      var third = clock.CreateObserver<int>();

      var firstLink = default(IDisposable);
      var secondLink = default(IDisposable);
      var thirdLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>(1));
      clock.ScheduleAbsolute(150, () => replay.OnNext(1));
      clock.ScheduleAbsolute(200, () => firstLink = replay.Subscribe(first));
      clock.ScheduleAbsolute(250, () => replay.OnNext(2));
      clock.ScheduleAbsolute(300, () => secondLink = replay.Subscribe(second));
      clock.ScheduleAbsolute(350, () => replay.OnNext(3));
      clock.ScheduleAbsolute(400, () => thirdLink = replay.Subscribe(third));
      clock.ScheduleAbsolute(450, () => replay.OnNext(4));
      clock.ScheduleAbsolute(500, () => firstLink.Dispose());
      clock.ScheduleAbsolute(550, () => replay.OnNext(5));
      clock.ScheduleAbsolute(600, () => replay.Dispose());
      clock.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnNext(6)));
      clock.ScheduleAbsolute(700, () => secondLink.Dispose());
      clock.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnCompleted()));
      clock.ScheduleAbsolute(800, () => thirdLink.Dispose());
      clock.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnError(new Exception())));
      clock.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.Subscribe()));

      clock.Start();

      first.Messages.AssertEqual(
          OnNext(200, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4)
      );

      second.Messages.AssertEqual(
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnNext(550, 5)
      );

      third.Messages.AssertEqual(
          OnNext(400, 3),
          OnNext(450, 4),
          OnNext(550, 5)
      );
  }
  // Same hand-driven scenario as the other SubjectDisposed tests, using a
  // three-item buffer: joining observers are expected to receive every value
  // pushed so far, and calls after Dispose (tick 600) must throw.
  [TestMethod]
  public void SubjectDisposed_ReplayMany()
  {
      var clock = new TestScheduler();
      var replay = default(ReplaySubject<int>);

      var first = clock.CreateObserver<int>();
      var second = clock.CreateObserver<int>();
      var third = clock.CreateObserver<int>();

      var firstLink = default(IDisposable);
      var secondLink = default(IDisposable);
      var thirdLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>(3));
      clock.ScheduleAbsolute(150, () => replay.OnNext(1));
      clock.ScheduleAbsolute(200, () => firstLink = replay.Subscribe(first));
      clock.ScheduleAbsolute(250, () => replay.OnNext(2));
      clock.ScheduleAbsolute(300, () => secondLink = replay.Subscribe(second));
      clock.ScheduleAbsolute(350, () => replay.OnNext(3));
      clock.ScheduleAbsolute(400, () => thirdLink = replay.Subscribe(third));
      clock.ScheduleAbsolute(450, () => replay.OnNext(4));
      clock.ScheduleAbsolute(500, () => firstLink.Dispose());
      clock.ScheduleAbsolute(550, () => replay.OnNext(5));
      clock.ScheduleAbsolute(600, () => replay.Dispose());
      clock.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnNext(6)));
      clock.ScheduleAbsolute(700, () => secondLink.Dispose());
      clock.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnCompleted()));
      clock.ScheduleAbsolute(800, () => thirdLink.Dispose());
      clock.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnError(new Exception())));
      clock.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.Subscribe()));

      clock.Start();

      first.Messages.AssertEqual(
          OnNext(200, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4)
      );

      second.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnNext(550, 5)
      );

      third.Messages.AssertEqual(
          OnNext(400, 1),
          OnNext(400, 2),
          OnNext(400, 3),
          OnNext(450, 4),
          OnNext(550, 5)
      );
  }
  // Same hand-driven scenario as the other SubjectDisposed tests, using the
  // parameterless constructor: joining observers are expected to receive every
  // value pushed so far, and calls after Dispose (tick 600) must throw.
  [TestMethod]
  public void SubjectDisposed_ReplayAll()
  {
      var clock = new TestScheduler();
      var replay = default(ReplaySubject<int>);

      var first = clock.CreateObserver<int>();
      var second = clock.CreateObserver<int>();
      var third = clock.CreateObserver<int>();

      var firstLink = default(IDisposable);
      var secondLink = default(IDisposable);
      var thirdLink = default(IDisposable);

      // Timeline, listed in the order the virtual clock will run it.
      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>());
      clock.ScheduleAbsolute(150, () => replay.OnNext(1));
      clock.ScheduleAbsolute(200, () => firstLink = replay.Subscribe(first));
      clock.ScheduleAbsolute(250, () => replay.OnNext(2));
      clock.ScheduleAbsolute(300, () => secondLink = replay.Subscribe(second));
      clock.ScheduleAbsolute(350, () => replay.OnNext(3));
      clock.ScheduleAbsolute(400, () => thirdLink = replay.Subscribe(third));
      clock.ScheduleAbsolute(450, () => replay.OnNext(4));
      clock.ScheduleAbsolute(500, () => firstLink.Dispose());
      clock.ScheduleAbsolute(550, () => replay.OnNext(5));
      clock.ScheduleAbsolute(600, () => replay.Dispose());
      clock.ScheduleAbsolute(650, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnNext(6)));
      clock.ScheduleAbsolute(700, () => secondLink.Dispose());
      clock.ScheduleAbsolute(750, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnCompleted()));
      clock.ScheduleAbsolute(800, () => thirdLink.Dispose());
      clock.ScheduleAbsolute(850, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.OnError(new Exception())));
      clock.ScheduleAbsolute(950, () => ReactiveAssert.Throws<ObjectDisposedException>(() => replay.Subscribe()));

      clock.Start();

      first.Messages.AssertEqual(
          OnNext(200, 1),
          OnNext(250, 2),
          OnNext(350, 3),
          OnNext(450, 4)
      );

      second.Messages.AssertEqual(
          OnNext(300, 1),
          OnNext(300, 2),
          OnNext(350, 3),
          OnNext(450, 4),
          OnNext(550, 5)
      );

      third.Messages.AssertEqual(
          OnNext(400, 1),
          OnNext(400, 2),
          OnNext(400, 3),
          OnNext(450, 4),
          OnNext(550, 5)
      );
  }
  // TODO: Create a failing test for this for the other implementations (ReplayOne/Many/All).
  // I don't understand the behavior.
  // I think it may have to do with calling Trim() on Subscription (as well as in the OnNext calls). -LC
  [TestMethod]
  public void ReplaySubjectDiesOut()
  {
      //
      // Tests v1.x behavior as documented in ReplaySubject.cs (Subscribe method).
      //
      // The subject keeps items for a 100-tick window (count limit int.MaxValue).
      // The source completes at 580; observers join at 300, 400, 600 and 900 and
      // none of them is ever unsubscribed.
      var clock = new TestScheduler();

      var source = clock.CreateHotObservable(
          OnNext(70, 1),
          OnNext(110, 2),
          OnNext(220, 3),
          OnNext(270, 4),
          OnNext(340, 5),
          OnNext(410, 6),
          OnNext(520, 7),
          OnCompleted<int>(580)
      );

      var replay = default(ReplaySubject<int>);

      var first = clock.CreateObserver<int>();
      var second = clock.CreateObserver<int>();
      var third = clock.CreateObserver<int>();
      var fourth = clock.CreateObserver<int>();

      clock.ScheduleAbsolute(100, () => replay = new ReplaySubject<int>(int.MaxValue, TimeSpan.FromTicks(100), clock));
      clock.ScheduleAbsolute(200, () => source.Subscribe(replay));
      clock.ScheduleAbsolute(300, () => replay.Subscribe(first));
      clock.ScheduleAbsolute(400, () => replay.Subscribe(second));
      clock.ScheduleAbsolute(600, () => replay.Subscribe(third));
      clock.ScheduleAbsolute(900, () => replay.Subscribe(fourth));

      clock.Start();

      first.Messages.AssertEqual(
          OnNext(301, 3),
          OnNext(302, 4),
          OnNext(341, 5),
          OnNext(411, 6),
          OnNext(521, 7),
          OnCompleted<int>(581)
      );

      second.Messages.AssertEqual(
          OnNext(401, 5),
          OnNext(411, 6),
          OnNext(521, 7),
          OnCompleted<int>(581)
      );

      // Joined after completion: one item is still replayed before the completion.
      third.Messages.AssertEqual(
          OnNext(601, 7),
          OnCompleted<int>(602)
      );

      // Joined at 900: only the completion is replayed.
      fourth.Messages.AssertEqual(
          OnCompleted<int>(901)
      );
  }
  // Runs the HasObservers subscribe/unsubscribe scenario against each way of
  // constructing the subject used in this file: default, size 1, size 3, time window.
  [TestMethod]
  public void HasObservers()
  {
      var factories = new Func<ReplaySubject<int>>[]
      {
          () => new ReplaySubject<int>(),
          () => new ReplaySubject<int>(1),
          () => new ReplaySubject<int>(3),
          () => new ReplaySubject<int>(TimeSpan.FromSeconds(1)),
      };

      // Each subject is created right before its scenario runs, as before.
      foreach (var create in factories)
      {
          HasObservers(create());
      }
  }
  // Checks HasObservers after every subscribe and dispose step on the given subject.
  private static void HasObservers(ReplaySubject<int> s)
  {
      // Nothing subscribed yet.
      Assert.IsFalse(s.HasObservers);

      // A single observer comes and goes.
      var solo = s.Subscribe(_ => { });
      Assert.IsTrue(s.HasObservers);

      solo.Dispose();
      Assert.IsFalse(s.HasObservers);

      // Two observers: the flag must stay set until the last one has left.
      var former = s.Subscribe(_ => { });
      Assert.IsTrue(s.HasObservers);

      var latter = s.Subscribe(_ => { });
      Assert.IsTrue(s.HasObservers);

      former.Dispose();
      Assert.IsTrue(s.HasObservers);

      latter.Dispose();
      Assert.IsFalse(s.HasObservers);
  }
  // Runs the "dispose the subject before the subscription" scenario against each
  // way of constructing the subject: default, size 1, size 3, time window.
  [TestMethod]
  public void HasObservers_Dispose1()
  {
      var factories = new Func<ReplaySubject<int>>[]
      {
          () => new ReplaySubject<int>(),
          () => new ReplaySubject<int>(1),
          () => new ReplaySubject<int>(3),
          () => new ReplaySubject<int>(TimeSpan.FromSeconds(1)),
      };

      // Each subject is created right before its scenario runs, as before.
      foreach (var create in factories)
      {
          HasObservers_Dispose1(create());
      }
  }
  // Disposes the subject while an observer is attached, then disposes the
  // subscription; HasObservers must be false after both steps.
  private static void HasObservers_Dispose1(ReplaySubject<int> s)
  {
      Assert.IsFalse(s.HasObservers);

      var link = s.Subscribe(_ => { });
      Assert.IsTrue(s.HasObservers);

      // Subject goes first...
      s.Dispose();
      Assert.IsFalse(s.HasObservers);

      // ...then the subscription.
      link.Dispose();
      Assert.IsFalse(s.HasObservers);
  }
  // Runs the "dispose the subscription before the subject" scenario against each
  // way of constructing the subject: default, size 1, size 3, time window.
  [TestMethod]
  public void HasObservers_Dispose2()
  {
      var factories = new Func<ReplaySubject<int>>[]
      {
          () => new ReplaySubject<int>(),
          () => new ReplaySubject<int>(1),
          () => new ReplaySubject<int>(3),
          () => new ReplaySubject<int>(TimeSpan.FromSeconds(1)),
      };

      // Each subject is created right before its scenario runs, as before.
      foreach (var create in factories)
      {
          HasObservers_Dispose2(create());
      }
  }
  // Disposes the subscription first and the subject second; HasObservers must be
  // false after both steps.
  private static void HasObservers_Dispose2(ReplaySubject<int> s)
  {
      Assert.IsFalse(s.HasObservers);

      var link = s.Subscribe(_ => { });
      Assert.IsTrue(s.HasObservers);

      // Subscription goes first...
      link.Dispose();
      Assert.IsFalse(s.HasObservers);

      // ...then the subject.
      s.Dispose();
      Assert.IsFalse(s.HasObservers);
  }
  // Runs the "dispose a subject that never had observers" scenario against each
  // way of constructing the subject: default, size 1, size 3, time window.
  [TestMethod]
  public void HasObservers_Dispose3()
  {
      var factories = new Func<ReplaySubject<int>>[]
      {
          () => new ReplaySubject<int>(),
          () => new ReplaySubject<int>(1),
          () => new ReplaySubject<int>(3),
          () => new ReplaySubject<int>(TimeSpan.FromSeconds(1)),
      };

      // Each subject is created right before its scenario runs, as before.
      foreach (var create in factories)
      {
          HasObservers_Dispose3(create());
      }
  }
  // HasObservers must be false both before and after disposing a subject that
  // was never subscribed to.
  private static void HasObservers_Dispose3(ReplaySubject<int> s)
  {
      Assert.IsFalse(s.HasObservers);

      s.Dispose();

      Assert.IsFalse(s.HasObservers);
  }
  // Runs the "observers are dropped on completion" scenario against each way of
  // constructing the subject: default, size 1, size 3, time window.
  [TestMethod]
  public void HasObservers_OnCompleted()
  {
      var factories = new Func<ReplaySubject<int>>[]
      {
          () => new ReplaySubject<int>(),
          () => new ReplaySubject<int>(1),
          () => new ReplaySubject<int>(3),
          () => new ReplaySubject<int>(TimeSpan.FromSeconds(1)),
      };

      // Each subject is created right before its scenario runs, as before.
      foreach (var create in factories)
      {
          HasObservers_OnCompleted(create());
      }
  }
  // HasObservers stays true across OnNext and must turn false once the subject
  // has completed.
  private static void HasObservers_OnCompleted(ReplaySubject<int> s)
  {
      Assert.IsFalse(s.HasObservers);

      // The returned subscription is never disposed in this scenario.
      s.Subscribe(_ => { });
      Assert.IsTrue(s.HasObservers);

      s.OnNext(42);
      Assert.IsTrue(s.HasObservers);

      s.OnCompleted();
      Assert.IsFalse(s.HasObservers);
  }
  // Runs the "observers are dropped on error" scenario against each way of
  // constructing the subject: default, size 1, size 3, time window.
  [TestMethod]
  public void HasObservers_OnError()
  {
      var factories = new Func<ReplaySubject<int>>[]
      {
          () => new ReplaySubject<int>(),
          () => new ReplaySubject<int>(1),
          () => new ReplaySubject<int>(3),
          () => new ReplaySubject<int>(TimeSpan.FromSeconds(1)),
      };

      // Each subject is created right before its scenario runs, as before.
      foreach (var create in factories)
      {
          HasObservers_OnError(create());
      }
  }
  // HasObservers stays true across OnNext and must turn false once the subject
  // has faulted.
  private static void HasObservers_OnError(ReplaySubject<int> s)
  {
      Assert.IsFalse(s.HasObservers);

      // An error handler is supplied alongside the value handler.
      // The returned subscription is never disposed in this scenario.
      s.Subscribe(_ => { }, ex => { });
      Assert.IsTrue(s.HasObservers);

      s.OnNext(42);
      Assert.IsTrue(s.HasObservers);

      s.OnError(new Exception());
      Assert.IsFalse(s.HasObservers);
  }
  // Potentially already covered by Finite_* tests
  // An observer subscribing after completion to a subject built with the
  // parameterless constructor must receive both values and then OnCompleted.
  [TestMethod]
  public void Completed_to_late_subscriber_ReplayAll()
  {
      var replay = new ReplaySubject<int>();
      foreach (var value in new[] { 1, 2 })
      {
          replay.OnNext(value);
      }
      replay.OnCompleted();

      // The scheduler is only needed as a factory for the recording observer.
      var late = new TestScheduler().CreateObserver<int>();
      replay.Subscribe(late);

      var received = late.Messages;
      Assert.AreEqual(3, received.Count);
      Assert.AreEqual(1, received[0].Value.Value);
      Assert.AreEqual(2, received[1].Value.Value);
      Assert.AreEqual(NotificationKind.OnCompleted, received[2].Value.Kind);
  }
  // An observer subscribing after completion to a one-item subject must receive
  // only the last value (2) and then OnCompleted.
  [TestMethod]
  public void Completed_to_late_subscriber_ReplayOne()
  {
      var replay = new ReplaySubject<int>(1);
      foreach (var value in new[] { 1, 2 })
      {
          replay.OnNext(value);
      }
      replay.OnCompleted();

      // The scheduler is only needed as a factory for the recording observer.
      var late = new TestScheduler().CreateObserver<int>();
      replay.Subscribe(late);

      var received = late.Messages;
      Assert.AreEqual(2, received.Count);
      Assert.AreEqual(2, received[0].Value.Value);
      Assert.AreEqual(NotificationKind.OnCompleted, received[1].Value.Kind);
  }
  // An observer subscribing after completion to a two-item subject that saw
  // three values must receive the last two (2, 3) and then OnCompleted.
  [TestMethod]
  public void Completed_to_late_subscriber_ReplayMany()
  {
      var replay = new ReplaySubject<int>(2);
      foreach (var value in new[] { 1, 2, 3 })
      {
          replay.OnNext(value);
      }
      replay.OnCompleted();

      // The scheduler is only needed as a factory for the recording observer.
      var late = new TestScheduler().CreateObserver<int>();
      replay.Subscribe(late);

      var received = late.Messages;
      Assert.AreEqual(3, received.Count);
      Assert.AreEqual(2, received[0].Value.Value);
      Assert.AreEqual(3, received[1].Value.Value);
      Assert.AreEqual(NotificationKind.OnCompleted, received[2].Value.Kind);
  }
  // An observer subscribing after completion to a subject with a one-minute
  // window must receive all three values and then OnCompleted.
  [TestMethod]
  public void Completed_to_late_subscriber_ReplayByTime()
  {
      var replay = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
      foreach (var value in new[] { 1, 2, 3 })
      {
          replay.OnNext(value);
      }
      replay.OnCompleted();

      // The scheduler is only needed as a factory for the recording observer.
      var late = new TestScheduler().CreateObserver<int>();
      replay.Subscribe(late);

      var received = late.Messages;
      Assert.AreEqual(4, received.Count);
      Assert.AreEqual(1, received[0].Value.Value);
      Assert.AreEqual(2, received[1].Value.Value);
      Assert.AreEqual(3, received[2].Value.Value);
      Assert.AreEqual(NotificationKind.OnCompleted, received[3].Value.Kind);
  }
  // Potentially already covered by Error_* tests
  // An observer subscribing after a fault to a subject built with the
  // parameterless constructor must receive both values and then the same exception.
  [TestMethod]
  public void Errored_to_late_subscriber_ReplayAll()
  {
      var expectedException = new Exception("Test");

      var replay = new ReplaySubject<int>();
      foreach (var value in new[] { 1, 2 })
      {
          replay.OnNext(value);
      }
      replay.OnError(expectedException);

      // The scheduler is only needed as a factory for the recording observer.
      var late = new TestScheduler().CreateObserver<int>();
      replay.Subscribe(late);

      var received = late.Messages;
      Assert.AreEqual(3, received.Count);
      Assert.AreEqual(1, received[0].Value.Value);
      Assert.AreEqual(2, received[1].Value.Value);
      Assert.AreEqual(NotificationKind.OnError, received[2].Value.Kind);
      Assert.AreEqual(expectedException, received[2].Value.Exception);
  }
  // An observer subscribing after a fault to a one-item subject must receive
  // only the last value (2) and then the same exception.
  [TestMethod]
  public void Errored_to_late_subscriber_ReplayOne()
  {
      var expectedException = new Exception("Test");

      var replay = new ReplaySubject<int>(1);
      foreach (var value in new[] { 1, 2 })
      {
          replay.OnNext(value);
      }
      replay.OnError(expectedException);

      // The scheduler is only needed as a factory for the recording observer.
      var late = new TestScheduler().CreateObserver<int>();
      replay.Subscribe(late);

      var received = late.Messages;
      Assert.AreEqual(2, received.Count);
      Assert.AreEqual(2, received[0].Value.Value);
      Assert.AreEqual(NotificationKind.OnError, received[1].Value.Kind);
      Assert.AreEqual(expectedException, received[1].Value.Exception);
  }
  // An observer subscribing after a fault to a two-item subject that saw three
  // values must receive the last two (2, 3) and then the same exception.
  [TestMethod]
  public void Errored_to_late_subscriber_ReplayMany()
  {
      var expectedException = new Exception("Test");

      var replay = new ReplaySubject<int>(2);
      foreach (var value in new[] { 1, 2, 3 })
      {
          replay.OnNext(value);
      }
      replay.OnError(expectedException);

      // The scheduler is only needed as a factory for the recording observer.
      var late = new TestScheduler().CreateObserver<int>();
      replay.Subscribe(late);

      var received = late.Messages;
      Assert.AreEqual(3, received.Count);
      Assert.AreEqual(2, received[0].Value.Value);
      Assert.AreEqual(3, received[1].Value.Value);
      Assert.AreEqual(NotificationKind.OnError, received[2].Value.Kind);
      Assert.AreEqual(expectedException, received[2].Value.Exception);
  }
  // An observer subscribing after a fault to a subject with a one-minute window
  // must receive all three values and then the same exception.
  [TestMethod]
  public void Errored_to_late_subscriber_ReplayByTime()
  {
      var expectedException = new Exception("Test");

      var replay = new ReplaySubject<int>(TimeSpan.FromMinutes(1));
      foreach (var value in new[] { 1, 2, 3 })
      {
          replay.OnNext(value);
      }
      replay.OnError(expectedException);

      // The scheduler is only needed as a factory for the recording observer.
      var late = new TestScheduler().CreateObserver<int>();
      replay.Subscribe(late);

      var received = late.Messages;
      Assert.AreEqual(4, received.Count);
      Assert.AreEqual(1, received[0].Value.Value);
      Assert.AreEqual(2, received[1].Value.Value);
      Assert.AreEqual(3, received[2].Value.Value);
      Assert.AreEqual(NotificationKind.OnError, received[3].Value.Kind);
      Assert.AreEqual(expectedException, received[3].Value.Exception);
  }
  1441. }
  1442. }