GroupByUntilTest.cs 133 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
7327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739374037413742374337443745374637473748374937503751375237533754375537563757375837593760376137623763
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. namespace ReactiveTests.Tests
  20. {
  21. public class GroupByUntilTest : ReactiveTest
  22. {
  23. #region + GroupByUntil +
  // Checks that each GroupByUntil overload exercised below throws ArgumentNullException
  // when exactly one of its arguments is null and all others are valid placeholders.
  [Fact]
  public void GroupByUntil_ArgumentChecking()
  {
      // Valid placeholder arguments shared by all calls.
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;
      var duration = DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance;
      var comparer = EqualityComparer<int>.Default;

      // Typed nulls: the static type is what steers overload resolution.
      var noSource = default(IObservable<int>);
      var noSelector = default(Func<int, int>);
      var noDuration = default(Func<IGroupedObservable<int, int>, IObservable<int>>);
      var noComparer = default(IEqualityComparer<int>);

      // (source, keySelector, elementSelector, durationSelector, comparer)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(noSource, selector, selector, duration, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, noSelector, selector, duration, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, noSelector, duration, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, selector, noDuration, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, selector, duration, noComparer));

      // (source, keySelector, elementSelector, durationSelector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(noSource, selector, selector, duration));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, noSelector, selector, duration));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, noSelector, duration));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, selector, noDuration));

      // (source, keySelector, durationSelector, comparer)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(noSource, selector, duration, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, noSelector, duration, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, noDuration, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, duration, noComparer));

      // (source, keySelector, durationSelector)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(noSource, selector, duration));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, noSelector, duration));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, noDuration));
  }
  // Groups a hot string sequence by trimmed value using a GroupByComparer, with each
  // group's duration given by the group itself after skipping two elements, and checks
  // the keys surfaced by the outer sequence plus the number of key selector calls.
  // NOTE(review): GroupByComparer is defined outside this view; the expected keys are
  // consistent with a case-insensitive comparison - confirm against its definition.
  [Fact]
  public void GroupByUntil_WithKeyComparer()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Counting key selector: the key is the trimmed element.
      Func<string, string> selectKey = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      var observed = testScheduler.Start(() =>
      {
          var groups = source.GroupByUntil(selectKey, grp => grp.Skip(2), keyComparer);
          return groups.Select(grp => grp.Key);
      });

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      // One key selector call per element delivered between 220 and 530.
      Assert.Equal(12, keyCalls);
  }
  // Runs GroupByUntil with counting key and element selectors over a hot sequence that
  // completes at 570, and checks that the outer sequence emits the expected group keys,
  // completes at 570, and that both selectors ran once per delivered element.
  [Fact]
  public void GroupByUntil_Outer_Complete()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Counting key selector: the key is the trimmed element.
      Func<string, string> selectKey = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      // Counting element selector: delegates to the Reverse helper.
      Func<string, string> selectElement = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
      {
          var groups = source.GroupByUntil(selectKey, selectElement, grp => grp.Skip(2), keyComparer);
          return groups.Select(grp => grp.Key);
      });

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keyCalls);
      Assert.Equal(12, elementCalls);
  }
  // Same setup as the completing case, but the source terminates with an error at 570.
  // Checks that the outer sequence forwards that same exception instance at 570 after
  // emitting the expected group keys, and that both selectors ran 12 times.
  [Fact]
  public void GroupByUntil_Outer_Error()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;
      var sourceError = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceError),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Counting key selector: the key is the trimmed element.
      Func<string, string> selectKey = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      // Counting element selector: delegates to the Reverse helper.
      Func<string, string> selectElement = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
      {
          var groups = source.GroupByUntil(selectKey, selectElement, grp => grp.Skip(2), keyComparer);
          return groups.Select(grp => grp.Key);
      });

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnError<string>(570, sourceError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keyCalls);
      Assert.Equal(12, elementCalls);
  }
  // Starts the query with 355 passed as the second argument to Start, and checks that
  // only the group keys emitted up to 350 are observed, that the source subscription
  // ends at 355, and that each selector ran 5 times.
  [Fact]
  public void GroupByUntil_Outer_Dispose()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Counting key selector: the key is the trimmed element.
      Func<string, string> selectKey = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      // Counting element selector: delegates to the Reverse helper.
      Func<string, string> selectElement = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(
          () =>
          {
              var groups = source.GroupByUntil(selectKey, selectElement, grp => grp.Skip(2), keyComparer);
              return groups.Select(grp => grp.Key);
          },
          355
      );

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 355)
      );

      Assert.Equal(5, keyCalls);
      Assert.Equal(5, elementCalls);
  }
  // The key selector throws on its 10th invocation. Checks that the outer sequence
  // reports that exception at 480, that the source subscription ends at 480, and that
  // the element selector was only reached for the 9 preceding elements.
  [Fact]
  public void GroupByUntil_Outer_KeyThrow()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;
      var selectorError = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Counting key selector that fails on the 10th call.
      Func<string, string> selectKey = value =>
      {
          keyCalls++;
          if (keyCalls == 10)
          {
              throw selectorError;
          }

          return value.Trim();
      };

      // Counting element selector: delegates to the Reverse helper.
      Func<string, string> selectElement = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
      {
          var groups = source.GroupByUntil(selectKey, selectElement, grp => grp.Skip(2), keyComparer);
          return groups.Select(grp => grp.Key);
      });

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnError<string>(480, selectorError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keyCalls);
      Assert.Equal(9, elementCalls);
  }
  // The element selector throws on its 10th invocation. Checks that the outer sequence
  // reports that exception at 480, that the source subscription ends at 480, and that
  // both selectors were invoked 10 times.
  [Fact]
  public void GroupByUntil_Outer_EleThrow()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;
      var selectorError = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Counting key selector: the key is the trimmed element.
      Func<string, string> selectKey = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      // Counting element selector that fails on the 10th call.
      Func<string, string> selectElement = value =>
      {
          elementCalls++;
          if (elementCalls == 10)
          {
              throw selectorError;
          }

          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
      {
          var groups = source.GroupByUntil(selectKey, selectElement, grp => grp.Skip(2), keyComparer);
          return groups.Select(grp => grp.Key);
      });

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnError<string>(480, selectorError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keyCalls);
      Assert.Equal(10, elementCalls);
  }
  // Uses a GroupByComparer constructed with (scheduler, 250, ushort.MaxValue) and checks
  // that the outer sequence ends at 310 with the comparer's EqualsException after two
  // group keys, with 4 key selector calls and 3 element selector calls.
  // NOTE(review): the constructor is outside this view; presumably 250 is the virtual
  // time from which Equals starts throwing - confirm against GroupByComparer.
  [Fact]
  public void GroupByUntil_Outer_ComparerEqualsThrow()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler, 250, ushort.MaxValue);

      // Counting key selector: the key is the trimmed element.
      Func<string, string> selectKey = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      // Counting element selector: delegates to the Reverse helper.
      Func<string, string> selectElement = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
      {
          var groups = source.GroupByUntil(selectKey, selectElement, grp => grp.Skip(2), keyComparer);
          return groups.Select(grp => grp.Key);
      });

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnError<string>(310, keyComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      Assert.Equal(4, keyCalls);
      Assert.Equal(3, elementCalls);
  }
  // Uses a GroupByComparer constructed with (scheduler, ushort.MaxValue, 410) and checks
  // that the outer sequence ends at 420 with the comparer's HashCodeException after four
  // group keys, with 8 key selector calls and 7 element selector calls.
  // NOTE(review): the constructor is outside this view; presumably 410 is the virtual
  // time from which GetHashCode starts throwing - confirm against GroupByComparer.
  [Fact]
  public void GroupByUntil_Outer_ComparerGetHashCodeThrow()
  {
      var testScheduler = new TestScheduler();
      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler, ushort.MaxValue, 410);

      // Counting key selector: the key is the trimmed element.
      Func<string, string> selectKey = value =>
      {
          keyCalls++;
          return value.Trim();
      };

      // Counting element selector: delegates to the Reverse helper.
      Func<string, string> selectElement = value =>
      {
          elementCalls++;
          return Reverse(value);
      };

      var observed = testScheduler.Start(() =>
      {
          var groups = source.GroupByUntil(selectKey, selectElement, grp => grp.Skip(2), keyComparer);
          return groups.Select(grp => grp.Key);
      });

      observed.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );

      Assert.Equal(8, keyCalls);
      Assert.Equal(7, elementCalls);
  }
  // Drives the scheduler by hand: the query is built at Created, the outer sequence is
  // subscribed at Subscribed, and every emitted group gets its own recording observer
  // that is attached 100 ticks after the group appears. Checks what each late
  // subscriber sees, that five groups were produced, and the source subscription span.
  [Fact]
  public void GroupByUntil_Inner_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      var groupedQuery = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      // Per-group bookkeeping, keyed by the group's key (default string comparison).
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          groupedQuery = source.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = groupedQuery.Subscribe(grp =>
          {
              var key = grp.Key;
              var recorder = testScheduler.CreateObserver<string>();

              groupsByKey[key] = grp;
              observers[key] = recorder;

              // Attach to the group late, 100 ticks after it was handed out.
              testScheduler.ScheduleRelative(100, () =>
              {
                  groupSubscriptions[key] = grp.Subscribe(recorder);
              });
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(5, groupsByKey.Count);

      observers["foo"].Messages.AssertEqual(
          OnCompleted<string>(320)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
          OnCompleted<string>(420)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
          OnCompleted<string>(510)
      );

      observers["qux"].Messages.AssertEqual(
          OnCompleted<string>(570)
      );

      observers["FOO"].Messages.AssertEqual(
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Same source and grouping as the late-subscription test, but every group is subscribed to
  /// immediately when it is emitted. Asserts the full message list recorded for each of the
  /// five groups and the source subscription window.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Complete_All()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), s => Reverse(s), g => g.Skip(2), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(group =>
          {
              var observer = testScheduler.CreateObserver<string>();
              var key = group.Key;

              groups[key] = group;
              observers[key] = observer;
              groupSubscriptions[key] = group.Subscribe(observer);
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(5, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(420)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(510)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      observers["FOO"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// The source terminates with an error at 570 while each group is subscribed to 100 ticks
  /// after it was emitted. Asserts that the groups listed as still open ("qux", "FOO") record
  /// that same error, while the others record the completions listed below.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceError = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceError),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), s => Reverse(s), g => g.Skip(2), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  var key = group.Key;

                  groups[key] = group;
                  observers[key] = observer;

                  // Deliberately late: the inner subscription happens 100 ticks after the group shows up.
                  testScheduler.ScheduleRelative(100, () =>
                  {
                      groupSubscriptions[key] = group.Subscribe(observer);
                  });
              },
              // The outer error is intentionally ignored; only the inner observers are inspected.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(5, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnCompleted<string>(320)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
          OnCompleted<string>(420)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
          OnCompleted<string>(510)
      );

      observers["qux"].Messages.AssertEqual(
          OnError<string>(570, sourceError)
      );

      observers["FOO"].Messages.AssertEqual(
          OnError<string>(570, sourceError)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Disposes the outer subscription and every inner subscription at time 400, part-way through
  /// the source. Asserts that only four groups were seen, that no observer records anything
  /// after the disposal, and that the source subscription ends at 400.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), s => Reverse(s), g => g.Skip(2), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(group =>
          {
              var observer = testScheduler.CreateObserver<string>();
              var key = group.Key;

              groups[key] = group;
              observers[key] = observer;
              groupSubscriptions[key] = group.Subscribe(observer);
          });
      });

      // Early teardown of everything at 400 instead of the usual disposal time.
      testScheduler.ScheduleAbsolute(400, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab ")
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  /// <summary>
  /// The key selector throws on its sixth invocation. Asserts that three groups were produced,
  /// that the groups "baR" and "Baz" record that exception at 360, and that the source
  /// subscription ends at 360.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_KeyThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      var keyCalls = 0;
      var failure = new Exception();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(
              s =>
              {
                  // Fail exactly on the sixth key computation.
                  if (++keyCalls == 6)
                  {
                      throw failure;
                  }

                  return s.Trim();
              },
              s => Reverse(s),
              g => g.Skip(2),
              keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  var key = group.Key;

                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              // The outer error is intentionally ignored; only the inner observers are inspected.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(3, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, failure)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  /// <summary>
  /// The element selector throws on its sixth invocation. Asserts that four groups were
  /// produced, that "baR", "Baz" and "qux" record that exception at 360, and that the source
  /// subscription ends at 360.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_EleThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      var elementCalls = 0;
      var failure = new Exception();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(
              s => s.Trim(),
              s =>
              {
                  // Fail exactly on the sixth element projection.
                  if (++elementCalls == 6)
                  {
                      throw failure;
                  }

                  return Reverse(s);
              },
              g => g.Skip(2),
              keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  var key = group.Key;

                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              // The outer error is intentionally ignored; only the inner observers are inspected.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, failure)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, failure)
      );

      observers["qux"].Messages.AssertEqual(
          OnError<string>(360, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  /// <summary>
  /// Uses a comparer constructed with the arguments (scheduler, 400, ushort.MaxValue) and
  /// asserts that the comparer's <c>EqualsException</c> is recorded at 420 by the groups
  /// "baR", "Baz" and "qux", with the source subscription ending at 420.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Comparer_EqualsThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      // NOTE(review): presumably 400 is the time after which Equals starts throwing - confirm against GroupByComparer.
      var keyComparer = new GroupByComparer(testScheduler, 400, ushort.MaxValue);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), s => Reverse(s), g => g.Skip(2), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  var key = group.Key;

                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              // The outer error is intentionally ignored; only the inner observers are inspected.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, keyComparer.EqualsException)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, keyComparer.EqualsException)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, keyComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  /// <summary>
  /// Uses a comparer constructed with the arguments (scheduler, ushort.MaxValue, 400) and
  /// asserts that the comparer's <c>HashCodeException</c> is recorded at 420 by the groups
  /// "baR", "Baz" and "qux", with the source subscription ending at 420.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Comparer_GetHashCodeThrow()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      // NOTE(review): presumably 400 is the time after which GetHashCode starts throwing - confirm against GroupByComparer.
      var keyComparer = new GroupByComparer(testScheduler, ushort.MaxValue, 400);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), s => Reverse(s), g => g.Skip(2), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  var key = group.Key;

                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              // The outer error is intentionally ignored; only the inner observers are inspected.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(4, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, keyComparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  /// <summary>
  /// Disposes only the outer subscription at 320 while inner subscriptions stay alive.
  /// Asserts that two groups were handed out, that the outer observer saw just their keys,
  /// that the "baR" group still records messages up to its completion at 420, and that the
  /// source subscription lasts until 420.
  /// </summary>
  [Fact]
  public void GroupByUntil_Outer_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      // Records the keys (and any terminal notification) seen on the outer sequence.
      var keyObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), s => Reverse(s), g => g.Skip(2), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  keyObserver.OnNext(key);

                  var observer = testScheduler.CreateObserver<string>();

                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              keyObserver.OnError,
              keyObserver.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      // The point of the test: drop the outer subscription early, leave the inner ones alone.
      testScheduler.ScheduleAbsolute(320, () =>
      {
          groupedSubscription.Dispose();
      });

      testScheduler.Start();

      Assert.Equal(2, groups.Count);

      keyObserver.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR")
      );

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(420)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  /// <summary>
  /// Disposes only the inner subscription of the "foo" group at 320. Asserts that all five
  /// groups are still produced and that the remaining groups record their full message lists
  /// through to the source completion at 570.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      // Receives the outer notifications; this test does not assert on its contents.
      var keyObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), s => Reverse(s), g => g.Skip(2), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  keyObserver.OnNext(key);

                  var observer = testScheduler.CreateObserver<string>();

                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              keyObserver.OnError,
              keyObserver.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      // The point of the test: drop a single inner subscription and leave everything else running.
      testScheduler.ScheduleAbsolute(320, () =>
      {
          groupSubscriptions["foo"].Dispose();
      });

      testScheduler.Start();

      Assert.Equal(5, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(420)
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(510)
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      observers["FOO"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Disposes four inner subscriptions individually ("baR" at 280, "foo" at 320, "Baz" at 355,
  /// "qux" at 400). Asserts that each disposed observer stops recording at its disposal, that
  /// the later "FOO" group is unaffected, and that the source subscription lasts until 570.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Multiple_Independence()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      // Book-keeping filled in as groups appear, keyed by group key.
      var groups = new Dictionary<string, IObservable<string>>();
      var groupSubscriptions = new Dictionary<string, IDisposable>();
      var observers = new Dictionary<string, ITestableObserver<string>>();
      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      // Receives the outer notifications; this test does not assert on its contents.
      var keyObserver = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), s => Reverse(s), g => g.Skip(2), keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              group =>
              {
                  var key = group.Key;
                  keyObserver.OnNext(key);

                  var observer = testScheduler.CreateObserver<string>();

                  groups[key] = group;
                  observers[key] = observer;
                  groupSubscriptions[key] = group.Subscribe(observer);
              },
              keyObserver.OnError,
              keyObserver.OnCompleted);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in groupSubscriptions.Values)
          {
              subscription.Dispose();
          }
      });

      // Individual inner disposals, each at its own point in virtual time.
      testScheduler.ScheduleAbsolute(320, () => { groupSubscriptions["foo"].Dispose(); });
      testScheduler.ScheduleAbsolute(280, () => { groupSubscriptions["baR"].Dispose(); });
      testScheduler.ScheduleAbsolute(355, () => { groupSubscriptions["Baz"].Dispose(); });
      testScheduler.ScheduleAbsolute(400, () => { groupSubscriptions["qux"].Dispose(); });

      testScheduler.Start();

      Assert.Equal(5, groups.Count);

      observers["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observers["baR"].Messages.AssertEqual(
          OnNext(270, " Rab")
      );

      observers["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      observers["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      observers["FOO"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Captures the most recently emitted group and subscribes to it only at 600, after the
  /// source has completed at 570. Asserts that the late observer records a completion at 600
  /// and that the source subscription spans 200 to 570.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Escape_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570)
      );

      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var escapedGroup = default(IObservable<string>);
      var escapedSubscription = default(IDisposable);

      var observer = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), g => g.Skip(2));
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Only remember the group; nothing subscribes to it yet.
          groupedSubscription = grouped.Subscribe(group => escapedGroup = group);
      });

      testScheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(observer);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      testScheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      observer.Messages.AssertEqual(
          OnCompleted<string>(600)
      );
  }
  /// <summary>
  /// Captures the most recently emitted group and subscribes to it only at 600, after the
  /// source has failed at 570. Asserts that the late observer records that same error at 600
  /// and that the source subscription spans 200 to 570.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Escape_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, failure)
      );

      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var escapedGroup = default(IObservable<string>);
      var escapedSubscription = default(IDisposable);

      var observer = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), g => g.Skip(2));
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Only remember the group; the outer error is intentionally ignored.
          groupedSubscription = grouped.Subscribe(
              group => escapedGroup = group,
              _ => { });
      });

      testScheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(observer);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      testScheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      observer.Messages.AssertEqual(
          OnError<string>(600, failure)
      );
  }
  /// <summary>
  /// Captures the emitted group, disposes the outer subscription at 290, and subscribes to the
  /// captured group only at 600. Asserts that the late observer records no messages at all and
  /// that the source subscription spans 200 to 290.
  /// </summary>
  [Fact]
  public void GroupByUntil_Inner_Escape_Dispose()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, new Exception())
      );

      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);
      var escapedGroup = default(IObservable<string>);
      var escapedSubscription = default(IDisposable);

      var observer = testScheduler.CreateObserver<string>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(s => s.Trim(), g => g.Skip(2));
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Only remember the group; nothing subscribes to it yet.
          groupedSubscription = grouped.Subscribe(group => escapedGroup = group);
      });

      testScheduler.ScheduleAbsolute(290, () =>
      {
          groupedSubscription.Dispose();
      });

      testScheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(observer);
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          escapedSubscription.Dispose();
      });

      testScheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 290)
      );

      // Nothing is expected to reach the late observer.
      observer.Messages.AssertEqual();
  }
  /// <summary>
  /// Exercises the overload without a comparer: keys are the trimmed, lower-cased input and the
  /// duration is <c>Skip(2)</c>. Asserts the sequence of group keys, the source subscription,
  /// and that both the key and element selectors ran twelve times.
  /// </summary>
  [Fact]
  public void GroupByUntil_Default()
  {
      var testScheduler = new TestScheduler();

      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keys = testScheduler.Start(() =>
      {
          var grouped = source.GroupByUntil(
              s =>
              {
                  keyCalls++;
                  return s.Trim().ToLower();
              },
              s =>
              {
                  elementCalls++;
                  return Reverse(s);
              },
              g => g.Skip(2));

          // Only the keys of the emitted groups are observed.
          return grouped.Select(g => g.Key);
      });

      keys.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "bar"),
          OnNext(350, "baz"),
          OnNext(360, "qux"),
          OnNext(470, "foo"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keyCalls);
      Assert.Equal(12, elementCalls);
  }
  /// <summary>
  /// Uses a duration selector that always throws. Asserts that the exception surfaces as an
  /// OnError on the outer sequence at 210 (the time of the only source element) and that
  /// the source subscription ends at that same time.
  /// </summary>
  [Fact]
  public void GroupByUntil_DurationSelector_Throws()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(210, "foo")
      );

      var failure = new Exception();

      var results = testScheduler.Start(() =>
          source.GroupByUntil<string, string, string>(
              item => item,
              grp => { throw failure; })
      );

      results.Messages.AssertEqual(
          OnError<IGroupedObservable<string, string>>(210, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  /// <summary>
  /// Maps every element starting with 'b' to a null key and gives each group a duration
  /// that never fires. Asserts that the flattened output prefixes null-keyed elements with
  /// "(null)" and completes together with the source at 500.
  /// </summary>
  [Fact]
  public void GroupByUntil_NullKeys_Simple_Never()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      );

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  item => item[0] == 'b' ? null : item.ToUpper(),
                  grp => Observable.Never<Unit>())
              .SelectMany(
                  grp => grp,
                  (grp, item) => (grp.Key ?? "(null)") + item)
      );

      results.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  /// <summary>
  /// Same null-key grouping as the "Never" variant, but every group's duration is a timer
  /// of 50 ticks on the test scheduler. Asserts that the duration selector saw a null-keyed
  /// group twice and that the flattened output is unchanged.
  /// </summary>
  [Fact]
  public void GroupByUntil_NullKeys_Simple_Expire1()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      );

      // Number of times the duration selector was handed a group whose key is null.
      var nullKeyGroups = 0;

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  item => item[0] == 'b' ? null : item.ToUpper(),
                  grp =>
                  {
                      if (grp.Key == null)
                      {
                          nullKeyGroups++;
                      }

                      return Observable.Timer(TimeSpan.FromTicks(50), testScheduler);
                  })
              .SelectMany(
                  grp => grp,
                  (grp, item) => (grp.Key ?? "(null)") + item)
      );

      Assert.Equal(2, nullKeyGroups);

      results.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  /// <summary>
  /// Variant of the "Expire1" test in which the 50-tick timer is wrapped in IgnoreElements,
  /// so the duration sequence only signals completion. Asserts that the duration selector
  /// saw a null-keyed group twice and that the flattened output is unchanged.
  /// </summary>
  [Fact]
  public void GroupByUntil_NullKeys_Simple_Expire2()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      );

      // Number of times the duration selector was handed a group whose key is null.
      var nullKeyGroups = 0;

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  item => item[0] == 'b' ? null : item.ToUpper(),
                  grp =>
                  {
                      if (grp.Key == null)
                      {
                          nullKeyGroups++;
                      }

                      return Observable.Timer(TimeSpan.FromTicks(50), testScheduler).IgnoreElements();
                  })
              .SelectMany(
                  grp => grp,
                  (grp, item) => (grp.Key ?? "(null)") + item)
      );

      Assert.Equal(2, nullKeyGroups);

      results.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  /// <summary>
  /// Groups with null keys for elements starting with 'b', keeps only the null-keyed group,
  /// and lets the source fail at 500. Asserts that both the outer error handler and the
  /// null-keyed group's observer receive the source exception.
  /// </summary>
  [Fact]
  public void GroupByUntil_NullKeys_Error()
  {
      var testScheduler = new TestScheduler();
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnError<string>(500, failure)
      );

      var nullGroupObserver = testScheduler.CreateObserver<string>();
      var outerError = default(Exception);

      testScheduler.ScheduleAbsolute(200, () =>
      {
          source
              .GroupByUntil(
                  item => item[0] == 'b' ? null : item.ToUpper(),
                  grp => Observable.Never<Unit>())
              .Where(grp => grp.Key == null)
              .Subscribe(
                  grp => { grp.Subscribe(nullGroupObserver); },
                  error => { outerError = error; });
      });

      testScheduler.Start();

      Assert.Same(failure, outerError);

      nullGroupObserver.Messages.AssertEqual(
          OnNext(220, "bar"),
          OnNext(470, "baz"),
          OnError<string>(500, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  1586. #endregion
  1587. #region + GroupByUntil w/capacity +
  // Capacity argument passed to the capacity-taking GroupByUntil overloads by the tests in this region.
  1588. private const int _groupByUntilCapacity = 1024;
  /// <summary>
  /// Checks argument validation of the four capacity-taking GroupByUntil overloads: each
  /// reference argument passed as null must raise ArgumentNullException, and a capacity of
  /// -1 must raise ArgumentOutOfRangeException.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_ArgumentChecking()
  {
      // Valid placeholder arguments.
      var source = DummyObservable<int>.Instance;
      var selector = DummyFunc<int, int>.Instance;
      var duration = DummyFunc<IGroupedObservable<int, int>, IObservable<int>>.Instance;
      var comparer = EqualityComparer<int>.Default;

      // Null counterparts, typed so that overload resolution picks the intended overload.
      var nullSource = default(IObservable<int>);
      var nullSelector = default(Func<int, int>);
      var nullDuration = default(Func<IGroupedObservable<int, int>, IObservable<int>>);
      var nullComparer = default(IEqualityComparer<int>);

      // (source, keySelector, elementSelector, durationSelector, capacity, comparer)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(nullSource, selector, selector, duration, _groupByUntilCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, nullSelector, selector, duration, _groupByUntilCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, nullSelector, duration, _groupByUntilCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, selector, nullDuration, _groupByUntilCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, selector, duration, _groupByUntilCapacity, nullComparer));

      // (source, keySelector, elementSelector, durationSelector, capacity)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(nullSource, selector, selector, duration, _groupByUntilCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, nullSelector, selector, duration, _groupByUntilCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, nullSelector, duration, _groupByUntilCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, selector, nullDuration, _groupByUntilCapacity));

      // (source, keySelector, durationSelector, capacity, comparer)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(nullSource, selector, duration, _groupByUntilCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, nullSelector, duration, _groupByUntilCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, nullDuration, _groupByUntilCapacity, comparer));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, duration, _groupByUntilCapacity, nullComparer));

      // (source, keySelector, durationSelector, capacity)
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(nullSource, selector, duration, _groupByUntilCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, nullSelector, duration, _groupByUntilCapacity));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GroupByUntil(source, selector, nullDuration, _groupByUntilCapacity));

      // Negative capacity, once per overload.
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(source, selector, selector, duration, -1, comparer));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(source, selector, selector, duration, -1));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(source, selector, duration, -1, comparer));
      ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.GroupByUntil(source, selector, duration, -1));
  }
  /// <summary>
  /// Runs the (keySelector, durationSelector, capacity, comparer) overload with a
  /// GroupByComparer and projects each group to its key. Asserts the emitted keys, the
  /// source subscription window, and that the key selector ran twelve times.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_WithKeyComparer()
  {
      var testScheduler = new TestScheduler();

      var keyCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = item =>
      {
          keyCalls++;
          return item.Trim();
      };

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(countingKeySelector, grp => grp.Skip(2), _groupByUntilCapacity, keyComparer)
              .Select(grp => grp.Key)
      );

      results.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keyCalls);
  }
  /// <summary>
  /// Runs the full capacity overload (key selector, element selector, duration selector,
  /// capacity, comparer) over a source that completes at 570. Asserts the emitted keys, the
  /// outer completion, and twelve invocations of each selector.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Outer_Complete()
  {
      var testScheduler = new TestScheduler();

      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = item =>
      {
          keyCalls++;
          return item.Trim();
      };

      Func<string, string> countingElementSelector = item =>
      {
          elementCalls++;
          return Reverse(item);
      };

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  countingKeySelector,
                  countingElementSelector,
                  grp => grp.Skip(2),
                  _groupByUntilCapacity,
                  keyComparer)
              .Select(grp => grp.Key)
      );

      results.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keyCalls);
      Assert.Equal(12, elementCalls);
  }
  /// <summary>
  /// Runs the full capacity overload over a source that fails at 570. Asserts the emitted
  /// keys, that the outer sequence ends with the source exception at 570, and twelve
  /// invocations of each selector.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Outer_Error()
  {
      var testScheduler = new TestScheduler();

      var keyCalls = 0;
      var elementCalls = 0;
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, failure),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = item =>
      {
          keyCalls++;
          return item.Trim();
      };

      Func<string, string> countingElementSelector = item =>
      {
          elementCalls++;
          return Reverse(item);
      };

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  countingKeySelector,
                  countingElementSelector,
                  grp => grp.Skip(2),
                  _groupByUntilCapacity,
                  keyComparer)
              .Select(grp => grp.Key)
      );

      results.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnError<string>(570, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keyCalls);
      Assert.Equal(12, elementCalls);
  }
  /// <summary>
  /// Runs the full capacity overload but disposes the subscription at 355. Asserts that
  /// only the keys emitted up to that point are observed, that the source subscription ends
  /// at 355, and that each selector ran five times.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Outer_Dispose()
  {
      var testScheduler = new TestScheduler();

      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = item =>
      {
          keyCalls++;
          return item.Trim();
      };

      Func<string, string> countingElementSelector = item =>
      {
          elementCalls++;
          return Reverse(item);
      };

      // The second argument to Start is the disposal time of the subscription.
      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  countingKeySelector,
                  countingElementSelector,
                  grp => grp.Skip(2),
                  _groupByUntilCapacity,
                  keyComparer)
              .Select(grp => grp.Key),
          355
      );

      results.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 355)
      );

      Assert.Equal(5, keyCalls);
      Assert.Equal(5, elementCalls);
  }
  /// <summary>
  /// Runs the full capacity overload with a key selector that throws on its tenth call.
  /// Asserts that the outer sequence fails with that exception at 480, that the source is
  /// unsubscribed at 480, and that the element selector ran only nine times.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Outer_KeyThrow()
  {
      var testScheduler = new TestScheduler();

      var keyCalls = 0;
      var elementCalls = 0;
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> failingKeySelector = item =>
      {
          keyCalls++;

          // The tenth element is the one whose key computation blows up.
          if (keyCalls == 10)
          {
              throw failure;
          }

          return item.Trim();
      };

      Func<string, string> countingElementSelector = item =>
      {
          elementCalls++;
          return Reverse(item);
      };

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  failingKeySelector,
                  countingElementSelector,
                  grp => grp.Skip(2),
                  _groupByUntilCapacity,
                  keyComparer)
              .Select(grp => grp.Key)
      );

      results.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnError<string>(480, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keyCalls);
      Assert.Equal(9, elementCalls);
  }
  /// <summary>
  /// Runs the full capacity overload with an element selector that throws on its tenth
  /// call. Asserts that the outer sequence fails with that exception at 480, that the
  /// source is unsubscribed at 480, and that both selectors ran ten times.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Outer_EleThrow()
  {
      var testScheduler = new TestScheduler();

      var keyCalls = 0;
      var elementCalls = 0;
      var failure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      Func<string, string> countingKeySelector = item =>
      {
          keyCalls++;
          return item.Trim();
      };

      Func<string, string> failingElementSelector = item =>
      {
          elementCalls++;

          // The tenth element is the one whose projection blows up.
          if (elementCalls == 10)
          {
              throw failure;
          }

          return Reverse(item);
      };

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  countingKeySelector,
                  failingElementSelector,
                  grp => grp.Skip(2),
                  _groupByUntilCapacity,
                  keyComparer)
              .Select(grp => grp.Key)
      );

      results.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnNext(470, "FOO"),
          OnError<string>(480, failure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 480)
      );

      Assert.Equal(10, keyCalls);
      Assert.Equal(10, elementCalls);
  }
  /// <summary>
  /// Runs the full capacity overload with a GroupByComparer built from
  /// (scheduler, 250, ushort.MaxValue). Asserts that the outer sequence fails at 310 with
  /// the comparer's EqualsException after four key-selector and three element-selector calls.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Outer_ComparerEqualsThrow()
  {
      var testScheduler = new TestScheduler();

      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      // NOTE(review): the two numeric arguments presumably are the virtual times after which
      // Equals / GetHashCode start throwing - confirm against GroupByComparer.
      var throwingComparer = new GroupByComparer(testScheduler, 250, ushort.MaxValue);

      Func<string, string> countingKeySelector = item =>
      {
          keyCalls++;
          return item.Trim();
      };

      Func<string, string> countingElementSelector = item =>
      {
          elementCalls++;
          return Reverse(item);
      };

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  countingKeySelector,
                  countingElementSelector,
                  grp => grp.Skip(2),
                  _groupByUntilCapacity,
                  throwingComparer)
              .Select(grp => grp.Key)
      );

      results.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnError<string>(310, throwingComparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 310)
      );

      Assert.Equal(4, keyCalls);
      Assert.Equal(3, elementCalls);
  }
  /// <summary>
  /// Runs the full capacity overload with a GroupByComparer built from
  /// (scheduler, ushort.MaxValue, 410). Asserts that the outer sequence fails at 420 with
  /// the comparer's HashCodeException after eight key-selector and seven element-selector calls.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Outer_ComparerGetHashCodeThrow()
  {
      var testScheduler = new TestScheduler();

      var keyCalls = 0;
      var elementCalls = 0;

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      // NOTE(review): the two numeric arguments presumably are the virtual times after which
      // Equals / GetHashCode start throwing - confirm against GroupByComparer.
      var throwingComparer = new GroupByComparer(testScheduler, ushort.MaxValue, 410);

      Func<string, string> countingKeySelector = item =>
      {
          keyCalls++;
          return item.Trim();
      };

      Func<string, string> countingElementSelector = item =>
      {
          elementCalls++;
          return Reverse(item);
      };

      var results = testScheduler.Start(() =>
          source
              .GroupByUntil(
                  countingKeySelector,
                  countingElementSelector,
                  grp => grp.Skip(2),
                  _groupByUntilCapacity,
                  throwingComparer)
              .Select(grp => grp.Key)
      );

      results.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR"),
          OnNext(350, "Baz"),
          OnNext(360, "qux"),
          OnError<string>(420, throwingComparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );

      Assert.Equal(8, keyCalls);
      Assert.Equal(7, elementCalls);
  }
  /// <summary>
  /// Subscribes to every group 100 time units after it is emitted and records what each
  /// late observer receives when the source completes at 570. Asserts five groups were
  /// produced and checks the per-key messages and the source subscription window.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Inner_Complete()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      // Per-key bookkeeping: the group itself, its subscription, and its recording observer.
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(
              item => item.Trim(),
              item => Reverse(item),
              grp => grp.Skip(2),
              _groupByUntilCapacity,
              keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(grp =>
          {
              var observer = testScheduler.CreateObserver<string>();
              var key = grp.Key;

              groupsByKey[key] = grp;
              observersByKey[key] = observer;

              // Deliberately subscribe late so that early elements of the group are missed.
              testScheduler.ScheduleRelative(100, () =>
              {
                  subscriptionsByKey[key] = grp.Subscribe(observer);
              });
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(5, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnCompleted<string>(320)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
          OnCompleted<string>(420)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
          OnCompleted<string>(510)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnCompleted<string>(570)
      );

      observersByKey["FOO"].Messages.AssertEqual(
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Subscribes to every group immediately when it is emitted and records all messages each
  /// group delivers when the source completes at 570. Asserts five groups were produced and
  /// checks the complete per-key message lists and the source subscription window.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Inner_Complete_All()
  {
      var testScheduler = new TestScheduler();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      // Per-key bookkeeping: the group itself, its subscription, and its recording observer.
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(
              item => item.Trim(),
              item => Reverse(item),
              grp => grp.Skip(2),
              _groupByUntilCapacity,
              keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(grp =>
          {
              var observer = testScheduler.CreateObserver<string>();
              var key = grp.Key;

              groupsByKey[key] = grp;
              observersByKey[key] = observer;

              // Subscribe right away so no element of the group is missed.
              subscriptionsByKey[key] = grp.Subscribe(observer);
          });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(5, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(420)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(510)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      observersByKey["FOO"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  /// <summary>
  /// Subscribes to every group 100 time units after it is emitted, with a source that fails
  /// at 570. Asserts five groups were produced, that groups already closed by their duration
  /// complete normally, and that the "qux" and "FOO" observers receive the source exception.
  /// </summary>
  [Fact]
  public void GroupByUntil_Capacity_Inner_Error()
  {
      var testScheduler = new TestScheduler();
      var sourceFailure = new Exception();

      var source = testScheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceFailure),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var keyComparer = new GroupByComparer(testScheduler);

      var grouped = default(IObservable<IGroupedObservable<string, string>>);
      var groupedSubscription = default(IDisposable);

      // Per-key bookkeeping: the group itself, its subscription, and its recording observer.
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      testScheduler.ScheduleAbsolute(Created, () =>
      {
          grouped = source.GroupByUntil(
              item => item.Trim(),
              item => Reverse(item),
              grp => grp.Skip(2),
              _groupByUntilCapacity,
              keyComparer);
      });

      testScheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupedSubscription = grouped.Subscribe(
              grp =>
              {
                  var observer = testScheduler.CreateObserver<string>();
                  var key = grp.Key;

                  groupsByKey[key] = grp;
                  observersByKey[key] = observer;

                  // Deliberately subscribe late so that early elements of the group are missed.
                  testScheduler.ScheduleRelative(100, () =>
                  {
                      subscriptionsByKey[key] = grp.Subscribe(observer);
                  });
              },
              // The outer error is intentionally ignored; only the inner observers are checked.
              _ => { });
      });

      testScheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupedSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      testScheduler.Start();

      Assert.Equal(5, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnCompleted<string>(320)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(390, "rab "),
          OnNext(420, " RAB "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
          OnCompleted<string>(420)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(480, " zab"),
          OnNext(510, " ZAb "), // Breaking change > v2.2 - prior to resolving a deadlock, the group would get closed prior to letting this message through
          OnCompleted<string>(510)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnError<string>(570, sourceFailure)
      );

      observersByKey["FOO"].Messages.AssertEqual(
          OnError<string>(570, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Every group is subscribed as soon as it is emitted. At time 400 the outer subscription and
  // all group subscriptions are disposed; the test expects four groups, no group messages after
  // that point, and the source subscription to end at 400.
  [Fact]
  public void GroupByUntil_Capacity_Inner_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler);
      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(grp =>
          {
              var observer = scheduler.CreateObserver<string>();
              groupsByKey[grp.Key] = grp;
              observersByKey[grp.Key] = observer;
              subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
          });
      });

      // Tear everything down in the middle of the sequence.
      scheduler.ScheduleAbsolute(400, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      scheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab ")
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 400)
      );
  }
  // The key selector throws on its sixth invocation. The test expects three groups, the failure
  // to reach the still-open group observers as OnError at 360, and the source subscription to
  // end at 360.
  [Fact]
  public void GroupByUntil_Capacity_Inner_KeyThrow()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler);
      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      var keySelectorCalls = 0;
      var keyFailure = new Exception();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(
              x =>
              {
                  // Fail exactly on the sixth element that reaches the key selector.
                  if (++keySelectorCalls == 6)
                  {
                      throw keyFailure;
                  }

                  return x.Trim();
              },
              x => Reverse(x),
              g => g.Skip(2),
              _groupByUntilCapacity,
              comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  var observer = scheduler.CreateObserver<string>();
                  groupsByKey[grp.Key] = grp;
                  observersByKey[grp.Key] = observer;
                  subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
              },
              _ => { });
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      scheduler.Start();

      Assert.Equal(3, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, keyFailure)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, keyFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  // The element selector throws on its sixth invocation. The test expects four groups (the
  // "qux" group observer sees only the error), OnError at 360 on every still-open group, and
  // the source subscription to end at 360.
  [Fact]
  public void GroupByUntil_Capacity_Inner_EleThrow()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler);
      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      var elementSelectorCalls = 0;
      var elementFailure = new Exception();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(
              x => x.Trim(),
              x =>
              {
                  // Fail exactly on the sixth element that reaches the element selector.
                  if (++elementSelectorCalls == 6)
                  {
                      throw elementFailure;
                  }

                  return Reverse(x);
              },
              g => g.Skip(2),
              _groupByUntilCapacity,
              comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  var observer = scheduler.CreateObserver<string>();
                  groupsByKey[grp.Key] = grp;
                  observersByKey[grp.Key] = observer;
                  subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
              },
              _ => { });
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      scheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnError<string>(360, elementFailure)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(360, elementFailure)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnError<string>(360, elementFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 360)
      );
  }
  // Uses a GroupByComparer built with (scheduler, 400, ushort.MaxValue) and expects
  // comparer.EqualsException to surface as OnError at 420 on every open group, with the source
  // subscription ending at 420.
  // NOTE(review): the comparer presumably starts throwing from Equals at virtual time 400 -
  // confirm against GroupByComparer, which is not visible here.
  [Fact]
  public void GroupByUntil_Capacity_Inner_Comparer_EqualsThrow()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler, 400, ushort.MaxValue);
      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  var observer = scheduler.CreateObserver<string>();
                  groupsByKey[grp.Key] = grp;
                  observersByKey[grp.Key] = observer;
                  subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
              },
              _ => { });
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      scheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, comparer.EqualsException)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, comparer.EqualsException)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, comparer.EqualsException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  // Uses a GroupByComparer built with (scheduler, ushort.MaxValue, 400) and expects
  // comparer.HashCodeException to surface as OnError at 420 on every open group, with the
  // source subscription ending at 420.
  // NOTE(review): the comparer presumably starts throwing from GetHashCode at virtual time 400 -
  // confirm against GroupByComparer, which is not visible here.
  [Fact]
  public void GroupByUntil_Capacity_Inner_Comparer_GetHashCodeThrow()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler, ushort.MaxValue, 400);
      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  var observer = scheduler.CreateObserver<string>();
                  groupsByKey[grp.Key] = grp;
                  observersByKey[grp.Key] = observer;
                  subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
              },
              _ => { });
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      scheduler.Start();

      Assert.Equal(4, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnError<string>(420, comparer.HashCodeException)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnError<string>(420, comparer.HashCodeException)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnError<string>(420, comparer.HashCodeException)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  // Disposes only the outer subscription at 320. The two groups handed out before that
  // ("foo" and "baR") are expected to keep receiving elements until they complete, and the
  // source subscription is expected to last until 420.
  [Fact]
  public void GroupByUntil_Capacity_Outer_Independence()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler);
      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();
      var keyObserver = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  // Record the key on the outer observer before wiring up the group itself.
                  keyObserver.OnNext(grp.Key);

                  var observer = scheduler.CreateObserver<string>();
                  groupsByKey[grp.Key] = grp;
                  observersByKey[grp.Key] = observer;
                  subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
              },
              keyObserver.OnError,
              keyObserver.OnCompleted);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      // Drop the outer subscription early; group subscriptions stay in place.
      scheduler.ScheduleAbsolute(320, () =>
      {
          groupsSubscription.Dispose();
      });

      scheduler.Start();

      Assert.Equal(2, groupsByKey.Count);

      keyObserver.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "baR")
      );

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(420)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 420)
      );
  }
  // Disposes the "foo" group subscription at 320 while leaving everything else subscribed.
  // The test expects five groups in total and the remaining groups to run to completion,
  // with the source subscription lasting until 570.
  [Fact]
  public void GroupByUntil_Capacity_Inner_Independence()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler);
      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();
      var keyObserver = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  keyObserver.OnNext(grp.Key);

                  var observer = scheduler.CreateObserver<string>();
                  groupsByKey[grp.Key] = grp;
                  observersByKey[grp.Key] = observer;
                  subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
              },
              keyObserver.OnError,
              keyObserver.OnCompleted);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      // Only the "foo" group subscription is dropped early.
      scheduler.ScheduleAbsolute(320, () =>
      {
          subscriptionsByKey["foo"].Dispose();
      });

      scheduler.Start();

      Assert.Equal(5, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab"),
          OnNext(390, "rab "),
          OnNext(420, " RAB "),
          OnCompleted<string>(420)
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB "),
          OnNext(480, " zab"),
          OnNext(510, " ZAb "),
          OnCompleted<string>(510)
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq "),
          OnCompleted<string>(570)
      );

      observersByKey["FOO"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Disposes four group subscriptions individually ("foo" at 320, "baR" at 280, "Baz" at 355,
  // "qux" at 400). Each disposed observer is expected to hold only what it received before its
  // disposal, while the later "FOO" group still runs to completion at 570.
  [Fact]
  public void GroupByUntil_Capacity_Inner_Multiple_Independence()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var comparer = new GroupByComparer(scheduler);
      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var groupsByKey = new Dictionary<string, IObservable<string>>();
      var subscriptionsByKey = new Dictionary<string, IDisposable>();
      var observersByKey = new Dictionary<string, ITestableObserver<string>>();
      var keyObserver = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(x => x.Trim(), x => Reverse(x), g => g.Skip(2), _groupByUntilCapacity, comparer);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(
              grp =>
              {
                  keyObserver.OnNext(grp.Key);

                  var observer = scheduler.CreateObserver<string>();
                  groupsByKey[grp.Key] = grp;
                  observersByKey[grp.Key] = observer;
                  subscriptionsByKey[grp.Key] = grp.Subscribe(observer);
              },
              keyObserver.OnError,
              keyObserver.OnCompleted);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();

          foreach (var subscription in subscriptionsByKey.Values)
          {
              subscription.Dispose();
          }
      });

      // Individual group subscriptions are dropped at different points in time.
      scheduler.ScheduleAbsolute(320, () => { subscriptionsByKey["foo"].Dispose(); });
      scheduler.ScheduleAbsolute(280, () => { subscriptionsByKey["baR"].Dispose(); });
      scheduler.ScheduleAbsolute(355, () => { subscriptionsByKey["Baz"].Dispose(); });
      scheduler.ScheduleAbsolute(400, () => { subscriptionsByKey["qux"].Dispose(); });

      scheduler.Start();

      Assert.Equal(5, groupsByKey.Count);

      observersByKey["foo"].Messages.AssertEqual(
          OnNext(220, "oof "),
          OnNext(240, " OoF "),
          OnNext(310, " Oof"),
          OnCompleted<string>(310)
      );

      observersByKey["baR"].Messages.AssertEqual(
          OnNext(270, " Rab")
      );

      observersByKey["Baz"].Messages.AssertEqual(
          OnNext(350, " zaB ")
      );

      observersByKey["qux"].Messages.AssertEqual(
          OnNext(360, " xuq ")
      );

      observersByKey["FOO"].Messages.AssertEqual(
          OnNext(470, " OOF"),
          OnNext(530, " oOf "),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );
  }
  // Keeps a reference to the most recently emitted group without subscribing to it, then
  // subscribes at 600 - after the source completed at 570. The late observer is expected to
  // receive just OnCompleted at 600.
  [Fact]
  public void GroupByUntil_Capacity_Inner_Escape_Complete()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570)
      );

      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var escapedGroup = default(IObservable<string>);
      var escapedSubscription = default(IDisposable);
      var lateObserver = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Each new group overwrites the previous one; only the last is kept.
          groupsSubscription = groups.Subscribe(grp => { escapedGroup = grp; });
      });

      scheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(lateObserver);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      scheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      lateObserver.Messages.AssertEqual(
          OnCompleted<string>(600)
      );
  }
  // Keeps a reference to the most recently emitted group without subscribing to it, then
  // subscribes at 600 - after the source failed at 570. The late observer is expected to
  // receive the source's exception as OnError at 600.
  [Fact]
  public void GroupByUntil_Capacity_Inner_Escape_Error()
  {
      var scheduler = new TestScheduler();
      var sourceFailure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, sourceFailure)
      );

      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var escapedGroup = default(IObservable<string>);
      var escapedSubscription = default(IDisposable);
      var lateObserver = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          // Each new group overwrites the previous one; the outer error is ignored.
          groupsSubscription = groups.Subscribe(
              grp => { escapedGroup = grp; },
              _ => { });
      });

      scheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(lateObserver);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          groupsSubscription.Dispose();
          escapedSubscription.Dispose();
      });

      scheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      lateObserver.Messages.AssertEqual(
          OnError<string>(600, sourceFailure)
      );
  }
  // Disposes the outer subscription at 290 and only afterwards (at 600) subscribes to the group
  // captured before the disposal. The late observer is expected to receive no messages, and the
  // source subscription to end at 290.
  [Fact]
  public void GroupByUntil_Capacity_Inner_Escape_Dispose()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(310, "foO "),
          OnNext(470, "FOO "),
          OnNext(530, " fOo "),
          OnError<string>(570, new Exception())
      );

      var groups = default(IObservable<IGroupedObservable<string, string>>);
      var groupsSubscription = default(IDisposable);
      var escapedGroup = default(IObservable<string>);
      var escapedSubscription = default(IDisposable);
      var lateObserver = scheduler.CreateObserver<string>();

      scheduler.ScheduleAbsolute(Created, () =>
      {
          groups = source.GroupByUntil(x => x.Trim(), g => g.Skip(2), _groupByUntilCapacity);
      });

      scheduler.ScheduleAbsolute(Subscribed, () =>
      {
          groupsSubscription = groups.Subscribe(grp => { escapedGroup = grp; });
      });

      scheduler.ScheduleAbsolute(290, () =>
      {
          groupsSubscription.Dispose();
      });

      scheduler.ScheduleAbsolute(600, () =>
      {
          escapedSubscription = escapedGroup.Subscribe(lateObserver);
      });

      scheduler.ScheduleAbsolute(Disposed, () =>
      {
          escapedSubscription.Dispose();
      });

      scheduler.Start();

      source.Subscriptions.AssertEqual(
          Subscribe(200, 290)
      );

      // Nothing may reach an observer that subscribed after the outer disposal.
      lateObserver.Messages.AssertEqual(
      );
  }
  // Exercises the overload that takes no comparer, with keys trimmed and lower-cased by the
  // key selector. Expects the sequence of emitted group keys (including a second "foo" group
  // at 470) and that each selector was invoked 12 times.
  [Fact]
  public void GroupByUntil_Capacity_Default()
  {
      var scheduler = new TestScheduler();

      var keySelectorCalls = 0;
      var elementSelectorCalls = 0;

      var source = scheduler.CreateHotObservable(
          OnNext(90, "error"),
          OnNext(110, "error"),
          OnNext(130, "error"),
          OnNext(220, " foo"),
          OnNext(240, " FoO "),
          OnNext(270, "baR "),
          OnNext(310, "foO "),
          OnNext(350, " Baz "),
          OnNext(360, " qux "),
          OnNext(390, " bar"),
          OnNext(420, " BAR "),
          OnNext(470, "FOO "),
          OnNext(480, "baz "),
          OnNext(510, " bAZ "),
          OnNext(530, " fOo "),
          OnCompleted<string>(570),
          OnNext(580, "error"),
          OnCompleted<string>(600),
          OnError<string>(650, new Exception())
      );

      var observedKeys = scheduler.Start(() =>
      {
          var grouped = source.GroupByUntil(
              x =>
              {
                  keySelectorCalls += 1;
                  return x.Trim().ToLower();
              },
              x =>
              {
                  elementSelectorCalls += 1;
                  return Reverse(x);
              },
              g => g.Skip(2),
              _groupByUntilCapacity);

          return grouped.Select(g => g.Key);
      });

      observedKeys.Messages.AssertEqual(
          OnNext(220, "foo"),
          OnNext(270, "bar"),
          OnNext(350, "baz"),
          OnNext(360, "qux"),
          OnNext(470, "foo"),
          OnCompleted<string>(570)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 570)
      );

      Assert.Equal(12, keySelectorCalls);
      Assert.Equal(12, elementSelectorCalls);
  }
  // The duration selector throws for the first group. Expects the exception as OnError on the
  // outer sequence at 210 and the source subscription to end at 210.
  [Fact]
  public void GroupByUntil_Capacity_DurationSelector_Throws()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(210, "foo")
      );

      var durationFailure = new Exception();

      var observed = scheduler.Start(() =>
      {
          return source.GroupByUntil<string, string, string>(
              x => x,
              g => { throw durationFailure; },
              _groupByUntilCapacity);
      });

      observed.Messages.AssertEqual(
          OnError<IGroupedObservable<string, string>>(210, durationFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 210)
      );
  }
  // Elements starting with 'b' are mapped to a null key; groups never expire because the
  // duration is Observable.Never. Expects every element to come through prefixed with its key,
  // with "(null)" standing in for the null key.
  [Fact]
  public void GroupByUntil_Capacity_NullKeys_Simple_Never()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      );

      var observed = scheduler.Start(() =>
      {
          var grouped = source.GroupByUntil(
              x => x[0] == 'b' ? null : x.ToUpper(),
              g => Observable.Never<Unit>(),
              _groupByUntilCapacity);

          return grouped.SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x);
      });

      observed.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  // Elements starting with 'b' are mapped to a null key; every group's duration is a 50-tick
  // timer on the test scheduler. Counts how often the duration selector runs for the null-key
  // group and expects that to happen twice.
  [Fact]
  public void GroupByUntil_Capacity_NullKeys_Simple_Expire1()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      );

      var nullKeyDurations = 0;

      var observed = scheduler.Start(() =>
      {
          var grouped = source.GroupByUntil(
              x => x[0] == 'b' ? null : x.ToUpper(),
              g =>
              {
                  if (g.Key == null)
                  {
                      nullKeyDurations++;
                  }

                  return Observable.Timer(TimeSpan.FromTicks(50), scheduler);
              },
              _groupByUntilCapacity);

          return grouped.SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x);
      });

      Assert.Equal(2, nullKeyDurations);

      observed.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  // Same scenario as the timer-based null-key expiry test, except the 50-tick timer is piped
  // through IgnoreElements() before being used as the group duration. Expects the duration
  // selector to run twice for the null-key group and all elements to come through.
  [Fact]
  public void GroupByUntil_Capacity_NullKeys_Simple_Expire2()
  {
      var scheduler = new TestScheduler();

      var source = scheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnCompleted<string>(500)
      );

      var nullKeyDurations = 0;

      var observed = scheduler.Start(() =>
      {
          var grouped = source.GroupByUntil(
              x => x[0] == 'b' ? null : x.ToUpper(),
              g =>
              {
                  if (g.Key == null)
                  {
                      nullKeyDurations++;
                  }

                  return Observable.Timer(TimeSpan.FromTicks(50), scheduler).IgnoreElements();
              },
              _groupByUntilCapacity);

          return grouped.SelectMany(g => g, (g, x) => (g.Key ?? "(null)") + x);
      });

      Assert.Equal(2, nullKeyDurations);

      observed.Messages.AssertEqual(
          OnNext(220, "(null)bar"),
          OnNext(240, "FOOfoo"),
          OnNext(310, "QUXqux"),
          OnNext(470, "(null)baz"),
          OnCompleted<string>(500)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  // Elements starting with 'b' are mapped to a null key and only the null-key group is
  // observed. Expects the source's error at 500 to reach both the outer error handler and the
  // null-key group's observer.
  [Fact]
  public void GroupByUntil_Capacity_NullKeys_Error()
  {
      var scheduler = new TestScheduler();
      var sourceFailure = new Exception();

      var source = scheduler.CreateHotObservable(
          OnNext(220, "bar"),
          OnNext(240, "foo"),
          OnNext(310, "qux"),
          OnNext(470, "baz"),
          OnError<string>(500, sourceFailure)
      );

      var nullGroupObserver = scheduler.CreateObserver<string>();
      var outerError = default(Exception);

      scheduler.ScheduleAbsolute(200, () =>
      {
          source
              .GroupByUntil(x => x[0] == 'b' ? null : x.ToUpper(), g => Observable.Never<Unit>(), _groupByUntilCapacity)
              .Where(g => g.Key == null)
              .Subscribe(g => g.Subscribe(nullGroupObserver), ex_ => outerError = ex_);
      });

      scheduler.Start();

      Assert.Same(sourceFailure, outerError);

      nullGroupObserver.Messages.AssertEqual(
          OnNext(220, "bar"),
          OnNext(470, "baz"),
          OnError<string>(500, sourceFailure)
      );

      source.Subscriptions.AssertEqual(
          Subscribe(200, 500)
      );
  }
  3164. #endregion
  // Returns the characters of s in reverse order (char by char, no culture or
  // surrogate-pair awareness). Throws NullReferenceException when s is null.
  static string Reverse(string s)
  {
      var characters = s.ToCharArray();
      Array.Reverse(characters);
      return new string(characters);
  }
  3172. }
  3173. }