1
0

ObservableBindingTest.cs 92 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Linq;
  7. using System.Reactive.Subjects;
  8. using Microsoft.Reactive.Testing;
  9. using Microsoft.VisualStudio.TestTools.UnitTesting;
  10. using ReactiveTests.Dummies;
  11. namespace ReactiveTests.Tests
  12. {
  13. [TestClass]
  14. public partial class ObservableBindingTest : ReactiveTest
  15. {
  16. #region Multicast
  /// <summary>
  /// Verifies that both Multicast overloads throw ArgumentNullException
  /// when any one of their arguments is null.
  /// </summary>
  [TestMethod]
  public void Multicast_ArgumentChecking()
  {
      // Overload taking a subject instance: null source, then null subject.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Multicast<int, int>(null, new Subject<int>()));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Multicast<int, int>(DummyObservable<int>.Instance, null));

      // Overload taking a subject factory and a selector: null source, null factory, null selector.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Multicast<int, int, int>(null, () => new Subject<int>(), xs => xs));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Multicast<int, int, int>(DummyObservable<int>.Instance, null, xs => xs));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Multicast<int, int, int>(DummyObservable<int>.Instance, () => new Subject<int>(), null));
  }
  /// <summary>
  /// Multicast through a caller-supplied subject where the observer subscribes (100)
  /// before the connection is made (200) and is disposed at 300.
  /// </summary>
  [TestMethod]
  public void Multicast_Hot_1()
  {
      var testScheduler = new TestScheduler();
      var subject = new Subject<int>();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnCompleted<int>(390)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> connectable = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      testScheduler.ScheduleAbsolute(50, () => connectable = source.Multicast(subject));
      testScheduler.ScheduleAbsolute(100, () => subscription = connectable.Subscribe(observer));
      testScheduler.ScheduleAbsolute(200, () => connection = connectable.Connect());
      testScheduler.ScheduleAbsolute(300, () => subscription.Dispose());

      testScheduler.Start();

      // Only the values emitted between the connect (200) and the observer's disposal (300) arrive.
      observer.Messages.AssertEqual(
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5)
      );

      // The connection itself is never disposed, so the source stays subscribed until it completes.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 390)
      );
  }
  /// <summary>
  /// Multicast through a caller-supplied subject where the connection is made (100)
  /// before the observer subscribes (200); the observer is disposed at 300.
  /// </summary>
  [TestMethod]
  public void Multicast_Hot_2()
  {
      var testScheduler = new TestScheduler();
      var subject = new Subject<int>();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnCompleted<int>(390)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> connectable = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      testScheduler.ScheduleAbsolute(50, () => connectable = source.Multicast(subject));
      testScheduler.ScheduleAbsolute(100, () => connection = connectable.Connect());
      testScheduler.ScheduleAbsolute(200, () => subscription = connectable.Subscribe(observer));
      testScheduler.ScheduleAbsolute(300, () => subscription.Dispose());

      testScheduler.Start();

      // The observer sees only what is emitted while it is subscribed (200 to 300).
      observer.Messages.AssertEqual(
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5)
      );

      // The source subscription spans from the connect (100) to the source's completion (390).
      source.Subscriptions.AssertEqual(
          Subscribe(100, 390)
      );
  }
  /// <summary>
  /// Multicast through a caller-supplied subject where the connection is disposed (300)
  /// and re-established (335) while the observer stays subscribed; the source completes.
  /// </summary>
  [TestMethod]
  public void Multicast_Hot_3()
  {
      var testScheduler = new TestScheduler();
      var subject = new Subject<int>();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnCompleted<int>(390)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> connectable = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      testScheduler.ScheduleAbsolute(50, () => connectable = source.Multicast(subject));
      testScheduler.ScheduleAbsolute(100, () => connection = connectable.Connect());
      testScheduler.ScheduleAbsolute(200, () => subscription = connectable.Subscribe(observer));
      testScheduler.ScheduleAbsolute(300, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(335, () => connection = connectable.Connect());

      testScheduler.Start();

      // The value at 330 falls in the disconnected gap (300 to 335) and is not observed.
      observer.Messages.AssertEqual(
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(340, 7),
          OnCompleted<int>(390)
      );

      // One source subscription per connection.
      source.Subscriptions.AssertEqual(
          Subscribe(100, 300),
          Subscribe(335, 390)
      );
  }
  /// <summary>
  /// Same disconnect/reconnect scenario as the completing variant, but the source
  /// terminates with an error at 390, which the observer is expected to receive.
  /// </summary>
  [TestMethod]
  public void Multicast_Hot_4()
  {
      var testScheduler = new TestScheduler();
      var subject = new Subject<int>();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnError<int>(390, error)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> connectable = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      testScheduler.ScheduleAbsolute(50, () => connectable = source.Multicast(subject));
      testScheduler.ScheduleAbsolute(100, () => connection = connectable.Connect());
      testScheduler.ScheduleAbsolute(200, () => subscription = connectable.Subscribe(observer));
      testScheduler.ScheduleAbsolute(300, () => connection.Dispose());
      testScheduler.ScheduleAbsolute(335, () => connection = connectable.Connect());

      testScheduler.Start();

      // The value at 330 falls in the disconnected gap (300 to 335) and is not observed.
      observer.Messages.AssertEqual(
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(340, 7),
          OnError<int>(390, error)
      );

      // One source subscription per connection.
      source.Subscriptions.AssertEqual(
          Subscribe(100, 300),
          Subscribe(335, 390)
      );
  }
  /// <summary>
  /// The observer subscribes at 400, after the connected source has already failed at 390;
  /// it is expected to receive just the error, at its own subscription time.
  /// </summary>
  [TestMethod]
  public void Multicast_Hot_5()
  {
      var testScheduler = new TestScheduler();
      var subject = new Subject<int>();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnError<int>(390, error)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> connectable = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      testScheduler.ScheduleAbsolute(50, () => connectable = source.Multicast(subject));
      testScheduler.ScheduleAbsolute(100, () => connection = connectable.Connect());
      testScheduler.ScheduleAbsolute(400, () => subscription = connectable.Subscribe(observer));

      testScheduler.Start();

      // Late subscriber: no values, only the terminal error delivered at 400.
      observer.Messages.AssertEqual(
          OnError<int>(400, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(100, 390)
      );
  }
  /// <summary>
  /// The observer subscribes at 400, after the connected source has already completed at 390;
  /// it is expected to receive just the completion, at its own subscription time.
  /// </summary>
  [TestMethod]
  public void Multicast_Hot_6()
  {
      var testScheduler = new TestScheduler();
      var subject = new Subject<int>();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnCompleted<int>(390)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> connectable = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      testScheduler.ScheduleAbsolute(50, () => connectable = source.Multicast(subject));
      testScheduler.ScheduleAbsolute(100, () => connection = connectable.Connect());
      testScheduler.ScheduleAbsolute(400, () => subscription = connectable.Subscribe(observer));

      testScheduler.Start();

      // Late subscriber: no values, only the completion delivered at 400.
      observer.Messages.AssertEqual(
          OnCompleted<int>(400)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(100, 390)
      );
  }
  /// <summary>
  /// Multicast with a subject factory and an identity selector: every value the source
  /// emits after the subscription, plus its completion, is expected to pass through.
  /// </summary>
  [TestMethod]
  public void Multicast_Cold_Completed()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnCompleted<int>(390)
      );

      // Sequence under test, built lazily by the scheduler.
      Func<IObservable<int>> create = () =>
          source.Multicast(() => new Subject<int>(), ys => ys);

      var results = testScheduler.Start(create);

      results.Messages.AssertEqual(
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnCompleted<int>(390)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 390)
      );
  }
  /// <summary>
  /// Multicast with a subject factory and an identity selector: the source's error at 390
  /// is expected to reach the observer after the preceding values.
  /// </summary>
  [TestMethod]
  public void Multicast_Cold_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnError<int>(390, error)
      );

      // Sequence under test, built lazily by the scheduler.
      Func<IObservable<int>> create = () =>
          source.Multicast(() => new Subject<int>(), ys => ys);

      var results = testScheduler.Start(create);

      results.Messages.AssertEqual(
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnError<int>(390, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 390)
      );
  }
  /// <summary>
  /// Multicast with a subject factory and an identity selector over a source that never
  /// terminates: the source subscription is expected to last until the observer is disposed.
  /// </summary>
  [TestMethod]
  public void Multicast_Cold_Dispose()
  {
      var testScheduler = new TestScheduler();

      // No terminal notification: the source keeps running.
      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7)
      );

      // Sequence under test, built lazily by the scheduler.
      Func<IObservable<int>> create = () =>
          source.Multicast(() => new Subject<int>(), ys => ys);

      var results = testScheduler.Start(create);

      results.Messages.AssertEqual(
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 1000)
      );
  }
  /// <summary>
  /// Multicast with a selector that zips the shared sequence with itself: each result is
  /// expected to be double the source value, with the source subscribed only once.
  /// </summary>
  [TestMethod]
  public void Multicast_Cold_Zip()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnCompleted<int>(390)
      );

      // Sequence under test, built lazily by the scheduler.
      Func<IObservable<int>> create = () =>
          source.Multicast(() => new Subject<int>(), ys => ys.Zip(ys, (a, b) => a + b));

      var results = testScheduler.Start(create);

      results.Messages.AssertEqual(
          OnNext(210, 6),
          OnNext(240, 8),
          OnNext(270, 10),
          OnNext(330, 12),
          OnNext(340, 14),
          OnCompleted<int>(390)
      );

      // A single source subscription despite the selector using the sequence twice.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 390)
      );
  }
  /// <summary>
  /// When the subject factory throws, the exception is expected to surface as an OnError
  /// at subscription time and the source is expected never to be subscribed.
  /// </summary>
  [TestMethod]
  public void Multicast_SubjectSelectorThrows()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnCompleted<int>(300)
      );

      // Subject factory always throws; the selector is the identity.
      Func<IObservable<int>> create = () =>
          source.Multicast<int, int, int>(() => { throw error; }, ys => ys);

      var results = testScheduler.Start(create);

      results.Messages.AssertEqual(
          OnError<int>(200, error)
      );

      // No subscriptions to the source are expected.
      source.Subscriptions.AssertEqual(
      );
  }
  /// <summary>
  /// When the result selector throws, the exception is expected to surface as an OnError
  /// at subscription time and the source is expected never to be subscribed.
  /// </summary>
  [TestMethod]
  public void Multicast_SelectorThrows()
  {
      var error = new Exception();
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(240, 2),
          OnCompleted<int>(300)
      );

      // Subject factory succeeds; the selector always throws.
      Func<IObservable<int>> create = () =>
          source.Multicast<int, int, int>(() => new Subject<int>(), ys => { throw error; });

      var results = testScheduler.Start(create);

      results.Messages.AssertEqual(
          OnError<int>(200, error)
      );

      // No subscriptions to the source are expected.
      source.Subscriptions.AssertEqual(
      );
  }
  391. #endregion
  392. #region Publish
  /// <summary>
  /// Publish with a selector that zips the shared sequence with itself: each result is
  /// expected to be double the source value, with the source subscribed only once.
  /// </summary>
  [TestMethod]
  public void Publish_Cold_Zip()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(40, 0),
          OnNext(90, 1),
          OnNext(150, 2),
          OnNext(210, 3),
          OnNext(240, 4),
          OnNext(270, 5),
          OnNext(330, 6),
          OnNext(340, 7),
          OnCompleted<int>(390)
      );

      // Sequence under test, built lazily by the scheduler.
      Func<IObservable<int>> create = () =>
          source.Publish(ys => ys.Zip(ys, (a, b) => a + b));

      var results = testScheduler.Start(create);

      results.Messages.AssertEqual(
          OnNext(210, 6),
          OnNext(240, 8),
          OnNext(270, 10),
          OnNext(330, 12),
          OnNext(340, 14),
          OnCompleted<int>(390)
      );

      // A single source subscription despite the selector using the sequence twice.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 390)
      );
  }
  /// <summary>
  /// Verifies that the Publish overloads (with and without a selector) throw
  /// ArgumentNullException for a null source or a null selector.
  /// </summary>
  [TestMethod]
  public void Publish_ArgumentChecking()
  {
      var validSource = Observable.Empty<int>();

      // Null source, parameterless and selector overloads.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Publish(default(IObservable<int>)));
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Publish(default(IObservable<int>), x => x));

      // Valid source, null selector.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.Publish<int, int>(validSource, null));
  }
  /// <summary>
  /// Publish without a selector, connected three times (300-400, 500-550, 650-800):
  /// the observer is expected to see only values emitted while a connection is active.
  /// </summary>
  [TestMethod]
  public void Publish_Basic()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      // Observer lifetime follows the standard Created / Subscribed / Disposed times.
      testScheduler.ScheduleAbsolute(Created, () => published = source.Publish());
      testScheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      // First connection window.
      testScheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection window.
      testScheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection window, opened after the source's completion time (600).
      testScheduler.ScheduleAbsolute(650, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11)
      );

      // One source subscription per connection window.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  /// <summary>
  /// Publish without a selector where the source fails at 600 during the second connection:
  /// the error is expected to reach the observer and end the source subscription.
  /// </summary>
  [TestMethod]
  public void Publish_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      // Observer lifetime follows the standard Created / Subscribed / Disposed times.
      testScheduler.ScheduleAbsolute(Created, () => published = source.Publish());
      testScheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      // First connection window.
      testScheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection window; the source errors (600) before the scheduled disposal (800).
      testScheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  /// <summary>
  /// Publish without a selector where the source completes at 600 during the second connection:
  /// the completion is expected to reach the observer and end the source subscription.
  /// </summary>
  [TestMethod]
  public void Publish_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      // Observer lifetime follows the standard Created / Subscribed / Disposed times.
      testScheduler.ScheduleAbsolute(Created, () => published = source.Publish());
      testScheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      testScheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      // First connection window.
      testScheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection window; the source completes (600) before the scheduled disposal (800).
      testScheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());

      testScheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  /// <summary>
  /// Publish without a selector where the observer unsubscribes at 350, in the middle of the
  /// first connection: it is expected to see a single value while the connections carry on.
  /// </summary>
  [TestMethod]
  public void Publish_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var observer = testScheduler.CreateObserver<int>();
      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;

      // The observer leaves early, at 350.
      testScheduler.ScheduleAbsolute(Created, () => published = source.Publish());
      testScheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      testScheduler.ScheduleAbsolute(350, () => subscription.Dispose());

      // First connection window.
      testScheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection window.
      testScheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection window.
      testScheduler.ScheduleAbsolute(650, () => connection = published.Connect());
      testScheduler.ScheduleAbsolute(800, () => connection.Dispose());

      testScheduler.Start();

      // Only the value emitted between the first connect (300) and the unsubscribe (350).
      observer.Messages.AssertEqual(
          OnNext(340, 8)
      );

      // Source subscriptions follow the connections, not the observer.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  /// <summary>
  /// Connecting twice without disposing is expected to hand back the same connection object;
  /// connecting again after disposal is expected to hand back a different one.
  /// </summary>
  [TestMethod]
  public void Publish_MultipleConnections()
  {
      var published = Observable.Never<int>().Publish();

      // Two connects while already connected.
      var first = published.Connect();
      var second = published.Connect();
      Assert.AreSame(first, second);

      // Dispose through both references.
      first.Dispose();
      second.Dispose();

      // A connect after disposal.
      var third = published.Connect();
      Assert.AreNotSame(first, third);

      third.Dispose();
  }
  /// <summary>
  /// Publish with a selector that zips the shared sequence with itself skipped by one,
  /// yielding sums of consecutive values; the source completes at 600.
  /// </summary>
  [TestMethod]
  public void PublishLambda_Zip_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      // Pair each element with its successor and add them.
      Func<IObservable<int>> create = () =>
          source.Publish(shared => shared.Zip(shared.Skip(1), (prev, cur) => cur + prev));

      var results = testScheduler.Start(create);

      // The first result appears with the second observed value (280): 3 + 4 = 7.
      results.Messages.AssertEqual(
          OnNext(280, 7),
          OnNext(290, 5),
          OnNext(340, 9),
          OnNext(360, 13),
          OnNext(370, 11),
          OnNext(390, 13),
          OnNext(410, 20),
          OnNext(430, 15),
          OnNext(450, 11),
          OnNext(520, 20),
          OnNext(560, 31),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  /// <summary>
  /// Publish with a selector that zips the shared sequence with itself skipped by one,
  /// yielding sums of consecutive values; the source fails at 600.
  /// </summary>
  [TestMethod]
  public void PublishLambda_Zip_Error()
  {
      var testScheduler = new TestScheduler();
      var error = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      // Pair each element with its successor and add them.
      Func<IObservable<int>> create = () =>
          source.Publish(shared => shared.Zip(shared.Skip(1), (prev, cur) => cur + prev));

      var results = testScheduler.Start(create);

      // The first result appears with the second observed value (280): 3 + 4 = 7.
      results.Messages.AssertEqual(
          OnNext(280, 7),
          OnNext(290, 5),
          OnNext(340, 9),
          OnNext(360, 13),
          OnNext(370, 11),
          OnNext(390, 13),
          OnNext(410, 20),
          OnNext(430, 15),
          OnNext(450, 11),
          OnNext(520, 20),
          OnNext(560, 31),
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Pairwise-sum scenario where Start is given 470 as its second argument: only
  // results produced before that time are expected, and the source subscription
  // is expected to end at 470.
  [TestMethod]
  public void PublishLambda_Zip_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var results = scheduler.Start(
          () => source.Publish(shared => shared.Zip(shared.Skip(1), (prev, cur) => cur + prev)),
          470
      );

      // The source values at 520 and 560 arrive after 470 and must not show up.
      results.Messages.AssertEqual(
          OnNext(280, 7),
          OnNext(290, 5),
          OnNext(340, 9),
          OnNext(360, 13),
          OnNext(370, 11),
          OnNext(390, 13),
          OnNext(410, 20),
          OnNext(430, 15),
          OnNext(450, 11)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 470)
      );
  }
  // Checks that the Publish overloads taking an initial value throw
  // ArgumentNullException for a null source or a null selector.
  [TestMethod]
  public void PublishWithInitialValue_ArgumentChecking()
  {
      var validSource = Observable.Empty<int>();
      IObservable<int> nullSource = null;
      Func<IObservable<int>, IObservable<int>> nullSelector = null;

      // Null source, overload without a selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(nullSource, 1));

      // Null source, overload with a selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(nullSource, x => x, 1));

      // Valid source, null selector.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Publish(validSource, nullSelector, 1));
  }
  // Sanity check for Publish with a selector and an initial value: publishing
  // Range(1, 10) through the identity selector with initial value 0 is expected
  // to yield the same elements as Range(0, 11).
  [TestMethod]
  public void PublishWithInitialValue_SanityCheck()
  {
      // The unused 'someObservable' local from the original was removed.
      Observable.Publish(Observable.Range(1, 10), x => x, 0).AssertEqual(Observable.Range(0, 11));
  }
  // Publishes a hot source with initial value 1979 and connects three times
  // (300-400, 500-550, 650-800). The observer is expected to see the initial
  // value first and then only the source values emitted while a connection is open.
  [TestMethod]
  public void PublishWithInitialValue_Basic()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Lifetime of the published sequence and of the observer's subscription.
      scheduler.ScheduleAbsolute(Created, () => published = source.Publish(1979));
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      // First connection window.
      scheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection window.
      scheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection window; it opens after the source's OnCompleted at 600.
      scheduler.ScheduleAbsolute(650, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(200, 1979),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11)
      );

      // One source subscription per connection window.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Publishes a failing hot source with initial value 1979. The second connection
  // (opened at 500) is still open when the source errors at 600, so the error is
  // expected to reach the observer and end the source subscription at 600.
  [TestMethod]
  public void PublishWithInitialValue_Error()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Lifetime of the published sequence and of the observer's subscription.
      scheduler.ScheduleAbsolute(Created, () => published = source.Publish(1979));
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      // First connection window.
      scheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection; its disposal is scheduled after the source error at 600.
      scheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(200, 1979),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Publishes a completing hot source with initial value 1979. The second
  // connection (opened at 500) is still open when the source completes at 600,
  // so completion is expected to reach the observer.
  [TestMethod]
  public void PublishWithInitialValue_Complete()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Lifetime of the published sequence and of the observer's subscription.
      scheduler.ScheduleAbsolute(Created, () => published = source.Publish(1979));
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      // First connection window.
      scheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection; its disposal is scheduled after the completion at 600.
      scheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(200, 1979),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Publishes a hot source with initial value 1979 and disposes the observer's
  // subscription at 350, in the middle of the first connection window. Only the
  // initial value and the one source value emitted before 350 are expected.
  [TestMethod]
  public void PublishWithInitialValue_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // The observer unsubscribes at 350 instead of at the Disposed time.
      scheduler.ScheduleAbsolute(Created, () => published = source.Publish(1979));
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      scheduler.ScheduleAbsolute(350, () => subscription.Dispose());

      // First connection window.
      scheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection window.
      scheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection window.
      scheduler.ScheduleAbsolute(650, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(200, 1979),
          OnNext(340, 8)
      );

      // Connections are unaffected by the observer leaving early.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Connecting twice without disposing is expected to return the same connection
  // object; connecting again after disposal is expected to return a new one.
  [TestMethod]
  public void PublishWithInitialValue_MultipleConnections()
  {
      var published = Observable.Never<int>().Publish(1979);

      var first = published.Connect();
      var second = published.Connect();
      Assert.AreSame(first, second);

      // Both handles are disposed before reconnecting.
      first.Dispose();
      second.Dispose();

      var third = published.Connect();
      Assert.AreNotSame(first, third);

      third.Dispose();
  }
  // Pairwise-sum selector over a source published with initial value 1979. The
  // first expected result pairs the initial value with the first source value
  // after subscription (1979 + 3 = 1982); the rest are sums of consecutive
  // source values, followed by completion.
  [TestMethod]
  public void PublishWithInitialValueLambda_Zip_Complete()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var results = scheduler.Start(() =>
          source.Publish(shared => shared.Zip(shared.Skip(1), (prev, cur) => cur + prev), 1979)
      );

      results.Messages.AssertEqual(
          OnNext(220, 1982),
          OnNext(280, 7),
          OnNext(290, 5),
          OnNext(340, 9),
          OnNext(360, 13),
          OnNext(370, 11),
          OnNext(390, 13),
          OnNext(410, 20),
          OnNext(430, 15),
          OnNext(450, 11),
          OnNext(520, 20),
          OnNext(560, 31),
          OnCompleted<int>(600)
      );

      // A single subscription to the underlying source is expected.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Pairwise-sum selector over a failing source published with initial value
  // 1979; the source error at 600 is expected to terminate the result.
  [TestMethod]
  public void PublishWithInitialValueLambda_Zip_Error()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      var results = scheduler.Start(() =>
          source.Publish(shared => shared.Zip(shared.Skip(1), (prev, cur) => cur + prev), 1979)
      );

      // First result pairs the initial value with the first source value (1979 + 3).
      results.Messages.AssertEqual(
          OnNext(220, 1982),
          OnNext(280, 7),
          OnNext(290, 5),
          OnNext(340, 9),
          OnNext(360, 13),
          OnNext(370, 11),
          OnNext(390, 13),
          OnNext(410, 20),
          OnNext(430, 15),
          OnNext(450, 11),
          OnNext(520, 20),
          OnNext(560, 31),
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Pairwise-sum selector over a source published with initial value 1979, with
  // 470 passed as the second argument to Start: only results produced before
  // that time are expected, and the source subscription is expected to end at 470.
  [TestMethod]
  public void PublishWithInitialValueLambda_Zip_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var results = scheduler.Start(
          () => source.Publish(shared => shared.Zip(shared.Skip(1), (prev, cur) => cur + prev), 1979),
          470
      );

      // The source values at 520 and 560 arrive after 470 and must not show up.
      results.Messages.AssertEqual(
          OnNext(220, 1982),
          OnNext(280, 7),
          OnNext(290, 5),
          OnNext(340, 9),
          OnNext(360, 13),
          OnNext(370, 11),
          OnNext(390, 13),
          OnNext(410, 20),
          OnNext(430, 15),
          OnNext(450, 11)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 470)
      );
  }
  1090. #endregion
  1091. #region PublishLast
  // Checks that both PublishLast overloads throw ArgumentNullException for a
  // null source, and that the selector overload rejects a null selector.
  [TestMethod]
  public void PublishLast_ArgumentChecking()
  {
      // The unused TestScheduler local from the original was removed.
      var someObservable = Observable.Empty<int>();

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.PublishLast(default(IObservable<int>)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.PublishLast(default(IObservable<int>), x => x));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.PublishLast<int, int>(someObservable, null));
  }
  // PublishLast over a completing hot source, connected in three windows
  // (300-400, 500-550, 650-800). None of them is open when the source completes
  // at 600, and the observer is expected to receive no messages at all.
  [TestMethod]
  public void PublishLast_Basic()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Lifetime of the published sequence and of the observer's subscription.
      scheduler.ScheduleAbsolute(Created, () => published = source.PublishLast());
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      // First connection window.
      scheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection window.
      scheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection window.
      scheduler.ScheduleAbsolute(650, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      // No notifications are expected.
      observer.Messages.AssertEqual();

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // PublishLast over a failing hot source. The second connection (opened at 500)
  // is still open when the source errors at 600; the observer is expected to
  // receive only that error.
  [TestMethod]
  public void PublishLast_Error()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Lifetime of the published sequence and of the observer's subscription.
      scheduler.ScheduleAbsolute(Created, () => published = source.PublishLast());
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      // First connection window.
      scheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection; its disposal is scheduled after the source error at 600.
      scheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // PublishLast over a completing hot source. The second connection (opened at
  // 500) is still open when the source completes at 600; the observer is expected
  // to receive the final source value (20) at 600, followed by completion.
  [TestMethod]
  public void PublishLast_Complete()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Lifetime of the published sequence and of the observer's subscription.
      scheduler.ScheduleAbsolute(Created, () => published = source.PublishLast());
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      // First connection window.
      scheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection; its disposal is scheduled after the completion at 600.
      scheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(600, 20),
          OnCompleted<int>(600)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // PublishLast over a completing hot source where the observer unsubscribes at
  // 350. No messages are expected, while the three connection windows still
  // produce three source subscriptions.
  [TestMethod]
  public void PublishLast_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> published = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // The observer unsubscribes at 350 instead of at the Disposed time.
      scheduler.ScheduleAbsolute(Created, () => published = source.PublishLast());
      scheduler.ScheduleAbsolute(Subscribed, () => subscription = published.Subscribe(observer));
      scheduler.ScheduleAbsolute(350, () => subscription.Dispose());

      // First connection window.
      scheduler.ScheduleAbsolute(300, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // Second connection window.
      scheduler.ScheduleAbsolute(500, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection window.
      scheduler.ScheduleAbsolute(650, () => connection = published.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      // No notifications are expected.
      observer.Messages.AssertEqual();

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Connecting twice without disposing is expected to return the same connection
  // object; connecting again after disposal is expected to return a new one.
  [TestMethod]
  public void PublishLast_MultipleConnections()
  {
      var published = Observable.Never<int>().PublishLast();

      var first = published.Connect();
      var second = published.Connect();
      Assert.AreSame(first, second);

      // Both handles are disposed before reconnecting.
      first.Dispose();
      second.Dispose();

      var third = published.Connect();
      Assert.AreNotSame(first, third);

      third.Dispose();
  }
  // PublishLast with a selector that zips the shared sequence with itself and
  // adds the pair. The expected result is a single value, 40 (20 + 20), at 600,
  // followed by completion.
  [TestMethod]
  public void PublishLastLambda_Zip_Complete()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var results = scheduler.Start(() =>
          source.PublishLast(shared => shared.Zip(shared, (x, y) => x + y))
      );

      results.Messages.AssertEqual(
          OnNext(600, 40),
          OnCompleted<int>(600)
      );

      // A single subscription to the underlying source is expected.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // PublishLast with a self-zipping selector over a failing source: the only
  // expected notification is the source error at 600.
  [TestMethod]
  public void PublishLastLambda_Zip_Error()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, error)
      );

      var results = scheduler.Start(() =>
          source.PublishLast(shared => shared.Zip(shared, (x, y) => x + y))
      );

      results.Messages.AssertEqual(
          OnError<int>(600, error)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // PublishLast with a self-zipping selector, with 470 passed as the second
  // argument to Start (before the source completes at 600): no messages are
  // expected and the source subscription is expected to end at 470.
  [TestMethod]
  public void PublishLastLambda_Zip_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      var results = scheduler.Start(
          () => source.PublishLast(shared => shared.Zip(shared, (x, y) => x + y)),
          470
      );

      // No notifications are expected.
      results.Messages.AssertEqual();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 470)
      );
  }
  1373. #endregion
  1374. #region RefCount
  // RefCount must reject a null connectable source with ArgumentNullException.
  [TestMethod]
  public void RefCount_ArgumentChecking()
  {
      ReactiveAssert.Throws<ArgumentNullException>(
          () => Observable.RefCount(default(IConnectableObservable<int>))
      );
  }
  // Wraps a hot source in a ConnectableObservable backed by MySubject and
  // subscribes through RefCount. All source notifications are expected to flow
  // through, and the subject is expected to report Disposed afterwards.
  [TestMethod]
  public void RefCount_ConnectsOnFirst()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      );

      var subject = new MySubject();
      var connectable = new ConnectableObservable<int>(source, subject);

      var results = scheduler.Start(() => connectable.RefCount());

      results.Messages.AssertEqual(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnCompleted<int>(250)
      );

      Assert.IsTrue(subject.Disposed);
  }
  // Tracks how often the deferred source is created and when it is torn down
  // while subscribers come and go on a RefCount'ed connectable: the source is
  // created on the first subscriber only, torn down when the last one leaves,
  // and created again by a later subscriber.
  [TestMethod]
  public void RefCount_NotConnected()
  {
      var tornDown = false;
      var created = 0;

      // Defer counts each creation; the created sequence never emits and flips
      // 'tornDown' when its subscription is disposed.
      var source = Observable.Defer(() =>
      {
          created++;
          return Observable.Create<int>(observer =>
          {
              return () => { tornDown = true; };
          });
      });

      var subject = new MySubject();
      var connectable = new ConnectableObservable<int>(source, subject);
      var shared = connectable.RefCount();

      // First subscriber: source created once.
      var first = shared.Subscribe();
      Assert.AreEqual(1, created);
      Assert.AreEqual(1, subject.SubscribeCount);
      Assert.IsFalse(tornDown);

      // Second subscriber: reaches the subject, but the source is not created again.
      var second = shared.Subscribe();
      Assert.AreEqual(1, created);
      Assert.AreEqual(2, subject.SubscribeCount);
      Assert.IsFalse(tornDown);

      // The source stays up until the last subscriber leaves.
      first.Dispose();
      Assert.IsFalse(tornDown);

      second.Dispose();
      Assert.IsTrue(tornDown);

      tornDown = false;

      // A new subscriber after teardown creates the source a second time.
      var third = shared.Subscribe();
      Assert.AreEqual(2, created);
      Assert.AreEqual(3, subject.SubscribeCount);
      Assert.IsFalse(tornDown);

      third.Dispose();
      Assert.IsTrue(tornDown);
  }
  // Subscribes twice to a ref-counted, published sequence whose source throws,
  // and verifies that each subscriber receives exactly the original exception.
  [TestMethod]
  public void RefCount_OnError()
  {
      var ex = new Exception();
      var xs = Observable.Throw<int>(ex, Scheduler.Immediate);
      var res = xs.Publish().RefCount();

      // The original asserted only inside the callbacks, so it passed vacuously
      // if no callback ever ran. Counting error notifications closes that gap.
      // NOTE(review): relies on Scheduler.Immediate delivering the error before
      // Subscribe returns - confirm against the Rx version in use.
      var errorCount = 0;

      res.Subscribe(_ => { Assert.Fail(); }, ex_ => { errorCount++; Assert.AreSame(ex, ex_); }, () => { Assert.Fail(); });
      res.Subscribe(_ => { Assert.Fail(); }, ex_ => { errorCount++; Assert.AreSame(ex, ex_); }, () => { Assert.Fail(); });

      Assert.AreEqual(2, errorCount);
  }
  // Four observers subscribe to and leave a Publish().RefCount() sequence at
  // staggered times. Each is expected to see only the values emitted during its
  // own subscription. The source is expected to be subscribed from the first
  // arrival (215) until the last overlapping observer leaves (275), and again
  // from 285 until the source completes at 300.
  [TestMethod]
  public void RefCount_Publish()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      var shared = source.Publish().RefCount();

      // Observer 1: subscribed 215-235.
      IDisposable sub1 = null;
      var obs1 = scheduler.CreateObserver<int>();
      scheduler.ScheduleAbsolute(215, () => { sub1 = shared.Subscribe(obs1); });
      scheduler.ScheduleAbsolute(235, () => { sub1.Dispose(); });

      // Observer 2: subscribed 225-275.
      IDisposable sub2 = null;
      var obs2 = scheduler.CreateObserver<int>();
      scheduler.ScheduleAbsolute(225, () => { sub2 = shared.Subscribe(obs2); });
      scheduler.ScheduleAbsolute(275, () => { sub2.Dispose(); });

      // Observer 3: subscribed 255-265.
      IDisposable sub3 = null;
      var obs3 = scheduler.CreateObserver<int>();
      scheduler.ScheduleAbsolute(255, () => { sub3 = shared.Subscribe(obs3); });
      scheduler.ScheduleAbsolute(265, () => { sub3.Dispose(); });

      // Observer 4: subscribed 285-320, after all others have left.
      IDisposable sub4 = null;
      var obs4 = scheduler.CreateObserver<int>();
      scheduler.ScheduleAbsolute(285, () => { sub4 = shared.Subscribe(obs4); });
      scheduler.ScheduleAbsolute(320, () => { sub4.Dispose(); });

      scheduler.Start();

      obs1.Messages.AssertEqual(
          OnNext(220, 2),
          OnNext(230, 3)
      );

      obs2.Messages.AssertEqual(
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7)
      );

      obs3.Messages.AssertEqual(
          OnNext(260, 6)
      );

      obs4.Messages.AssertEqual(
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(215, 275),
          Subscribe(285, 300)
      );
  }
  1507. #endregion
  1508. #region Replay
  // Argument validation for the Replay overloads: null sources, selectors and
  // schedulers must throw ArgumentNullException; a negative window or a buffer
  // size of -2 must throw ArgumentOutOfRangeException. Each case passes exactly
  // one invalid argument so the expected exception does not depend on the order
  // in which the operator validates its arguments.
  [TestMethod]
  public void Replay_ArgumentChecking()
  {
      var someObservable = Observable.Empty<int>();
      var scheduler = new TestScheduler();

      // Replay() / Replay(selector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null));

      // Replay(scheduler) / Replay(selector, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int>(null, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int>(DummyObservable<int>.Instance, (IScheduler)null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(null, DummyFunc<IObservable<int>, IObservable<int>>.Instance, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(DummyObservable<int>.Instance, null, DummyScheduler.Instance));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(DummyObservable<int>.Instance, DummyFunc<IObservable<int>, IObservable<int>>.Instance, null));

      // Replay(window) / Replay(selector, window)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay<int, int>(someObservable, x => x, TimeSpan.FromSeconds(-1)));

      // Replay(window, scheduler) / Replay(selector, window, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(1), default(IScheduler)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(1), default(IScheduler)));

      // Replay(bufferSize, scheduler) / Replay(selector, bufferSize, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, default(IScheduler)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, scheduler));
      // Valid buffer size here (the original passed -2), so the null selector is
      // the only invalid argument, as in the other null-selector cases.
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, default(IScheduler)));

      // Replay(bufferSize) / Replay(selector, bufferSize)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2));

      // Replay(bufferSize, window) / Replay(selector, bufferSize, window)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1)));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1)));

      // Replay(bufferSize, window, scheduler) / Replay(selector, bufferSize, window, scheduler)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(1), null));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1), scheduler));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1), scheduler));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(1), null));
  }
  // Count-bounded replay (buffer of 3) over a hot source that is connected and
  // disconnected three times. A single observer subscribes at 450, while the
  // source is disconnected, and stays subscribed until Disposed.
  [TestMethod]
  public void ReplayCount_Basic()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Timeline, listed in chronological order (all times are distinct).
      scheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(3, scheduler));

      // First connection: 300..400.
      scheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // The observer arrives between the first and second connection.
      scheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));

      // Second connection: 500..550.
      scheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection: 650..800.
      scheduler.ScheduleAbsolute(650, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      scheduler.Start();

      // The last three values seen during the first connection (5, 6, 7) are
      // replayed one tick apart, followed by the only value emitted while the
      // second connection is active.
      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7),
          OnNext(521, 11)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Count-bounded replay (buffer of 3) where the source fails at 600 while the
  // second connection is still active; the observer must see the error.
  [TestMethod]
  public void ReplayCount_Error()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, failure)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Timeline, listed in chronological order (all times are distinct).
      scheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(3, scheduler));

      // First connection: 300..400.
      scheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // The observer arrives while disconnected.
      scheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));

      // Second connection: requested for 500..800, but the source errors at 600.
      scheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnError<int>(601, failure)
      );

      // The second source subscription ends at the error, not at the 800 dispose.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Count-bounded replay (buffer of 3) where the source completes at 600 while
  // the second connection is still active; the observer must see completion.
  [TestMethod]
  public void ReplayCount_Complete()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Timeline, listed in chronological order (all times are distinct).
      scheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(3, scheduler));

      // First connection: 300..400.
      scheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // The observer arrives while disconnected.
      scheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));

      // Second connection: requested for 500..800, but the source completes at 600.
      scheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnCompleted<int>(601)
      );

      // The second source subscription ends at completion, not at the 800 dispose.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Count-bounded replay (buffer of 3) where the observer unsubscribes at 475,
  // before the second connection starts, so it only ever sees replayed values.
  [TestMethod]
  public void ReplayCount_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();

      // Timeline, listed in chronological order (all times are distinct).
      scheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(3, scheduler));

      // First connection: 300..400.
      scheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // The observer is only attached for 450..475.
      scheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      scheduler.ScheduleAbsolute(475, () => subscription.Dispose());

      // Second connection: 500..550.
      scheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection: 650..800.
      scheduler.ScheduleAbsolute(650, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7)
      );

      // Connections are unaffected by the observer leaving early.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Connecting twice without disposing in between must hand back the same
  // connection object; connecting again after disposal must hand back a new one.
  [TestMethod]
  public void ReplayCount_MultipleConnections()
  {
      var replayed = Observable.Never<int>().Replay(3, new TestScheduler());

      var first = replayed.Connect();
      var second = replayed.Connect();
      Assert.AreSame(first, second);

      // Both handles are disposed, mirroring a caller that holds two references.
      first.Dispose();
      second.Dispose();

      var third = replayed.Connect();
      Assert.AreNotSame(first, third);

      third.Dispose();
  }
  // Selector-based, count-bounded replay (buffer of 3). The selector takes six
  // values from the shared sequence and then repeats, so each resubscription
  // starts with whatever is currently buffered. Runs until 610, past the
  // source's completion at 600.
  [TestMethod]
  public void ReplayCountLambda_Zip_Complete()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      Func<IObservable<int>, IObservable<int>> takeSixRepeatedly =
          shared => shared.Take(6).Repeat();
      const long disposeAt = 610;

      var observer = scheduler.Start(
          () => source.Replay(takeSixRepeatedly, 3, scheduler),
          disposeAt
      );

      observer.Messages.AssertEqual(
          // First pass: six live values.
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          // Second pass: three buffered values, then three live ones.
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          // Third pass.
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          // Fourth pass: only buffered values arrive before the source completes.
          OnNext(562, 9),
          OnNext(563, 11),
          OnNext(564, 20),
          // After completion the final buffer keeps being replayed until disposal.
          OnNext(602, 9),
          OnNext(603, 11),
          OnNext(604, 20),
          OnNext(606, 9),
          OnNext(607, 11),
          OnNext(608, 20)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Selector-based, count-bounded replay (buffer of 3) where the source fails
  // at 600; the error must surface to the observer and end the sequence.
  [TestMethod]
  public void ReplayCountLambda_Zip_Error()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, failure)
      );

      Func<IObservable<int>, IObservable<int>> takeSixRepeatedly =
          shared => shared.Take(6).Repeat();

      var observer = scheduler.Start(
          () => source.Replay(takeSixRepeatedly, 3, scheduler)
      );

      observer.Messages.AssertEqual(
          // First pass: six live values.
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          // Second pass: three buffered values, then three live ones.
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          // Third pass.
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          // Fourth pass is cut short by the source error.
          OnNext(562, 9),
          OnNext(563, 11),
          OnNext(564, 20),
          OnError<int>(601, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Selector-based, count-bounded replay (buffer of 3) where the observer is
  // disposed at 470, well before the source terminates.
  [TestMethod]
  public void ReplayCountLambda_Zip_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      Func<IObservable<int>, IObservable<int>> takeSixRepeatedly =
          shared => shared.Take(6).Repeat();
      const long disposeAt = 470;

      var observer = scheduler.Start(
          () => source.Replay(takeSixRepeatedly, 3, scheduler),
          disposeAt
      );

      observer.Messages.AssertEqual(
          // First pass: six live values.
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          // Second pass: three buffered values, then three live ones.
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          // Third pass, interrupted by disposal at 470.
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9)
      );

      // Disposing the result also releases the source subscription.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 470)
      );
  }
  // Time-bounded replay (150-tick window) over a hot source that is connected
  // and disconnected three times. A single observer subscribes at 450, while
  // the source is disconnected, and stays subscribed until Disposed.
  [TestMethod]
  public void ReplayTime_Basic()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();
      var window = TimeSpan.FromTicks(150);

      // Timeline, listed in chronological order (all times are distinct).
      scheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(window, scheduler));

      // First connection: 300..400.
      scheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // The observer arrives between the first and second connection.
      scheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));

      // Second connection: 500..550.
      scheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection: 650..800.
      scheduler.ScheduleAbsolute(650, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      scheduler.Start();

      // All four values captured during the first connection are still inside
      // the 150-tick window at 450, so all of them are replayed.
      observer.Messages.AssertEqual(
          OnNext(451, 8),
          OnNext(452, 5),
          OnNext(453, 6),
          OnNext(454, 7),
          OnNext(521, 11)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Time-bounded replay (75-tick window) where the source fails at 600 while
  // the second connection is still active; the observer must see the error.
  [TestMethod]
  public void ReplayTime_Error()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, failure)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();
      var window = TimeSpan.FromTicks(75);

      // Timeline, listed in chronological order (all times are distinct).
      scheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(window, scheduler));

      // First connection: 300..400.
      scheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // The observer arrives while disconnected.
      scheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));

      // Second connection: requested for 500..800, but the source errors at 600.
      scheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      scheduler.Start();

      // Only the value from 390 is still inside the 75-tick window at 450.
      observer.Messages.AssertEqual(
          OnNext(451, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnError<int>(601, failure)
      );

      // The second source subscription ends at the error, not at the 800 dispose.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Time-bounded replay (85-tick window) where the source completes at 600
  // while the second connection is still active.
  [TestMethod]
  public void ReplayTime_Complete()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();
      var window = TimeSpan.FromTicks(85);

      // Timeline, listed in chronological order (all times are distinct).
      scheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(window, scheduler));

      // First connection: 300..400.
      scheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // The observer arrives while disconnected.
      scheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));

      // Second connection: requested for 500..800, but the source completes at 600.
      scheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());

      scheduler.Start();

      // The values from 370 and 390 are still inside the 85-tick window at 450.
      observer.Messages.AssertEqual(
          OnNext(451, 6),
          OnNext(452, 7),
          OnNext(521, 11),
          OnNext(561, 20),
          OnCompleted<int>(601)
      );

      // The second source subscription ends at completion, not at the 800 dispose.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 600)
      );
  }
  // Time-bounded replay (100-tick window) where the observer unsubscribes at
  // 475, before the second connection starts, so it only sees replayed values.
  [TestMethod]
  public void ReplayTime_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      IConnectableObservable<int> replayed = null;
      IDisposable subscription = null;
      IDisposable connection = null;
      var observer = scheduler.CreateObserver<int>();
      var window = TimeSpan.FromTicks(100);

      // Timeline, listed in chronological order (all times are distinct).
      scheduler.ScheduleAbsolute(Created, () => replayed = source.Replay(window, scheduler));

      // First connection: 300..400.
      scheduler.ScheduleAbsolute(300, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(400, () => connection.Dispose());

      // The observer is only attached for 450..475.
      scheduler.ScheduleAbsolute(450, () => subscription = replayed.Subscribe(observer));
      scheduler.ScheduleAbsolute(475, () => subscription.Dispose());

      // Second connection: 500..550.
      scheduler.ScheduleAbsolute(500, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(550, () => connection.Dispose());

      // Third connection: 650..800.
      scheduler.ScheduleAbsolute(650, () => connection = replayed.Connect());
      scheduler.ScheduleAbsolute(800, () => connection.Dispose());

      scheduler.Start();

      // The values from 360, 370 and 390 are inside the 100-tick window at 450.
      observer.Messages.AssertEqual(
          OnNext(451, 5),
          OnNext(452, 6),
          OnNext(453, 7)
      );

      // Connections are unaffected by the observer leaving early.
      source.Subscriptions.AssertEqual(
          Subscribe(300, 400),
          Subscribe(500, 550),
          Subscribe(650, 800)
      );
  }
  // Time-bounded variant of the connection-identity check: connecting twice
  // without disposing yields the same connection object, and connecting again
  // after disposal yields a different one.
  [TestMethod]
  public void ReplayTime_MultipleConnections()
  {
      var replayed = Observable.Never<int>().Replay(TimeSpan.FromTicks(100), new TestScheduler());

      var first = replayed.Connect();
      var second = replayed.Connect();
      Assert.AreSame(first, second);

      // Both handles are disposed, mirroring a caller that holds two references.
      first.Dispose();
      second.Dispose();

      var third = replayed.Connect();
      Assert.AreNotSame(first, third);

      third.Dispose();
  }
  // Selector-based, time-bounded replay (50-tick window). The selector takes
  // six values from the shared sequence and then repeats, so each
  // resubscription starts with whatever is still inside the window. Runs until
  // 610, past the source's completion at 600.
  [TestMethod]
  public void ReplayTimeLambda_Zip_Complete()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      Func<IObservable<int>, IObservable<int>> takeSixRepeatedly =
          shared => shared.Take(6).Repeat();
      var window = TimeSpan.FromTicks(50);
      const long disposeAt = 610;

      var observer = scheduler.Start(
          () => source.Replay(takeSixRepeatedly, window, scheduler),
          disposeAt
      );

      observer.Messages.AssertEqual(
          // First pass: six live values.
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          // Second pass: three values still in the window, then three live ones.
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          // Third pass.
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          // Fourth pass: only 11 and 20 are still inside the window.
          OnNext(562, 11),
          OnNext(563, 20),
          // After completion only 20 remains in the window on each repeat.
          OnNext(602, 20),
          OnNext(604, 20),
          OnNext(606, 20),
          OnNext(608, 20)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Selector-based, time-bounded replay (50-tick window) where the source
  // fails at 600; the error must surface to the observer and end the sequence.
  [TestMethod]
  public void ReplayTimeLambda_Zip_Error()
  {
      var scheduler = new TestScheduler();
      var failure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnError<int>(600, failure)
      );

      Func<IObservable<int>, IObservable<int>> takeSixRepeatedly =
          shared => shared.Take(6).Repeat();
      var window = TimeSpan.FromTicks(50);

      var observer = scheduler.Start(
          () => source.Replay(takeSixRepeatedly, window, scheduler)
      );

      observer.Messages.AssertEqual(
          // First pass: six live values.
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          // Second pass: three values still in the window, then three live ones.
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          // Third pass.
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9),
          OnNext(521, 11),
          OnNext(561, 20),
          // Fourth pass is cut short by the source error.
          OnNext(562, 11),
          OnNext(563, 20),
          OnError<int>(601, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 600)
      );
  }
  // Selector-based, time-bounded replay (50-tick window) where the observer is
  // disposed at 470, well before the source terminates.
  [TestMethod]
  public void ReplayTimeLambda_Zip_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(110, 7),
          OnNext(220, 3),
          OnNext(280, 4),
          OnNext(290, 1),
          OnNext(340, 8),
          OnNext(360, 5),
          OnNext(370, 6),
          OnNext(390, 7),
          OnNext(410, 13),
          OnNext(430, 2),
          OnNext(450, 9),
          OnNext(520, 11),
          OnNext(560, 20),
          OnCompleted<int>(600)
      );

      Func<IObservable<int>, IObservable<int>> takeSixRepeatedly =
          shared => shared.Take(6).Repeat();
      var window = TimeSpan.FromTicks(50);
      const long disposeAt = 470;

      var observer = scheduler.Start(
          () => source.Replay(takeSixRepeatedly, window, scheduler),
          disposeAt
      );

      observer.Messages.AssertEqual(
          // First pass: six live values.
          OnNext(221, 3),
          OnNext(281, 4),
          OnNext(291, 1),
          OnNext(341, 8),
          OnNext(361, 5),
          OnNext(371, 6),
          // Second pass: three values still in the window, then three live ones.
          OnNext(372, 8),
          OnNext(373, 5),
          OnNext(374, 6),
          OnNext(391, 7),
          OnNext(411, 13),
          OnNext(431, 2),
          // Third pass, interrupted by disposal at 470.
          OnNext(432, 7),
          OnNext(433, 13),
          OnNext(434, 2),
          OnNext(451, 9)
      );

      // Disposing the result also releases the source subscription.
      source.Subscriptions.AssertEqual(
          Subscribe(200, 470)
      );
  }
  // Replay(bufferSize) must produce the same sequence as
  // Replay(bufferSize, DefaultScheduler.Instance).
  [TestMethod]
  public void Replay_Default1()
  {
      var subject = new Subject<int>();

      var withExplicitScheduler = subject.Replay(100, DefaultScheduler.Instance);
      var withImplicitScheduler = subject.Replay(100);

      // Connect both before the subject emits so each buffers every notification.
      withExplicitScheduler.Connect();
      withImplicitScheduler.Connect();

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withExplicitScheduler.AssertEqual(withImplicitScheduler);
  }
  // Replay(window) must produce the same sequence as
  // Replay(window, DefaultScheduler.Instance).
  [TestMethod]
  public void Replay_Default2()
  {
      var subject = new Subject<int>();

      var withExplicitScheduler = subject.Replay(TimeSpan.FromHours(1), DefaultScheduler.Instance);
      var withImplicitScheduler = subject.Replay(TimeSpan.FromHours(1));

      // Connect both before the subject emits so each buffers every notification.
      withExplicitScheduler.Connect();
      withImplicitScheduler.Connect();

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withExplicitScheduler.AssertEqual(withImplicitScheduler);
  }
  // Replay(bufferSize, window) must produce the same sequence as
  // Replay(bufferSize, window, DefaultScheduler.Instance).
  [TestMethod]
  public void Replay_Default3()
  {
      var subject = new Subject<int>();

      var withExplicitScheduler = subject.Replay(100, TimeSpan.FromHours(1), DefaultScheduler.Instance);
      var withImplicitScheduler = subject.Replay(100, TimeSpan.FromHours(1));

      // Connect both before the subject emits so each buffers every notification.
      withExplicitScheduler.Connect();
      withImplicitScheduler.Connect();

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withExplicitScheduler.AssertEqual(withImplicitScheduler);
  }
  // Parameterless Replay() must produce the same sequence as
  // Replay(DefaultScheduler.Instance).
  [TestMethod]
  public void Replay_Default4()
  {
      var subject = new Subject<int>();

      var withExplicitScheduler = subject.Replay(DefaultScheduler.Instance);
      var withImplicitScheduler = subject.Replay();

      // Connect both before the subject emits so each buffers every notification.
      withExplicitScheduler.Connect();
      withImplicitScheduler.Connect();

      subject.OnNext(1);
      subject.OnNext(2);
      subject.OnCompleted();

      withExplicitScheduler.AssertEqual(withImplicitScheduler);
  }
  // Selector overload: Replay(selector, bufferSize) must produce the same
  // sequence as Replay(selector, bufferSize, DefaultScheduler.Instance).
  [TestMethod]
  public void ReplayLambda_Default1()
  {
      var withExplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, 100, DefaultScheduler.Instance);

      var withImplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, 100);

      withExplicitScheduler.AssertEqual(withImplicitScheduler);
  }
  // Selector overload: Replay(selector, window) must produce the same
  // sequence as Replay(selector, window, DefaultScheduler.Instance).
  [TestMethod]
  public void ReplayLambda_Default2()
  {
      var withExplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, TimeSpan.FromHours(1), DefaultScheduler.Instance);

      var withImplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, TimeSpan.FromHours(1));

      withExplicitScheduler.AssertEqual(withImplicitScheduler);
  }
  // Selector overload: Replay(selector, bufferSize, window) must produce the
  // same sequence as Replay(selector, bufferSize, window, DefaultScheduler.Instance).
  [TestMethod]
  public void ReplayLambda_Default3()
  {
      var withExplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, 100, TimeSpan.FromHours(1), DefaultScheduler.Instance);

      var withImplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, 100, TimeSpan.FromHours(1));

      withExplicitScheduler.AssertEqual(withImplicitScheduler);
  }
  // Selector overload: Replay(selector) must produce the same sequence as
  // Replay(selector, DefaultScheduler.Instance).
  [TestMethod]
  public void ReplayLambda_Default4()
  {
      var withExplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared, DefaultScheduler.Instance);

      var withImplicitScheduler = Observable
          .Range(1, 10)
          .Replay(shared => shared);

      withExplicitScheduler.AssertEqual(withImplicitScheduler);
  }
  2339. #endregion
  2340. }
  2341. }