SignalR 253 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703
  1. commit 792745ad98f9c6314406ab9ce87e255ee0ba3e4a
  2. Author: BrennanConroy <[email protected]>
  3. Date: Mon Nov 13 15:05:35 2017 -0800
  4. React to CoreFxLab packages (#998)
  5. diff --git a/Directory.Build.props b/Directory.Build.props
  6. index b51ed601331..a391978b7de 100644
  7. --- a/Directory.Build.props
  8. +++ b/Directory.Build.props
  9. @@ -11,6 +11,11 @@
  10. <SignAssembly>true</SignAssembly>
  11. <PublicSign Condition="'$(OS)' != 'Windows_NT'">true</PublicSign>
  12. <TreatWarningsAsErrors>true</TreatWarningsAsErrors>
  13. + <LangVersion>latest</LangVersion>
  14. </PropertyGroup>
  15. + <ItemGroup>
  16. + <!-- This is an experimental version of the compiler. See https://github.com/dotnet/csharplang/issues/666 for more details. -->
  17. + <PackageReference Include="Microsoft.NETCore.Compilers" Version="$(MicrosoftNETCoreCompilersPackageVersion)" PrivateAssets="All" />
  18. + </ItemGroup>
  19. </Project>
  20. diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs
  21. index a9b99c63e1b..0661272907c 100644
  22. --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs
  23. +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs
  24. @@ -9,8 +9,8 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
  25. public class MessageParserBenchmark
  26. {
  27. private static readonly Random Random = new Random();
  28. - private ReadOnlyBuffer<byte> _binaryInput;
  29. - private ReadOnlyBuffer<byte> _textInput;
  30. + private ReadOnlyMemory<byte> _binaryInput;
  31. + private ReadOnlyMemory<byte> _textInput;
  32. [Params(32, 64)]
  33. public int ChunkSize { get; set; }
  34. diff --git a/build/dependencies.props b/build/dependencies.props
  35. index 8608bcaf2e3..ddbe310bd48 100644
  36. --- a/build/dependencies.props
  37. +++ b/build/dependencies.props
  38. @@ -50,21 +50,23 @@
  39. <MicrosoftExtensionsOptionsPackageVersion>2.1.0-preview1-27475</MicrosoftExtensionsOptionsPackageVersion>
  40. <MicrosoftExtensionsSecurityHelperSourcesPackageVersion>2.1.0-preview1-27475</MicrosoftExtensionsSecurityHelperSourcesPackageVersion>
  41. <MicrosoftNETCoreApp20PackageVersion>2.0.0</MicrosoftNETCoreApp20PackageVersion>
  42. + <MicrosoftNETCoreCompilersPackageVersion>2.6.0-beta2-62211-02</MicrosoftNETCoreCompilersPackageVersion>
  43. <MicrosoftNETTestSdkPackageVersion>15.3.0</MicrosoftNETTestSdkPackageVersion>
  44. <MoqPackageVersion>4.7.49</MoqPackageVersion>
  45. <MsgPackCliPackageVersion>0.9.0-beta2</MsgPackCliPackageVersion>
  46. <NewtonsoftJsonPackageVersion>10.0.1</NewtonsoftJsonPackageVersion>
  47. <StackExchangeRedisStrongNamePackageVersion>1.2.4</StackExchangeRedisStrongNamePackageVersion>
  48. - <SystemBinaryPackageVersion>0.1.0-e170811-6</SystemBinaryPackageVersion>
  49. - <SystemBuffersPrimitivesPackageVersion>0.1.0-e170811-6</SystemBuffersPrimitivesPackageVersion>
  50. - <SystemIOPipelinesExtensionsPackageVersion>0.1.0-e170811-6</SystemIOPipelinesExtensionsPackageVersion>
  51. - <SystemIOPipelinesPackageVersion>0.1.0-e170811-6</SystemIOPipelinesPackageVersion>
  52. - <SystemMemoryPackageVersion>4.4.0-preview3-25519-03</SystemMemoryPackageVersion>
  53. - <SystemNumericsVectorsPackageVersion>4.4.0</SystemNumericsVectorsPackageVersion>
  54. + <SystemBinaryPackageVersion>0.1.0-alpha-002</SystemBinaryPackageVersion>
  55. + <SystemBuffersPrimitivesPackageVersion>0.1.0-alpha-002</SystemBuffersPrimitivesPackageVersion>
  56. + <SystemIOPipelinesExtensionsPackageVersion>0.1.0-alpha-002</SystemIOPipelinesExtensionsPackageVersion>
  57. + <SystemIOPipelinesPackageVersion>0.1.0-alpha-002</SystemIOPipelinesPackageVersion>
  58. + <SystemMemoryPackageVersion>4.5.0-preview1-25902-08</SystemMemoryPackageVersion>
  59. + <SystemNumericsVectorsPackageVersion>4.5.0-preview1-25902-08</SystemNumericsVectorsPackageVersion>
  60. <SystemReactiveLinqPackageVersion>3.1.1</SystemReactiveLinqPackageVersion>
  61. <SystemReflectionEmitPackageVersion>4.3.0</SystemReflectionEmitPackageVersion>
  62. - <SystemRuntimeCompilerServicesUnsafePackageVersion>4.4.0</SystemRuntimeCompilerServicesUnsafePackageVersion>
  63. - <SystemThreadingTasksChannelsPackageVersion>0.1.0-e170811-6</SystemThreadingTasksChannelsPackageVersion>
  64. + <SystemRuntimeCompilerServicesUnsafePackageVersion>4.5.0-preview1-25902-08</SystemRuntimeCompilerServicesUnsafePackageVersion>
  65. + <SystemThreadingChannelsPackageVersion>4.5.0-preview1-25902-08</SystemThreadingChannelsPackageVersion>
  66. + <SystemThreadingTasksExtensionsPackageVersion>4.4.0</SystemThreadingTasksExtensionsPackageVersion>
  67. <XunitPackageVersion>2.3.0</XunitPackageVersion>
  68. <XunitRunnerVisualStudioPackageVersion>2.3.0</XunitRunnerVisualStudioPackageVersion>
  69. </PropertyGroup>
  70. diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs
  71. index e08b972bd11..433532b432e 100644
  72. --- a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs
  73. +++ b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs
  74. @@ -1,8 +1,9 @@
  75. -// Copyright (c) .NET Foundation. All rights reserved.
  76. +// Copyright (c) .NET Foundation. All rights reserved.
  77. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  78. using System;
  79. using System.Threading.Tasks;
  80. +using Microsoft.AspNetCore.SignalR.Internal;
  81. using Microsoft.AspNetCore.Sockets;
  82. namespace Microsoft.AspNetCore.SignalR.Test.Server
  83. @@ -11,7 +12,7 @@ namespace Microsoft.AspNetCore.SignalR.Test.Server
  84. {
  85. public async override Task OnConnectedAsync(ConnectionContext connection)
  86. {
  87. - await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync());
  88. + await connection.Transport.Writer.WriteAsync(await connection.Transport.Reader.ReadAsync());
  89. }
  90. }
  91. }
  92. diff --git a/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs b/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs
  93. index 87a6fbb9c3f..f0af31544a5 100644
  94. --- a/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs
  95. +++ b/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs
  96. @@ -40,7 +40,7 @@ namespace SocialWeather
  97. var ms = new MemoryStream();
  98. await formatter.WriteAsync(data, ms);
  99. - connection.Transport.Out.TryWrite(ms.ToArray());
  100. + connection.Transport.Writer.TryWrite(ms.ToArray());
  101. }
  102. }
  103. diff --git a/samples/SocialWeather/SocialWeatherEndPoint.cs b/samples/SocialWeather/SocialWeatherEndPoint.cs
  104. index e412cfafebf..17889ec1aa6 100644
  105. --- a/samples/SocialWeather/SocialWeatherEndPoint.cs
  106. +++ b/samples/SocialWeather/SocialWeatherEndPoint.cs
  107. @@ -1,4 +1,4 @@
  108. -// Copyright (c) .NET Foundation. All rights reserved.
  109. +// Copyright (c) .NET Foundation. All rights reserved.
  110. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  111. using System.IO;
  112. @@ -34,9 +34,9 @@ namespace SocialWeather
  113. var formatter = _formatterResolver.GetFormatter<WeatherReport>(
  114. (string)connection.Metadata["formatType"]);
  115. - while (await connection.Transport.In.WaitToReadAsync())
  116. + while (await connection.Transport.Reader.WaitToReadAsync())
  117. {
  118. - if (connection.Transport.In.TryRead(out var buffer))
  119. + if (connection.Transport.Reader.TryRead(out var buffer))
  120. {
  121. var stream = new MemoryStream();
  122. await stream.WriteAsync(buffer, 0, buffer.Length);
  123. diff --git a/samples/SocketsSample/EndPoints/MessagesEndPoint.cs b/samples/SocketsSample/EndPoints/MessagesEndPoint.cs
  124. index 5559e565184..a17cb4624a9 100644
  125. --- a/samples/SocketsSample/EndPoints/MessagesEndPoint.cs
  126. +++ b/samples/SocketsSample/EndPoints/MessagesEndPoint.cs
  127. @@ -1,4 +1,4 @@
  128. -// Copyright (c) .NET Foundation. All rights reserved.
  129. +// Copyright (c) .NET Foundation. All rights reserved.
  130. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  131. using System.Collections.Generic;
  132. @@ -20,9 +20,9 @@ namespace SocketsSample.EndPoints
  133. try
  134. {
  135. - while (await connection.Transport.In.WaitToReadAsync())
  136. + while (await connection.Transport.Reader.WaitToReadAsync())
  137. {
  138. - if (connection.Transport.In.TryRead(out var buffer))
  139. + if (connection.Transport.Reader.TryRead(out var buffer))
  140. {
  141. // We can avoid the copy here but we'll deal with that later
  142. var text = Encoding.UTF8.GetString(buffer);
  143. @@ -50,7 +50,7 @@ namespace SocketsSample.EndPoints
  144. foreach (var c in Connections)
  145. {
  146. - tasks.Add(c.Transport.Out.WriteAsync(payload));
  147. + tasks.Add(c.Transport.Writer.WriteAsync(payload));
  148. }
  149. return Task.WhenAll(tasks);
  150. diff --git a/samples/SocketsSample/Hubs/Streaming.cs b/samples/SocketsSample/Hubs/Streaming.cs
  151. index 63fa8b71e19..cee2c42cdf0 100644
  152. --- a/samples/SocketsSample/Hubs/Streaming.cs
  153. +++ b/samples/SocketsSample/Hubs/Streaming.cs
  154. @@ -1,7 +1,7 @@
  155. using System;
  156. using System.Reactive.Linq;
  157. using System.Threading.Tasks;
  158. -using System.Threading.Tasks.Channels;
  159. +using System.Threading.Channels;
  160. using Microsoft.AspNetCore.SignalR;
  161. namespace SocketsSample.Hubs
  162. @@ -15,7 +15,7 @@ namespace SocketsSample.Hubs
  163. .Take(count);
  164. }
  165. - public ReadableChannel<int> ChannelCounter(int count, int delay)
  166. + public ChannelReader<int> ChannelCounter(int count, int delay)
  167. {
  168. var channel = Channel.CreateUnbounded<int>();
  169. @@ -23,14 +23,14 @@ namespace SocketsSample.Hubs
  170. {
  171. for (var i = 0; i < count; i++)
  172. {
  173. - await channel.Out.WriteAsync(i);
  174. + await channel.Writer.WriteAsync(i);
  175. await Task.Delay(delay);
  176. }
  177. - channel.Out.TryComplete();
  178. + channel.Writer.TryComplete();
  179. });
  180. - return channel.In;
  181. + return channel.Reader;
  182. }
  183. }
  184. }
  185. diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
  186. index 5294bcd93d7..9caa61a92b9 100644
  187. --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
  188. +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
  189. @@ -8,7 +8,7 @@ using System.Diagnostics;
  190. using System.IO;
  191. using System.Threading;
  192. using System.Threading.Tasks;
  193. -using System.Threading.Tasks.Channels;
  194. +using System.Threading.Channels;
  195. using Microsoft.AspNetCore.SignalR.Client.Internal;
  196. using Microsoft.AspNetCore.SignalR.Internal;
  197. using Microsoft.AspNetCore.SignalR.Internal.Encoders;
  198. @@ -145,12 +145,12 @@ namespace Microsoft.AspNetCore.SignalR.Client
  199. return new Subscription(invocationHandler, invocationList);
  200. }
  201. - public async Task<ReadableChannel<object>> StreamAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
  202. + public async Task<ChannelReader<object>> StreamAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
  203. {
  204. return await StreamAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
  205. }
  206. - private async Task<ReadableChannel<object>> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
  207. + private async Task<ChannelReader<object>> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
  208. {
  209. if (!_startCalled)
  210. {
  211. diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.StreamAsync.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.StreamAsync.cs
  212. index b0821d97e9d..81eec099747 100644
  213. --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.StreamAsync.cs
  214. +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.StreamAsync.cs
  215. @@ -1,71 +1,71 @@
  216. -// Copyright (c) .NET Foundation. All rights reserved.
  217. +// Copyright (c) .NET Foundation. All rights reserved.
  218. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  219. using System;
  220. using System.Threading;
  221. using System.Threading.Tasks;
  222. -using System.Threading.Tasks.Channels;
  223. +using System.Threading.Channels;
  224. namespace Microsoft.AspNetCore.SignalR.Client
  225. {
  226. public static partial class HubConnectionExtensions
  227. {
  228. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
  229. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
  230. {
  231. return hubConnection.StreamAsync<TResult>(methodName, Array.Empty<object>(), cancellationToken);
  232. }
  233. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
  234. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
  235. {
  236. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1 }, cancellationToken);
  237. }
  238. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
  239. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
  240. {
  241. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2 }, cancellationToken);
  242. }
  243. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
  244. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
  245. {
  246. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3 }, cancellationToken);
  247. }
  248. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
  249. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
  250. {
  251. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4 }, cancellationToken);
  252. }
  253. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
  254. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
  255. {
  256. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
  257. }
  258. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
  259. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
  260. {
  261. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
  262. }
  263. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
  264. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
  265. {
  266. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
  267. }
  268. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
  269. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
  270. {
  271. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
  272. }
  273. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
  274. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
  275. {
  276. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
  277. }
  278. - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
  279. + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
  280. {
  281. return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
  282. }
  283. - public static async Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
  284. + public static async Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
  285. {
  286. if (hubConnection == null)
  287. {
  288. @@ -85,9 +85,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
  289. {
  290. while (inputChannel.TryRead(out var item))
  291. {
  292. - while (!outputChannel.Out.TryWrite((TResult)item))
  293. + while (!outputChannel.Writer.TryWrite((TResult)item))
  294. {
  295. - if (!await outputChannel.Out.WaitToWriteAsync())
  296. + if (!await outputChannel.Writer.WaitToWriteAsync())
  297. {
  298. // Failed to write to the output channel because it was closed. Nothing really we can do but abort here.
  299. return;
  300. @@ -101,18 +101,18 @@ namespace Microsoft.AspNetCore.SignalR.Client
  301. }
  302. catch (Exception ex)
  303. {
  304. - outputChannel.Out.TryComplete(ex);
  305. + outputChannel.Writer.TryComplete(ex);
  306. }
  307. finally
  308. {
  309. // This will safely no-op if the catch block above ran.
  310. - outputChannel.Out.TryComplete();
  311. + outputChannel.Writer.TryComplete();
  312. }
  313. }
  314. _ = RunChannel();
  315. - return outputChannel.In;
  316. + return outputChannel.Reader;
  317. }
  318. }
  319. }
  320. diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs
  321. index edf00f1f836..5588a97b4a9 100644
  322. --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs
  323. +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs
  324. @@ -4,7 +4,7 @@
  325. using System;
  326. using System.Threading;
  327. using System.Threading.Tasks;
  328. -using System.Threading.Tasks.Channels;
  329. +using System.Threading.Channels;
  330. namespace Microsoft.AspNetCore.SignalR.Client
  331. {
  332. diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/InvocationRequest.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/InvocationRequest.cs
  333. index 40afc4cf02d..1018c22b980 100644
  334. --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/InvocationRequest.cs
  335. +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/InvocationRequest.cs
  336. @@ -4,7 +4,7 @@
  337. using System;
  338. using System.Threading;
  339. using System.Threading.Tasks;
  340. -using System.Threading.Tasks.Channels;
  341. +using System.Threading.Channels;
  342. using Microsoft.AspNetCore.SignalR.Client.Internal;
  343. using Microsoft.AspNetCore.SignalR.Internal.Protocol;
  344. using Microsoft.Extensions.Logging;
  345. @@ -43,7 +43,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
  346. }
  347. public static InvocationRequest Stream(CancellationToken cancellationToken, Type resultType, string invocationId,
  348. - ILoggerFactory loggerFactory, HubConnection hubConnection, out ReadableChannel<object> result)
  349. + ILoggerFactory loggerFactory, HubConnection hubConnection, out ChannelReader<object> result)
  350. {
  351. var req = new Streaming(cancellationToken, resultType, invocationId, loggerFactory, hubConnection);
  352. result = req.Result;
  353. @@ -75,7 +75,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
  354. {
  355. }
  356. - public ReadableChannel<object> Result => _channel.In;
  357. + public ChannelReader<object> Result => _channel.Reader;
  358. public override void Complete(CompletionMessage completionMessage)
  359. {
  360. @@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
  361. if (completionMessage.Result != null)
  362. {
  363. Logger.ReceivedUnexpectedComplete(InvocationId);
  364. - _channel.Out.TryComplete(new InvalidOperationException("Server provided a result in a completion response to a streamed invocation."));
  365. + _channel.Writer.TryComplete(new InvalidOperationException("Server provided a result in a completion response to a streamed invocation."));
  366. }
  367. if (!string.IsNullOrEmpty(completionMessage.Error))
  368. @@ -92,22 +92,22 @@ namespace Microsoft.AspNetCore.SignalR.Client
  369. return;
  370. }
  371. - _channel.Out.TryComplete();
  372. + _channel.Writer.TryComplete();
  373. }
  374. public override void Fail(Exception exception)
  375. {
  376. Logger.InvocationFailed(InvocationId);
  377. - _channel.Out.TryComplete(exception);
  378. + _channel.Writer.TryComplete(exception);
  379. }
  380. public override async ValueTask<bool> StreamItem(object item)
  381. {
  382. try
  383. {
  384. - while (!_channel.Out.TryWrite(item))
  385. + while (!_channel.Writer.TryWrite(item))
  386. {
  387. - if (!await _channel.Out.WaitToWriteAsync())
  388. + if (!await _channel.Writer.WaitToWriteAsync())
  389. {
  390. return false;
  391. }
  392. @@ -122,7 +122,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
  393. protected override void Cancel()
  394. {
  395. - _channel.Out.TryComplete(new OperationCanceledException("Invocation terminated"));
  396. + _channel.Writer.TryComplete(new OperationCanceledException("Invocation terminated"));
  397. }
  398. }
  399. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/Base64Encoder.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/Base64Encoder.cs
  400. index f5a8c044b0c..881b05112d3 100644
  401. --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/Base64Encoder.cs
  402. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/Base64Encoder.cs
  403. @@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Encoders
  404. {
  405. public byte[] Decode(byte[] payload)
  406. {
  407. - var buffer = new ReadOnlyBuffer<byte>(payload);
  408. + var buffer = new ReadOnlyMemory<byte>(payload);
  409. LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out var message);
  410. return Convert.FromBase64String(Encoding.UTF8.GetString(message.ToArray()));
  411. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/LengthPrefixedTextMessageParser.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/LengthPrefixedTextMessageParser.cs
  412. index 25419fbd988..686add4a158 100644
  413. --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/LengthPrefixedTextMessageParser.cs
  414. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/LengthPrefixedTextMessageParser.cs
  415. @@ -14,20 +14,18 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Encoders
  416. /// Attempts to parse a message from the buffer. Returns 'false' if there is not enough data to complete a message. Throws an
  417. /// exception if there is a format error in the provided data.
  418. /// </summary>
  419. - public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
  420. + public static bool TryParseMessage(ref ReadOnlyMemory<byte> buffer, out ReadOnlyMemory<byte> payload)
  421. {
  422. - payload = default;
  423. - var span = buffer.Span;
  424. + payload = default(ReadOnlyMemory<byte>);
  425. - if (!TryReadLength(span, out var index, out var length))
  426. + if (!TryReadLength(buffer.Span, out var index, out var length))
  427. {
  428. return false;
  429. }
  430. var remaining = buffer.Slice(index);
  431. - span = remaining.Span;
  432. - if (!TryReadDelimiter(span, LengthPrefixedTextMessageWriter.FieldDelimiter, "length"))
  433. + if (!TryReadDelimiter(remaining.Span, LengthPrefixedTextMessageWriter.FieldDelimiter, "length"))
  434. {
  435. return false;
  436. }
  437. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs
  438. index 8eb5936e923..44115592f69 100644
  439. --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs
  440. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs
  441. @@ -33,7 +33,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
  442. var buffer = ArrayPool<byte>.Shared.Rent(lenNumBytes + payload.Length);
  443. var bufferSpan = buffer.AsSpan();
  444. - new Span<byte>(lenBuffer, lenNumBytes).CopyTo(bufferSpan);
  445. + new ReadOnlySpan<byte>(lenBuffer, lenNumBytes).CopyTo(bufferSpan);
  446. bufferSpan = bufferSpan.Slice(lenNumBytes);
  447. payload.CopyTo(bufferSpan);
  448. output.Write(buffer, 0, lenNumBytes + payload.Length);
  449. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs
  450. index 1835fa34aed..4889ea33f44 100644
  451. --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs
  452. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs
  453. @@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
  454. private static int[] _numBitsToShift = new[] { 0, 7, 14, 21, 28 };
  455. private const int MaxLengthPrefixSize = 5;
  456. - public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
  457. + public static bool TryParseMessage(ref ReadOnlyMemory<byte> buffer, out ReadOnlyMemory<byte> payload)
  458. {
  459. payload = default;
  460. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs
  461. index 1a1c55bef48..fac697290ee 100644
  462. --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs
  463. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs
  464. @@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
  465. {
  466. public static class TextMessageParser
  467. {
  468. - public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
  469. + public static bool TryParseMessage(ref ReadOnlyMemory<byte> buffer, out ReadOnlyMemory<byte> payload)
  470. {
  471. payload = default;
  472. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs
  473. index 4ccb27e0c20..c02cea455b9 100644
  474. --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs
  475. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs
  476. @@ -13,7 +13,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
  477. ProtocolType Type { get; }
  478. - bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages);
  479. + bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages);
  480. void WriteMessage(HubMessage message, Stream output);
  481. }
  482. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs
  483. index 9624302aac2..4efa8a3e6b6 100644
  484. --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs
  485. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs
  486. @@ -61,7 +61,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
  487. public ProtocolType Type => ProtocolType.Text;
  488. - public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
  489. + public bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
  490. {
  491. messages = new List<HubMessage>();
  492. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/MessagePackHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/MessagePackHubProtocol.cs
  493. index 06401379300..30c85781c71 100644
  494. --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/MessagePackHubProtocol.cs
  495. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/MessagePackHubProtocol.cs
  496. @@ -38,7 +38,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
  497. _serializationContext = serializationContext;
  498. }
  499. - public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
  500. + public bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
  501. {
  502. messages = new List<HubMessage>();
  503. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs
  504. index 7f6abe5b2c0..c886a10ec2a 100644
  505. --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs
  506. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs
  507. @@ -29,7 +29,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
  508. }
  509. }
  510. - public static bool TryParseMessage(ReadOnlyBuffer<byte> input, out NegotiationMessage negotiationMessage)
  511. + public static bool TryParseMessage(ReadOnlyMemory<byte> input, out NegotiationMessage negotiationMessage)
  512. {
  513. if (!TextMessageParser.TryParseMessage(ref input, out var payload))
  514. {
  515. diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj b/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj
  516. index 74d6d7217a9..0ab94fe4e04 100644
  517. --- a/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj
  518. +++ b/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj
  519. @@ -10,7 +10,6 @@
  520. <ItemGroup>
  521. <PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
  522. <PackageReference Include="MsgPack.Cli" Version="$(MsgPackCliPackageVersion)" />
  523. - <PackageReference Include="System.Binary" Version="$(SystemBinaryPackageVersion)" />
  524. <PackageReference Include="System.Buffers.Primitives" Version="$(SystemBuffersPrimitivesPackageVersion)" />
  525. <PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
  526. <PackageReference Include="System.Numerics.Vectors" Version="$(SystemNumericsVectorsPackageVersion)" />
  527. diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs
  528. index 56f118dd494..61e9b3e0f22 100644
  529. --- a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs
  530. +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs
  531. @@ -1,4 +1,4 @@
  532. -// Copyright (c) .NET Foundation. All rights reserved.
  533. +// Copyright (c) .NET Foundation. All rights reserved.
  534. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  535. using System;
  536. @@ -8,7 +8,7 @@ using System.Runtime.ExceptionServices;
  537. using System.Security.Claims;
  538. using System.Threading;
  539. using System.Threading.Tasks;
  540. -using System.Threading.Tasks.Channels;
  541. +using System.Threading.Channels;
  542. using Microsoft.AspNetCore.Http.Features;
  543. using Microsoft.AspNetCore.SignalR.Features;
  544. using Microsoft.AspNetCore.SignalR.Internal;
  545. @@ -22,12 +22,12 @@ namespace Microsoft.AspNetCore.SignalR
  546. {
  547. private static Action<object> _abortedCallback = AbortConnection;
  548. - private readonly WritableChannel<HubMessage> _output;
  549. + private readonly ChannelWriter<HubMessage> _output;
  550. private readonly ConnectionContext _connectionContext;
  551. private readonly CancellationTokenSource _connectionAbortedTokenSource = new CancellationTokenSource();
  552. private readonly TaskCompletionSource<object> _abortCompletedTcs = new TaskCompletionSource<object>();
  553. - public HubConnectionContext(WritableChannel<HubMessage> output, ConnectionContext connectionContext)
  554. + public HubConnectionContext(ChannelWriter<HubMessage> output, ConnectionContext connectionContext)
  555. {
  556. _output = output;
  557. _connectionContext = connectionContext;
  558. @@ -37,7 +37,7 @@ namespace Microsoft.AspNetCore.SignalR
  559. private IHubFeature HubFeature => Features.Get<IHubFeature>();
  560. // Used by the HubEndPoint only
  561. - internal ReadableChannel<byte[]> Input => _connectionContext.Transport;
  562. + internal ChannelReader<byte[]> Input => _connectionContext.Transport;
  563. internal ExceptionDispatchInfo AbortException { get; private set; }
  564. @@ -53,7 +53,7 @@ namespace Microsoft.AspNetCore.SignalR
  565. public virtual HubProtocolReaderWriter ProtocolReaderWriter { get; set; }
  566. - public virtual WritableChannel<HubMessage> Output => _output;
  567. + public virtual ChannelWriter<HubMessage> Output => _output;
  568. // Currently used only for streaming methods
  569. internal ConcurrentDictionary<string, CancellationTokenSource> ActiveRequestCancellationSources { get; } = new ConcurrentDictionary<string, CancellationTokenSource>();
  570. diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubEndPoint.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubEndPoint.cs
  571. index bdb32153b0a..f6c572ee37b 100644
  572. --- a/src/Microsoft.AspNetCore.SignalR.Core/HubEndPoint.cs
  573. +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubEndPoint.cs
  574. @@ -9,7 +9,7 @@ using System.Reflection;
  575. using System.Security.Claims;
  576. using System.Threading;
  577. using System.Threading.Tasks;
  578. -using System.Threading.Tasks.Channels;
  579. +using System.Threading.Channels;
  580. using Microsoft.AspNetCore.Authorization;
  581. using Microsoft.AspNetCore.SignalR.Core;
  582. using Microsoft.AspNetCore.SignalR.Core.Internal;
  583. @@ -84,14 +84,14 @@ namespace Microsoft.AspNetCore.SignalR
  584. {
  585. try
  586. {
  587. - while (await output.In.WaitToReadAsync())
  588. + while (await output.Reader.WaitToReadAsync())
  589. {
  590. - while (output.In.TryRead(out var hubMessage))
  591. + while (output.Reader.TryRead(out var hubMessage))
  592. {
  593. var buffer = protocolReaderWriter.WriteMessage(hubMessage);
  594. - while (await connection.Transport.Out.WaitToWriteAsync())
  595. + while (await connection.Transport.Writer.WaitToWriteAsync())
  596. {
  597. - if (connection.Transport.Out.TryWrite(buffer))
  598. + if (connection.Transport.Writer.TryWrite(buffer))
  599. {
  600. break;
  601. }
  602. @@ -117,7 +117,7 @@ namespace Microsoft.AspNetCore.SignalR
  603. await _lifetimeManager.OnDisconnectedAsync(connectionContext);
  604. // Nothing should be writing to the HubConnectionContext
  605. - output.Out.TryComplete();
  606. + output.Writer.TryComplete();
  607. // This should unwind once we complete the output
  608. await writingOutputTask;
  609. @@ -461,7 +461,7 @@ namespace Microsoft.AspNetCore.SignalR
  610. private static bool IsChannel(Type type, out Type payloadType)
  611. {
  612. - var channelType = type.AllBaseTypes().FirstOrDefault(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ReadableChannel<>));
  613. + var channelType = type.AllBaseTypes().FirstOrDefault(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ChannelReader<>));
  614. if (channelType == null)
  615. {
  616. payloadType = null;
  617. diff --git a/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs b/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs
  618. index f066e180489..870b06eaf25 100644
  619. --- a/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs
  620. +++ b/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs
  621. @@ -6,7 +6,7 @@ using System.Linq;
  622. using System.Reflection;
  623. using System.Threading;
  624. using System.Threading.Tasks;
  625. -using System.Threading.Tasks.Channels;
  626. +using System.Threading.Channels;
  627. namespace Microsoft.AspNetCore.SignalR.Internal
  628. {
  629. @@ -21,6 +21,10 @@ namespace Microsoft.AspNetCore.SignalR.Internal
  630. .GetRuntimeMethods()
  631. .Single(m => m.Name.Equals(nameof(FromObservable)) && m.IsGenericMethod);
  632. + private static readonly MethodInfo _getAsyncEnumeratorMethod = typeof(AsyncEnumeratorAdapters)
  633. + .GetRuntimeMethods()
  634. + .Single(m => m.Name.Equals(nameof(GetAsyncEnumerator)) && m.IsGenericMethod);
  635. +
  636. public static IAsyncEnumerator<object> FromObservable(object observable, Type observableInterface, CancellationToken cancellationToken)
  637. {
  638. // TODO: Cache expressions by observable.GetType()?
  639. @@ -34,20 +38,19 @@ namespace Microsoft.AspNetCore.SignalR.Internal
  640. // TODO: Allow bounding and optimizations?
  641. var channel = Channel.CreateUnbounded<object>();
  642. - var subscription = observable.Subscribe(new ChannelObserver<T>(channel.Out, cancellationToken));
  643. + var subscription = observable.Subscribe(new ChannelObserver<T>(channel.Writer, cancellationToken));
  644. // Dispose the subscription when the token is cancelled
  645. cancellationToken.Register(state => ((IDisposable)state).Dispose(), subscription);
  646. - return channel.In.GetAsyncEnumerator(cancellationToken);
  647. + return GetAsyncEnumerator(channel.Reader, cancellationToken);
  648. }
  649. public static IAsyncEnumerator<object> FromChannel(object readableChannelOfT, Type payloadType, CancellationToken cancellationToken)
  650. {
  651. - var enumerator = readableChannelOfT
  652. - .GetType()
  653. - .GetRuntimeMethod("GetAsyncEnumerator", new[] { typeof(CancellationToken) })
  654. - .Invoke(readableChannelOfT, new object[] { cancellationToken });
  655. + var enumerator = _getAsyncEnumeratorMethod
  656. + .MakeGenericMethod(payloadType)
  657. + .Invoke(null, new object[] { readableChannelOfT, cancellationToken });
  658. if (payloadType.IsValueType)
  659. {
  660. @@ -68,10 +71,10 @@ namespace Microsoft.AspNetCore.SignalR.Internal
  661. private class ChannelObserver<T> : IObserver<T>
  662. {
  663. - private WritableChannel<object> _output;
  664. + private ChannelWriter<object> _output;
  665. private CancellationToken _cancellationToken;
  666. - public ChannelObserver(WritableChannel<object> output, CancellationToken cancellationToken)
  667. + public ChannelObserver(ChannelWriter<object> output, CancellationToken cancellationToken)
  668. {
  669. _output = output;
  670. _cancellationToken = cancellationToken;
  671. @@ -125,5 +128,66 @@ namespace Microsoft.AspNetCore.SignalR.Internal
  672. public object Current => _input.Current;
  673. public Task<bool> MoveNextAsync() => _input.MoveNextAsync();
  674. }
  675. +
  676. + public static IAsyncEnumerator<T> GetAsyncEnumerator<T>(ChannelReader<T> channel, CancellationToken cancellationToken = default(CancellationToken))
  677. + {
  678. + return new AsyncEnumerator<T>(channel, cancellationToken);
  679. + }
  680. +
  681. + /// <summary>Provides an async enumerator for the data in a channel.</summary>
  682. + internal class AsyncEnumerator<T> : IAsyncEnumerator<T>
  683. + {
  684. + /// <summary>The channel being enumerated.</summary>
  685. + private readonly ChannelReader<T> _channel;
  686. + /// <summary>Cancellation token used to cancel the enumeration.</summary>
  687. + private readonly CancellationToken _cancellationToken;
  688. + /// <summary>The current element of the enumeration.</summary>
  689. + private T _current;
  690. +
  691. + internal AsyncEnumerator(ChannelReader<T> channel, CancellationToken cancellationToken)
  692. + {
  693. + _channel = channel;
  694. + _cancellationToken = cancellationToken;
  695. + }
  696. +
  697. + public T Current => _current;
  698. +
  699. + public Task<bool> MoveNextAsync()
  700. + {
  701. + ValueTask<T> result = _channel.ReadAsync(_cancellationToken);
  702. +
  703. + if (result.IsCompletedSuccessfully)
  704. + {
  705. + _current = result.Result;
  706. + return Task.FromResult(true);
  707. + }
  708. +
  709. + return result.AsTask().ContinueWith((t, s) =>
  710. + {
  711. + var thisRef = (AsyncEnumerator<T>)s;
  712. + if (t.IsFaulted && t.Exception.InnerException is ChannelClosedException cce && cce.InnerException == null)
  713. + {
  714. + return false;
  715. + }
  716. + thisRef._current = t.GetAwaiter().GetResult();
  717. + return true;
  718. + }, this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.NotOnCanceled, TaskScheduler.Default);
  719. + }
  720. + }
  721. + }
  722. +
  723. + /// <summary>Represents an enumerator accessed asynchronously.</summary>
  724. + /// <typeparam name="T">Specifies the type of the data enumerated.</typeparam>
  725. + internal interface IAsyncEnumerator<out T>
  726. + {
  727. + /// <summary>Asynchronously move the enumerator to the next element.</summary>
  728. + /// <returns>
  729. + /// A task that returns true if the enumerator was successfully advanced to the next item,
  730. + /// or false if no more data was available in the collection.
  731. + /// </returns>
  732. + Task<bool> MoveNextAsync();
  733. +
  734. + /// <summary>Gets the current element being enumerated.</summary>
  735. + T Current { get; }
  736. }
  737. }
  738. diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs
  739. index 101d988e5ed..5065cb9d3d8 100644
  740. --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs
  741. +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs
  742. @@ -1,8 +1,8 @@
  743. -// Copyright (c) .NET Foundation. All rights reserved.
  744. +// Copyright (c) .NET Foundation. All rights reserved.
  745. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  746. using System;
  747. -using System.Threading.Tasks.Channels;
  748. +using System.Threading.Channels;
  749. namespace Microsoft.AspNetCore.Sockets.Internal
  750. {
  751. @@ -24,20 +24,19 @@ namespace Microsoft.AspNetCore.Sockets.Internal
  752. public Channel<T> Input { get; }
  753. public Channel<T> Output { get; }
  754. - public override ReadableChannel<T> In => Input;
  755. -
  756. - public override WritableChannel<T> Out => Output;
  757. -
  758. public ChannelConnection(Channel<T> input, Channel<T> output)
  759. {
  760. + Reader = input.Reader;
  761. Input = input;
  762. +
  763. + Writer = output.Writer;
  764. Output = output;
  765. }
  766. public void Dispose()
  767. {
  768. - Input.Out.TryComplete();
  769. - Output.Out.TryComplete();
  770. + Input.Writer.TryComplete();
  771. + Output.Writer.TryComplete();
  772. }
  773. }
  774. @@ -46,20 +45,19 @@ namespace Microsoft.AspNetCore.Sockets.Internal
  775. public Channel<TIn> Input { get; }
  776. public Channel<TOut> Output { get; }
  777. - public override ReadableChannel<TIn> In => Input;
  778. -
  779. - public override WritableChannel<TOut> Out => Output;
  780. -
  781. public ChannelConnection(Channel<TIn> input, Channel<TOut> output)
  782. {
  783. + Reader = input.Reader;
  784. Input = input;
  785. +
  786. + Writer = output.Writer;
  787. Output = output;
  788. }
  789. public void Dispose()
  790. {
  791. - Input.Out.TryComplete();
  792. - Output.Out.TryComplete();
  793. + Input.Writer.TryComplete();
  794. + Output.Writer.TryComplete();
  795. }
  796. }
  797. }
  798. diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelReaderExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelReaderExtensions.cs
  799. new file mode 100644
  800. index 00000000000..e437a1a5945
  801. --- /dev/null
  802. +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelReaderExtensions.cs
  803. @@ -0,0 +1,47 @@
  804. +// Copyright (c) .NET Foundation. All rights reserved.
  805. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  806. +
  807. +using System;
  808. +using System.Threading;
  809. +using System.Threading.Channels;
  810. +using System.Threading.Tasks;
  811. +
  812. +namespace Microsoft.AspNetCore.SignalR.Internal
  813. +{
  814. + public static class ChannelReaderExtensions
  815. + {
  816. + /// <summary>Asynchronously reads an item from the channel.</summary>
  817. + /// <param name="channel">The channel</param>
  818. + /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the read operation.</param>
  819. + /// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous read operation.</returns>
  820. + public static ValueTask<T> ReadAsync<T>(this ChannelReader<T> channel, CancellationToken cancellationToken = default)
  821. + {
  822. + try
  823. + {
  824. + return
  825. + cancellationToken.IsCancellationRequested
  826. + ? new ValueTask<T>(Task.FromCanceled<T>(cancellationToken))
  827. + : channel.TryRead(out T item)
  828. + ? new ValueTask<T>(item)
  829. + : ReadAsyncCore(cancellationToken);
  830. + }
  831. + catch (Exception e)
  832. + {
  833. + return new ValueTask<T>(Task.FromException<T>(e));
  834. + }
  835. +
  836. + async ValueTask<T> ReadAsyncCore(CancellationToken ct)
  837. + {
  838. + while (await channel.WaitToReadAsync(ct).ConfigureAwait(false))
  839. + {
  840. + if (channel.TryRead(out T item))
  841. + {
  842. + return item;
  843. + }
  844. + }
  845. +
  846. + throw new ChannelClosedException();
  847. + }
  848. + }
  849. + }
  850. +}
  851. diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs
  852. index 7fa72781309..8f4c799a162 100644
  853. --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs
  854. +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs
  855. @@ -2,7 +2,7 @@
  856. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  857. using System.Collections.Generic;
  858. -using System.Threading.Tasks.Channels;
  859. +using System.Threading.Channels;
  860. using Microsoft.AspNetCore.Http.Features;
  861. namespace Microsoft.AspNetCore.Sockets
  862. diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/Features/IConnectionTransportFeature.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/Features/IConnectionTransportFeature.cs
  863. index d29718fa317..e851b49bcc0 100644
  864. --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/Features/IConnectionTransportFeature.cs
  865. +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/Features/IConnectionTransportFeature.cs
  866. @@ -1,7 +1,7 @@
  867. // Copyright (c) .NET Foundation. All rights reserved.
  868. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  869. -using System.Threading.Tasks.Channels;
  870. +using System.Threading.Channels;
  871. namespace Microsoft.AspNetCore.Sockets.Features
  872. {
  873. diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/Microsoft.AspNetCore.Sockets.Abstractions.csproj b/src/Microsoft.AspNetCore.Sockets.Abstractions/Microsoft.AspNetCore.Sockets.Abstractions.csproj
  874. index c2dba911f3b..56b275ea06b 100644
  875. --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/Microsoft.AspNetCore.Sockets.Abstractions.csproj
  876. +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/Microsoft.AspNetCore.Sockets.Abstractions.csproj
  877. @@ -7,7 +7,8 @@
  878. <ItemGroup>
  879. <PackageReference Include="Microsoft.AspNetCore.Http.Features" Version="$(MicrosoftAspNetCoreHttpFeaturesPackageVersion)" />
  880. - <PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
  881. + <PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
  882. + <PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsPackageVersion)" />
  883. </ItemGroup>
  884. </Project>
  885. diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs
  886. index c2c27f5fe25..1a4776ec452 100644
  887. --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs
  888. +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs
  889. @@ -1,4 +1,4 @@
  890. -// Copyright (c) .NET Foundation. All rights reserved.
  891. +// Copyright (c) .NET Foundation. All rights reserved.
  892. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  893. using System;
  894. @@ -8,7 +8,7 @@ using System.IO;
  895. using System.Net.Http;
  896. using System.Threading;
  897. using System.Threading.Tasks;
  898. -using System.Threading.Tasks.Channels;
  899. +using System.Threading.Channels;
  900. using Microsoft.AspNetCore.Http.Features;
  901. using Microsoft.AspNetCore.Sockets.Client.Http;
  902. using Microsoft.AspNetCore.Sockets.Client.Internal;
  903. @@ -39,8 +39,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
  904. private readonly ITransportFactory _transportFactory;
  905. private string _connectionId;
  906. private readonly TimeSpan _eventQueueDrainTimeout = TimeSpan.FromSeconds(5);
  907. - private ReadableChannel<byte[]> Input => _transportChannel.In;
  908. - private WritableChannel<SendMessage> Output => _transportChannel.Out;
  909. + private ChannelReader<byte[]> Input => _transportChannel.Input;
  910. + private ChannelWriter<SendMessage> Output => _transportChannel.Output;
  911. private readonly List<ReceiveCallback> _callbacks = new List<ReceiveCallback>();
  912. private readonly TransportType _requestedTransportType = TransportType.All;
  913. diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs
  914. index 7591183e339..784400db856 100644
  915. --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs
  916. +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs
  917. @@ -1,9 +1,9 @@
  918. -// Copyright (c) .NET Foundation. All rights reserved.
  919. +// Copyright (c) .NET Foundation. All rights reserved.
  920. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  921. using System;
  922. using System.Threading.Tasks;
  923. -using System.Threading.Tasks.Channels;
  924. +using System.Threading.Channels;
  925. namespace Microsoft.AspNetCore.Sockets.Client
  926. {
  927. diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs
  928. index 9229219a521..a059a81230f 100644
  929. --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs
  930. +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs
  931. @@ -1,4 +1,4 @@
  932. -// Copyright (c) .NET Foundation. All rights reserved.
  933. +// Copyright (c) .NET Foundation. All rights reserved.
  934. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  935. using System;
  936. @@ -6,7 +6,7 @@ using System.Net;
  937. using System.Net.Http;
  938. using System.Threading;
  939. using System.Threading.Tasks;
  940. -using System.Threading.Tasks.Channels;
  941. +using System.Threading.Channels;
  942. using Microsoft.AspNetCore.Sockets.Client.Http;
  943. using Microsoft.AspNetCore.Sockets.Client.Internal;
  944. using Microsoft.Extensions.Logging;
  945. @@ -59,7 +59,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
  946. Running = Task.WhenAll(_sender, _poller).ContinueWith(t =>
  947. {
  948. _logger.TransportStopped(_connectionId, t.Exception?.InnerException);
  949. - _application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
  950. + _application.Writer.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
  951. return t;
  952. }).Unwrap();
  953. @@ -123,9 +123,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
  954. var payload = await response.Content.ReadAsByteArrayAsync();
  955. if (payload.Length > 0)
  956. {
  957. - while (!_application.Out.TryWrite(payload))
  958. + while (!_application.Writer.TryWrite(payload))
  959. {
  960. - if (cancellationToken.IsCancellationRequested || !await _application.Out.WaitToWriteAsync(cancellationToken))
  961. + if (cancellationToken.IsCancellationRequested || !await _application.Writer.WaitToWriteAsync(cancellationToken))
  962. {
  963. return;
  964. }
  965. diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj b/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj
  966. index d09d5db9a5e..3a4980df1f3 100644
  967. --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj
  968. +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj
  969. @@ -22,7 +22,7 @@
  970. <PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
  971. <PackageReference Include="System.Numerics.Vectors" Version="$(SystemNumericsVectorsPackageVersion)" />
  972. <PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="$(SystemRuntimeCompilerServicesUnsafePackageVersion)" />
  973. - <PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
  974. + <PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
  975. </ItemGroup>
  976. </Project>
  977. diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs
  978. index a95c59a3533..af05b120606 100644
  979. --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs
  980. +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs
  981. @@ -1,4 +1,4 @@
  982. -// Copyright (c) .NET Foundation. All rights reserved.
  983. +// Copyright (c) .NET Foundation. All rights reserved.
  984. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  985. using System;
  986. @@ -7,7 +7,7 @@ using System.IO;
  987. using System.Net.Http;
  988. using System.Threading;
  989. using System.Threading.Tasks;
  990. -using System.Threading.Tasks.Channels;
  991. +using System.Threading.Channels;
  992. using Microsoft.AspNetCore.Sockets.Client.Http;
  993. using Microsoft.AspNetCore.Sockets.Client.Internal;
  994. using Microsoft.Extensions.Logging;
  995. @@ -23,11 +23,11 @@ namespace Microsoft.AspNetCore.Sockets.Client
  996. IList<SendMessage> messages = null;
  997. try
  998. {
  999. - while (await application.In.WaitToReadAsync(transportCts.Token))
  1000. + while (await application.Reader.WaitToReadAsync(transportCts.Token))
  1001. {
  1002. // Grab as many messages as we can from the channel
  1003. messages = new List<SendMessage>();
  1004. - while (!transportCts.IsCancellationRequested && application.In.TryRead(out SendMessage message))
  1005. + while (!transportCts.IsCancellationRequested && application.Reader.TryRead(out SendMessage message))
  1006. {
  1007. messages.Add(message);
  1008. }
  1009. diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs
  1010. index bc4713320df..4ad011babc9 100644
  1011. --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs
  1012. +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs
  1013. @@ -146,7 +146,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
  1014. }
  1015. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  1016. - private Span<byte> ConvertBufferToSpan(ReadableBuffer buffer)
  1017. + private ReadOnlySpan<byte> ConvertBufferToSpan(ReadableBuffer buffer)
  1018. {
  1019. if (buffer.IsSingleSpan)
  1020. {
  1021. diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs
  1022. index 9018659dddc..62e92d12c29 100644
  1023. --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs
  1024. +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs
  1025. @@ -1,13 +1,14 @@
  1026. -// Copyright (c) .NET Foundation. All rights reserved.
  1027. +// Copyright (c) .NET Foundation. All rights reserved.
  1028. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1029. using System;
  1030. +using System.Buffers;
  1031. using System.IO.Pipelines;
  1032. using System.Net.Http;
  1033. using System.Net.Http.Headers;
  1034. using System.Threading;
  1035. using System.Threading.Tasks;
  1036. -using System.Threading.Tasks.Channels;
  1037. +using System.Threading.Channels;
  1038. using Microsoft.AspNetCore.Sockets.Client.Internal;
  1039. using Microsoft.AspNetCore.Sockets.Internal.Formatters;
  1040. using Microsoft.Extensions.Logging;
  1041. @@ -17,6 +18,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
  1042. {
  1043. public class ServerSentEventsTransport : ITransport
  1044. {
  1045. + private static readonly MemoryPool _memoryPool = new MemoryPool();
  1046. private readonly HttpClient _httpClient;
  1047. private readonly ILogger _logger;
  1048. private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
  1049. @@ -64,7 +66,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
  1050. {
  1051. _logger.TransportStopped(_connectionId, t.Exception?.InnerException);
  1052. - _application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
  1053. + _application.Writer.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
  1054. return t;
  1055. }).Unwrap();
  1056. @@ -80,7 +82,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
  1057. var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
  1058. var stream = await response.Content.ReadAsStreamAsync();
  1059. - var pipelineReader = stream.AsPipelineReader(cancellationToken);
  1060. + var pipelineReader = StreamPipeConnection.CreateReader(new PipeOptions(_memoryPool), stream);
  1061. var readCancellationRegistration = cancellationToken.Register(
  1062. reader => ((IPipeReader)reader).CancelPendingRead(), pipelineReader);
  1063. try
  1064. @@ -105,7 +107,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
  1065. switch (parseResult)
  1066. {
  1067. case ServerSentEventsMessageParser.ParseResult.Completed:
  1068. - _application.Out.TryWrite(buffer);
  1069. + _application.Writer.TryWrite(buffer);
  1070. _parser.Reset();
  1071. break;
  1072. case ServerSentEventsMessageParser.ParseResult.Incomplete:
  1073. @@ -139,7 +141,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
  1074. {
  1075. _logger.TransportStopping(_connectionId);
  1076. _transportCts.Cancel();
  1077. - _application.Out.TryComplete();
  1078. + _application.Writer.TryComplete();
  1079. try
  1080. {
  1081. diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs
  1082. index 2249c44bc9a..5b8874b6c57 100644
  1083. --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs
  1084. +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs
  1085. @@ -1,4 +1,4 @@
  1086. -// Copyright (c) .NET Foundation. All rights reserved.
  1087. +// Copyright (c) .NET Foundation. All rights reserved.
  1088. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1089. using System;
  1090. @@ -7,7 +7,7 @@ using System.Diagnostics;
  1091. using System.Net.WebSockets;
  1092. using System.Threading;
  1093. using System.Threading.Tasks;
  1094. -using System.Threading.Tasks.Channels;
  1095. +using System.Threading.Channels;
  1096. using Microsoft.AspNetCore.Sockets.Client.Internal;
  1097. using Microsoft.Extensions.Logging;
  1098. using Microsoft.Extensions.Logging.Abstractions;
  1099. @@ -70,8 +70,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
  1100. {
  1101. _webSocket.Dispose();
  1102. _logger.TransportStopped(_connectionId, t.Exception?.InnerException);
  1103. - _application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
  1104. - return t;
  1105. + _application.Writer.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
  1106. + return t;
  1107. }).Unwrap();
  1108. }
  1109. @@ -97,7 +97,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
  1110. {
  1111. _logger.WebSocketClosed(_connectionId, receiveResult.CloseStatus);
  1112. - _application.Out.Complete(
  1113. + _application.Writer.Complete(
  1114. receiveResult.CloseStatus == WebSocketCloseStatus.NormalClosure
  1115. ? null
  1116. : new InvalidOperationException(
  1117. @@ -135,9 +135,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
  1118. if (!_transportCts.Token.IsCancellationRequested)
  1119. {
  1120. _logger.MessageToApp(_connectionId, messageBuffer.Length);
  1121. - while (await _application.Out.WaitToWriteAsync(_transportCts.Token))
  1122. + while (await _application.Writer.WaitToWriteAsync(_transportCts.Token))
  1123. {
  1124. - if (_application.Out.TryWrite(messageBuffer))
  1125. + if (_application.Writer.TryWrite(messageBuffer))
  1126. {
  1127. incomingMessage.Clear();
  1128. break;
  1129. @@ -173,9 +173,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
  1130. try
  1131. {
  1132. - while (await _application.In.WaitToReadAsync(_transportCts.Token))
  1133. + while (await _application.Reader.WaitToReadAsync(_transportCts.Token))
  1134. {
  1135. - while (_application.In.TryRead(out SendMessage message))
  1136. + while (_application.Reader.TryRead(out SendMessage message))
  1137. {
  1138. try
  1139. {
  1140. diff --git a/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs
  1141. index 1bcca138ef1..996605a4567 100644
  1142. --- a/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs
  1143. +++ b/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs
  1144. @@ -1,4 +1,4 @@
  1145. -// Copyright (c) .NET Foundation. All rights reserved.
  1146. +// Copyright (c) .NET Foundation. All rights reserved.
  1147. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1148. using System;
  1149. @@ -93,7 +93,7 @@ namespace Microsoft.AspNetCore.Sockets
  1150. connection.TransportCapabilities = TransferMode.Text;
  1151. // We only need to provide the Input channel since writing to the application is handled through /send.
  1152. - var sse = new ServerSentEventsTransport(connection.Application.In, connection.ConnectionId, _loggerFactory);
  1153. + var sse = new ServerSentEventsTransport(connection.Application.Reader, connection.ConnectionId, _loggerFactory);
  1154. await DoPersistentConnection(socketDelegate, sse, context, connection);
  1155. }
  1156. @@ -194,7 +194,7 @@ namespace Microsoft.AspNetCore.Sockets
  1157. context.Response.RegisterForDispose(timeoutSource);
  1158. context.Response.RegisterForDispose(tokenSource);
  1159. - var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.In, connection.ConnectionId, _loggerFactory);
  1160. + var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Reader, connection.ConnectionId, _loggerFactory);
  1161. // Start the transport
  1162. connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
  1163. @@ -215,7 +215,7 @@ namespace Microsoft.AspNetCore.Sockets
  1164. if (resultTask == connection.ApplicationTask)
  1165. {
  1166. // Complete the transport (notifying it of the application error if there is one)
  1167. - connection.Transport.Out.TryComplete(connection.ApplicationTask.Exception);
  1168. + connection.Transport.Writer.TryComplete(connection.ApplicationTask.Exception);
  1169. // Wait for the transport to run
  1170. await connection.TransportTask;
  1171. @@ -408,9 +408,9 @@ namespace Microsoft.AspNetCore.Sockets
  1172. }
  1173. _logger.ReceivedBytes(connection.ConnectionId, buffer.Length);
  1174. - while (!connection.Application.Out.TryWrite(buffer))
  1175. + while (!connection.Application.Writer.TryWrite(buffer))
  1176. {
  1177. - if (!await connection.Application.Out.WaitToWriteAsync())
  1178. + if (!await connection.Application.Writer.WaitToWriteAsync())
  1179. {
  1180. return;
  1181. }
  1182. diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs
  1183. index 033efc6fe20..a9ae71af2ae 100644
  1184. --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs
  1185. +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs
  1186. @@ -1,11 +1,11 @@
  1187. -// Copyright (c) .NET Foundation. All rights reserved.
  1188. +// Copyright (c) .NET Foundation. All rights reserved.
  1189. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1190. using System;
  1191. using System.Collections.Generic;
  1192. using System.Threading;
  1193. using System.Threading.Tasks;
  1194. -using System.Threading.Tasks.Channels;
  1195. +using System.Threading.Channels;
  1196. using Microsoft.AspNetCore.Http;
  1197. using Microsoft.Extensions.Logging;
  1198. @@ -13,12 +13,12 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
  1199. {
  1200. public class LongPollingTransport : IHttpTransport
  1201. {
  1202. - private readonly ReadableChannel<byte[]> _application;
  1203. + private readonly ChannelReader<byte[]> _application;
  1204. private readonly ILogger _logger;
  1205. private readonly CancellationToken _timeoutToken;
  1206. private readonly string _connectionId;
  1207. - public LongPollingTransport(CancellationToken timeoutToken, ReadableChannel<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
  1208. + public LongPollingTransport(CancellationToken timeoutToken, ChannelReader<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
  1209. {
  1210. _timeoutToken = timeoutToken;
  1211. _application = application;
  1212. diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs
  1213. index 5a04cc76ead..21b079c2638 100644
  1214. --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs
  1215. +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs
  1216. @@ -65,7 +65,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
  1217. if (nextSliceStart >= payload.Length)
  1218. {
  1219. - payload = Span<byte>.Empty;
  1220. + payload = ReadOnlySpan<byte>.Empty;
  1221. }
  1222. else
  1223. {
  1224. diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs
  1225. index 3ff1c1b756f..19bda2e3907 100644
  1226. --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs
  1227. +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs
  1228. @@ -1,11 +1,11 @@
  1229. -// Copyright (c) .NET Foundation. All rights reserved.
  1230. +// Copyright (c) .NET Foundation. All rights reserved.
  1231. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1232. using System;
  1233. using System.IO;
  1234. using System.Threading;
  1235. using System.Threading.Tasks;
  1236. -using System.Threading.Tasks.Channels;
  1237. +using System.Threading.Channels;
  1238. using Microsoft.AspNetCore.Http;
  1239. using Microsoft.AspNetCore.Http.Features;
  1240. using Microsoft.AspNetCore.Sockets.Internal.Formatters;
  1241. @@ -15,11 +15,11 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
  1242. {
  1243. public class ServerSentEventsTransport : IHttpTransport
  1244. {
  1245. - private readonly ReadableChannel<byte[]> _application;
  1246. + private readonly ChannelReader<byte[]> _application;
  1247. private readonly string _connectionId;
  1248. private readonly ILogger _logger;
  1249. - public ServerSentEventsTransport(ReadableChannel<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
  1250. + public ServerSentEventsTransport(ChannelReader<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
  1251. {
  1252. _application = application;
  1253. _connectionId = connectionId;
  1254. diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs
  1255. index 5adc0ab245f..97756e36130 100644
  1256. --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs
  1257. +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs
  1258. @@ -1,4 +1,4 @@
  1259. -// Copyright (c) .NET Foundation. All rights reserved.
  1260. +// Copyright (c) .NET Foundation. All rights reserved.
  1261. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1262. using System;
  1263. @@ -7,7 +7,7 @@ using System.Diagnostics;
  1264. using System.Net.WebSockets;
  1265. using System.Threading;
  1266. using System.Threading.Tasks;
  1267. -using System.Threading.Tasks.Channels;
  1268. +using System.Threading.Channels;
  1269. using Microsoft.AspNetCore.Http;
  1270. using Microsoft.Extensions.Logging;
  1271. @@ -87,7 +87,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
  1272. }
  1273. // We're done writing
  1274. - _application.Out.TryComplete();
  1275. + _application.Writer.TryComplete();
  1276. await socket.CloseOutputAsync(failed ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
  1277. @@ -160,9 +160,9 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
  1278. }
  1279. _logger.MessageToApplication(_connection.ConnectionId, messageBuffer.Length);
  1280. - while (await _application.Out.WaitToWriteAsync())
  1281. + while (await _application.Writer.WaitToWriteAsync())
  1282. {
  1283. - if (_application.Out.TryWrite(messageBuffer))
  1284. + if (_application.Writer.TryWrite(messageBuffer))
  1285. {
  1286. incomingMessage.Clear();
  1287. break;
  1288. @@ -173,10 +173,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
  1289. private async Task StartSending(WebSocket ws)
  1290. {
  1291. - while (await _application.In.WaitToReadAsync())
  1292. + while (await _application.Reader.WaitToReadAsync())
  1293. {
  1294. // Get a frame from the application
  1295. - while (_application.In.TryRead(out var buffer))
  1296. + while (_application.Reader.TryRead(out var buffer))
  1297. {
  1298. if (buffer.Length > 0)
  1299. {
  1300. diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj b/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj
  1301. index 1f7ff58d7a6..7cbc3f60f84 100644
  1302. --- a/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj
  1303. +++ b/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj
  1304. @@ -16,7 +16,7 @@
  1305. <PackageReference Include="Microsoft.AspNetCore.Routing" Version="$(MicrosoftAspNetCoreRoutingPackageVersion)" />
  1306. <PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="$(MicrosoftAspNetCoreWebSocketsPackageVersion)" />
  1307. <PackageReference Include="Microsoft.Extensions.SecurityHelper.Sources" PrivateAssets="All" Version="$(MicrosoftExtensionsSecurityHelperSourcesPackageVersion)" />
  1308. - <PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
  1309. + <PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
  1310. <PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
  1311. <PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
  1312. </ItemGroup>
  1313. diff --git a/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs b/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs
  1314. index 0d23bc71614..9baa5580451 100644
  1315. --- a/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs
  1316. +++ b/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs
  1317. @@ -9,7 +9,7 @@ using System.IO;
  1318. using System.Net.WebSockets;
  1319. using System.Threading;
  1320. using System.Threading.Tasks;
  1321. -using System.Threading.Tasks.Channels;
  1322. +using System.Threading.Channels;
  1323. using Microsoft.AspNetCore.Hosting;
  1324. using Microsoft.AspNetCore.Sockets.Internal;
  1325. using Microsoft.Extensions.Logging;
  1326. diff --git a/src/Microsoft.AspNetCore.Sockets/DefaultConnectionContext.cs b/src/Microsoft.AspNetCore.Sockets/DefaultConnectionContext.cs
  1327. index 3acb6bb0c18..990827767b2 100644
  1328. --- a/src/Microsoft.AspNetCore.Sockets/DefaultConnectionContext.cs
  1329. +++ b/src/Microsoft.AspNetCore.Sockets/DefaultConnectionContext.cs
  1330. @@ -6,7 +6,7 @@ using System.Collections.Generic;
  1331. using System.Security.Claims;
  1332. using System.Threading;
  1333. using System.Threading.Tasks;
  1334. -using System.Threading.Tasks.Channels;
  1335. +using System.Threading.Channels;
  1336. using Microsoft.AspNetCore.Http.Features;
  1337. using Microsoft.AspNetCore.Sockets.Features;
  1338. @@ -86,21 +86,21 @@ namespace Microsoft.AspNetCore.Sockets
  1339. // If the application task is faulted, propagate the error to the transport
  1340. if (ApplicationTask?.IsFaulted == true)
  1341. {
  1342. - Transport.Out.TryComplete(ApplicationTask.Exception.InnerException);
  1343. + Transport.Writer.TryComplete(ApplicationTask.Exception.InnerException);
  1344. }
  1345. else
  1346. {
  1347. - Transport.Out.TryComplete();
  1348. + Transport.Writer.TryComplete();
  1349. }
  1350. // If the transport task is faulted, propagate the error to the application
  1351. if (TransportTask?.IsFaulted == true)
  1352. {
  1353. - Application.Out.TryComplete(TransportTask.Exception.InnerException);
  1354. + Application.Writer.TryComplete(TransportTask.Exception.InnerException);
  1355. }
  1356. else
  1357. {
  1358. - Application.Out.TryComplete();
  1359. + Application.Writer.TryComplete();
  1360. }
  1361. var applicationTask = ApplicationTask ?? Task.CompletedTask;
  1362. diff --git a/src/Microsoft.AspNetCore.Sockets/Microsoft.AspNetCore.Sockets.csproj b/src/Microsoft.AspNetCore.Sockets/Microsoft.AspNetCore.Sockets.csproj
  1363. index 0de655a7316..27dd5e95745 100644
  1364. --- a/src/Microsoft.AspNetCore.Sockets/Microsoft.AspNetCore.Sockets.csproj
  1365. +++ b/src/Microsoft.AspNetCore.Sockets/Microsoft.AspNetCore.Sockets.csproj
  1366. @@ -13,7 +13,7 @@
  1367. <PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="$(MicrosoftAspNetCoreHostingAbstractionsPackageVersion)" />
  1368. <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftExtensionsLoggingAbstractionsPackageVersion)" />
  1369. <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="$(MicrosoftExtensionsDependencyInjectionAbstractionsPackageVersion)" />
  1370. - <PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
  1371. + <PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
  1372. </ItemGroup>
  1373. </Project>
  1374. diff --git a/test/Common/ChannelExtensions.cs b/test/Common/ChannelExtensions.cs
  1375. index 2502886317b..fd032253794 100644
  1376. --- a/test/Common/ChannelExtensions.cs
  1377. +++ b/test/Common/ChannelExtensions.cs
  1378. @@ -1,10 +1,11 @@
  1379. using System.Collections.Generic;
  1380. +using System.Threading.Tasks;
  1381. -namespace System.Threading.Tasks.Channels
  1382. +namespace System.Threading.Channels
  1383. {
  1384. internal static class ChannelExtensions
  1385. {
  1386. - public static async Task<List<T>> ReadAllAsync<T>(this ReadableChannel<T> channel)
  1387. + public static async Task<List<T>> ReadAllAsync<T>(this ChannelReader<T> channel)
  1388. {
  1389. var list = new List<T>();
  1390. while (await channel.WaitToReadAsync())
  1391. diff --git a/test/Common/TestClient.cs b/test/Common/TestClient.cs
  1392. index a4d903120f4..42a7f95ba39 100644
  1393. --- a/test/Common/TestClient.cs
  1394. +++ b/test/Common/TestClient.cs
  1395. @@ -7,7 +7,7 @@ using System.IO;
  1396. using System.Security.Claims;
  1397. using System.Threading;
  1398. using System.Threading.Tasks;
  1399. -using System.Threading.Tasks.Channels;
  1400. +using System.Threading.Channels;
  1401. using Microsoft.AspNetCore.SignalR.Internal;
  1402. using Microsoft.AspNetCore.SignalR.Internal.Encoders;
  1403. using Microsoft.AspNetCore.SignalR.Internal.Protocol;
  1404. @@ -32,7 +32,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  1405. public TestClient(bool synchronousCallbacks = false, IHubProtocol protocol = null, IInvocationBinder invocationBinder = null, bool addClaimId = false)
  1406. {
  1407. - var options = new ChannelOptimizations { AllowSynchronousContinuations = synchronousCallbacks };
  1408. + var options = new UnboundedChannelOptions { AllowSynchronousContinuations = synchronousCallbacks };
  1409. var transportToApplication = Channel.CreateUnbounded<byte[]>(options);
  1410. var applicationToTransport = Channel.CreateUnbounded<byte[]>(options);
  1411. @@ -60,7 +60,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  1412. using (var memoryStream = new MemoryStream())
  1413. {
  1414. NegotiationProtocol.WriteMessage(new NegotiationMessage(protocol.Name), memoryStream);
  1415. - Application.Out.TryWrite(memoryStream.ToArray());
  1416. + Application.Writer.TryWrite(memoryStream.ToArray());
  1417. }
  1418. }
  1419. @@ -149,7 +149,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  1420. public async Task<string> SendHubMessageAsync(HubMessage message)
  1421. {
  1422. var payload = _protocolReaderWriter.WriteMessage(message);
  1423. - await Application.Out.WriteAsync(payload);
  1424. + await Application.Writer.WriteAsync(payload);
  1425. return message.InvocationId;
  1426. }
  1427. @@ -161,7 +161,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  1428. if (message == null)
  1429. {
  1430. - if (!await Application.In.WaitToReadAsync())
  1431. + if (!await Application.Reader.WaitToReadAsync())
  1432. {
  1433. return null;
  1434. }
  1435. @@ -175,7 +175,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  1436. public HubMessage TryRead()
  1437. {
  1438. - if (Application.In.TryRead(out var buffer) &&
  1439. + if (Application.Reader.TryRead(out var buffer) &&
  1440. _protocolReaderWriter.ReadMessages(buffer, _invocationBinder, out var messages))
  1441. {
  1442. return messages[0];
  1443. @@ -208,4 +208,4 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  1444. }
  1445. }
  1446. }
  1447. -}
  1448. \ No newline at end of file
  1449. +}
  1450. diff --git a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs
  1451. index 8e3c2bfc5ed..e110cc26a03 100644
  1452. --- a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs
  1453. +++ b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs
  1454. @@ -5,7 +5,7 @@ using System;
  1455. using System.Collections.Generic;
  1456. using System.Threading;
  1457. using System.Threading.Tasks;
  1458. -using System.Threading.Tasks.Channels;
  1459. +using System.Threading.Channels;
  1460. using Microsoft.AspNetCore.SignalR.Internal.Protocol;
  1461. using Microsoft.AspNetCore.SignalR.Tests.Common;
  1462. using Microsoft.AspNetCore.Sockets;
  1463. diff --git a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs
  1464. index 02c2a875356..37b3e44dda2 100644
  1465. --- a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs
  1466. +++ b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs
  1467. @@ -1,11 +1,11 @@
  1468. -// Copyright (c) .NET Foundation. All rights reserved.
  1469. +// Copyright (c) .NET Foundation. All rights reserved.
  1470. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1471. using System;
  1472. using System.Linq;
  1473. using System.Reactive.Linq;
  1474. using System.Threading.Tasks;
  1475. -using System.Threading.Tasks.Channels;
  1476. +using System.Threading.Channels;
  1477. namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
  1478. {
  1479. @@ -17,9 +17,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
  1480. public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
  1481. - public ReadableChannel<int> StreamException() => TestHubMethodsImpl.StreamException();
  1482. + public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
  1483. - public ReadableChannel<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
  1484. + public ChannelReader<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
  1485. public async Task CallEcho(string message)
  1486. {
  1487. @@ -40,9 +40,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
  1488. public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
  1489. - public ReadableChannel<int> StreamException() => TestHubMethodsImpl.StreamException();
  1490. + public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
  1491. - public ReadableChannel<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
  1492. + public ChannelReader<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
  1493. public async Task CallEcho(string message)
  1494. {
  1495. @@ -63,9 +63,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
  1496. public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
  1497. - public ReadableChannel<int> StreamException() => TestHubMethodsImpl.StreamException();
  1498. + public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
  1499. - public ReadableChannel<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
  1500. + public ChannelReader<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
  1501. public async Task CallEcho(string message)
  1502. {
  1503. @@ -97,12 +97,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
  1504. .Take(count);
  1505. }
  1506. - public static ReadableChannel<int> StreamException()
  1507. + public static ChannelReader<int> StreamException()
  1508. {
  1509. throw new InvalidOperationException("Error occurred while streaming.");
  1510. }
  1511. - public static ReadableChannel<string> StreamBroken() => null;
  1512. + public static ChannelReader<string> StreamBroken() => null;
  1513. }
  1514. public interface ITestHub
  1515. diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs
  1516. index 58a20ef28b5..5a613c7a708 100644
  1517. --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs
  1518. +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs
  1519. @@ -7,7 +7,7 @@ using System.Net.Http;
  1520. using System.Text;
  1521. using System.Threading;
  1522. using System.Threading.Tasks;
  1523. -using System.Threading.Tasks.Channels;
  1524. +using System.Threading.Channels;
  1525. using Microsoft.AspNetCore.Client.Tests;
  1526. using Microsoft.AspNetCore.SignalR.Tests.Common;
  1527. using Microsoft.AspNetCore.Sockets.Features;
  1528. @@ -268,8 +268,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
  1529. {
  1530. // The connection is now in the Disconnected state so the Received event for
  1531. // this message should not be raised
  1532. - channel.Out.TryWrite(Array.Empty<byte>());
  1533. - channel.Out.TryComplete();
  1534. + channel.Writer.TryWrite(Array.Empty<byte>());
  1535. + channel.Writer.TryComplete();
  1536. return Task.CompletedTask;
  1537. });
  1538. mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
  1539. @@ -313,7 +313,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
  1540. mockTransport.Setup(t => t.StopAsync())
  1541. .Returns(() =>
  1542. {
  1543. - channel.Out.TryComplete();
  1544. + channel.Writer.TryComplete();
  1545. return Task.CompletedTask;
  1546. });
  1547. mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
  1548. @@ -330,14 +330,14 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
  1549. });
  1550. await connection.StartAsync();
  1551. - channel.Out.TryWrite(Array.Empty<byte>());
  1552. + channel.Writer.TryWrite(Array.Empty<byte>());
  1553. // Ensure that the Received callback has been called before attempting the second write
  1554. await callbackInvokedTcs.Task.OrTimeout();
  1555. - channel.Out.TryWrite(Array.Empty<byte>());
  1556. + channel.Writer.TryWrite(Array.Empty<byte>());
  1557. // Ensure that SignalR isn't blocked by the receive callback
  1558. - Assert.False(channel.In.TryRead(out var message));
  1559. + Assert.False(channel.Reader.TryRead(out var message));
  1560. closedTcs.SetResult(null);
  1561. @@ -369,7 +369,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
  1562. mockTransport.Setup(t => t.StopAsync())
  1563. .Returns(() =>
  1564. {
  1565. - channel.Out.TryComplete();
  1566. + channel.Writer.TryComplete();
  1567. return Task.CompletedTask;
  1568. });
  1569. mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
  1570. @@ -380,10 +380,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
  1571. connection.OnReceived(_ => blockReceiveCallbackTcs.Task);
  1572. await connection.StartAsync();
  1573. - channel.Out.TryWrite(Array.Empty<byte>());
  1574. + channel.Writer.TryWrite(Array.Empty<byte>());
  1575. // Ensure that SignalR isn't blocked by the receive callback
  1576. - Assert.False(channel.In.TryRead(out var message));
  1577. + Assert.False(channel.Reader.TryRead(out var message));
  1578. await connection.DisposeAsync();
  1579. }
  1580. @@ -413,7 +413,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
  1581. mockTransport.Setup(t => t.StopAsync())
  1582. .Returns(() =>
  1583. {
  1584. - channel.Out.TryComplete();
  1585. + channel.Writer.TryComplete();
  1586. return Task.CompletedTask;
  1587. });
  1588. mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
  1589. @@ -427,10 +427,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
  1590. });
  1591. await connection.StartAsync();
  1592. - channel.Out.TryWrite(Array.Empty<byte>());
  1593. + channel.Writer.TryWrite(Array.Empty<byte>());
  1594. // Ensure that SignalR isn't blocked by the receive callback
  1595. - Assert.False(channel.In.TryRead(out var message));
  1596. + Assert.False(channel.Reader.TryRead(out var message));
  1597. await connection.DisposeAsync();
  1598. }
  1599. @@ -909,7 +909,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
  1600. mockTransport.Setup(t => t.StopAsync())
  1601. .Returns(() =>
  1602. {
  1603. - channel.Out.TryComplete();
  1604. + channel.Writer.TryComplete();
  1605. return Task.CompletedTask;
  1606. });
  1607. mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Binary);
  1608. diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs
  1609. index 6aec848eb02..1c6e121f53b 100644
  1610. --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs
  1611. +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs
  1612. @@ -6,7 +6,7 @@ using System.Globalization;
  1613. using System.IO;
  1614. using System.Text;
  1615. using System.Threading.Tasks;
  1616. -using System.Threading.Tasks.Channels;
  1617. +using System.Threading.Channels;
  1618. using Microsoft.AspNetCore.SignalR.Internal.Protocol;
  1619. using Microsoft.AspNetCore.SignalR.Tests.Common;
  1620. using Microsoft.AspNetCore.Sockets;
  1621. diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs
  1622. index 1114955fc07..13da4d90ec1 100644
  1623. --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs
  1624. +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs
  1625. @@ -215,7 +215,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1626. public ProtocolType Type => ProtocolType.Binary;
  1627. - public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
  1628. + public bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
  1629. {
  1630. messages = new List<HubMessage>();
  1631. diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs
  1632. index 1892508f13f..5f14541613f 100644
  1633. --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs
  1634. +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs
  1635. @@ -1,4 +1,4 @@
  1636. -// Copyright (c) .NET Foundation. All rights reserved.
  1637. +// Copyright (c) .NET Foundation. All rights reserved.
  1638. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1639. using System;
  1640. @@ -8,7 +8,7 @@ using System.Net.Http;
  1641. using System.Text;
  1642. using System.Threading;
  1643. using System.Threading.Tasks;
  1644. -using System.Threading.Tasks.Channels;
  1645. +using System.Threading.Channels;
  1646. using Microsoft.AspNetCore.SignalR.Tests.Common;
  1647. using Microsoft.AspNetCore.Sockets;
  1648. using Microsoft.AspNetCore.Sockets.Client;
  1649. @@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.Client.Tests
  1650. await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
  1651. await longPollingTransport.Running.OrTimeout();
  1652. - Assert.True(transportToConnection.In.Completion.IsCompleted);
  1653. + Assert.True(transportToConnection.Reader.Completion.IsCompleted);
  1654. }
  1655. finally
  1656. {
  1657. @@ -135,9 +135,9 @@ namespace Microsoft.AspNetCore.Client.Tests
  1658. var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
  1659. await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
  1660. - var data = await transportToConnection.In.ReadAllAsync().OrTimeout();
  1661. + var data = await transportToConnection.Reader.ReadAllAsync().OrTimeout();
  1662. await longPollingTransport.Running.OrTimeout();
  1663. - Assert.True(transportToConnection.In.Completion.IsCompleted);
  1664. + Assert.True(transportToConnection.Reader.Completion.IsCompleted);
  1665. Assert.Equal(2, data.Count);
  1666. Assert.Equal(Encoding.UTF8.GetBytes("Hello"), data[0]);
  1667. Assert.Equal(Encoding.UTF8.GetBytes("World"), data[1]);
  1668. @@ -172,7 +172,7 @@ namespace Microsoft.AspNetCore.Client.Tests
  1669. await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
  1670. var exception =
  1671. - await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion.OrTimeout());
  1672. + await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.Reader.Completion.OrTimeout());
  1673. Assert.Contains(" 500 ", exception.Message);
  1674. }
  1675. finally
  1676. @@ -207,16 +207,16 @@ namespace Microsoft.AspNetCore.Client.Tests
  1677. var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
  1678. await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
  1679. - await connectionToTransport.Out.WriteAsync(new SendMessage());
  1680. + await connectionToTransport.Writer.WriteAsync(new SendMessage());
  1681. await Assert.ThrowsAsync<HttpRequestException>(async () => await longPollingTransport.Running.OrTimeout());
  1682. // The channel needs to be drained for the Completion task to be completed
  1683. - while (transportToConnection.In.TryRead(out var message))
  1684. + while (transportToConnection.Reader.TryRead(out var message))
  1685. {
  1686. }
  1687. - var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion);
  1688. + var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.Reader.Completion);
  1689. Assert.Contains(" 500 ", exception.Message);
  1690. }
  1691. finally
  1692. @@ -248,12 +248,12 @@ namespace Microsoft.AspNetCore.Client.Tests
  1693. var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
  1694. await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
  1695. - connectionToTransport.Out.Complete();
  1696. + connectionToTransport.Writer.Complete();
  1697. await longPollingTransport.Running.OrTimeout();
  1698. await longPollingTransport.Running.OrTimeout();
  1699. - await connectionToTransport.In.Completion.OrTimeout();
  1700. + await connectionToTransport.Reader.Completion.OrTimeout();
  1701. }
  1702. finally
  1703. {
  1704. @@ -304,9 +304,9 @@ namespace Microsoft.AspNetCore.Client.Tests
  1705. // Pull Messages out of the channel
  1706. var messages = new List<byte[]>();
  1707. - while (await transportToConnection.In.WaitToReadAsync())
  1708. + while (await transportToConnection.Reader.WaitToReadAsync())
  1709. {
  1710. - while (transportToConnection.In.TryRead(out var message))
  1711. + while (transportToConnection.Reader.TryRead(out var message))
  1712. {
  1713. messages.Add(message);
  1714. }
  1715. @@ -358,16 +358,16 @@ namespace Microsoft.AspNetCore.Client.Tests
  1716. var tcs2 = new TaskCompletionSource<object>();
  1717. // Pre-queue some messages
  1718. - await connectionToTransport.Out.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("Hello"), tcs1)).OrTimeout();
  1719. - await connectionToTransport.Out.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("World"), tcs2)).OrTimeout();
  1720. + await connectionToTransport.Writer.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("Hello"), tcs1)).OrTimeout();
  1721. + await connectionToTransport.Writer.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("World"), tcs2)).OrTimeout();
  1722. // Start the transport
  1723. await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
  1724. - connectionToTransport.Out.Complete();
  1725. + connectionToTransport.Writer.Complete();
  1726. await longPollingTransport.Running.OrTimeout();
  1727. - await connectionToTransport.In.Completion.OrTimeout();
  1728. + await connectionToTransport.Reader.Completion.OrTimeout();
  1729. Assert.Single(sentRequests);
  1730. Assert.Equal(new byte[] { (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o', (byte)'W', (byte)'o', (byte)'r', (byte)'l', (byte)'d'
  1731. diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsParserTests.cs
  1732. index ffe4031546d..4e9d1bfae40 100644
  1733. --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsParserTests.cs
  1734. +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsParserTests.cs
  1735. @@ -2,6 +2,7 @@
  1736. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1737. using System;
  1738. +using System.Buffers;
  1739. using System.Collections.Generic;
  1740. using System.IO.Pipelines;
  1741. using System.Text;
  1742. @@ -106,10 +107,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1743. [InlineData(new[] { "data: Hello, World\r\n", ":comment\r\n", "\r\n" }, "Hello, World")]
  1744. public async Task ParseMessageAcrossMultipleReadsSuccess(string[] messageParts, string expectedMessage)
  1745. {
  1746. - using (var pipeFactory = new PipeFactory())
  1747. + var parser = new ServerSentEventsMessageParser();
  1748. + using (var pool = new MemoryPool())
  1749. {
  1750. - var parser = new ServerSentEventsMessageParser();
  1751. - var pipe = pipeFactory.Create();
  1752. + var pipe = new Pipe(new PipeOptions(pool));
  1753. byte[] message = null;
  1754. ReadCursor consumed = default, examined = default;
  1755. @@ -152,9 +153,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1756. [InlineData("data: B\r\ndata: SGVs", "bG8sIFdvcmxk\r\n\n\n", "There was an error in the frame format")]
  1757. public async Task ParseMessageAcrossMultipleReadsFailure(string encodedMessagePart1, string encodedMessagePart2, string expectedMessage)
  1758. {
  1759. - using (var pipeFactory = new PipeFactory())
  1760. + using (var pool = new MemoryPool())
  1761. {
  1762. - var pipe = pipeFactory.Create();
  1763. + var pipe = new Pipe(new PipeOptions(pool));
  1764. // Read the first part of the message
  1765. await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(encodedMessagePart1));
  1766. @@ -173,7 +174,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1767. var ex = Assert.Throws<FormatException>(() => parser.ParseMessage(result.Buffer, out consumed, out examined, out buffer));
  1768. Assert.Equal(expectedMessage, ex.Message);
  1769. -
  1770. }
  1771. }
  1772. @@ -181,9 +181,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1773. [InlineData("data: foo\r\n\r\n", "data: bar\r\n\r\n")]
  1774. public async Task ParseMultipleMessagesText(string message1, string message2)
  1775. {
  1776. - using (var pipeFactory = new PipeFactory())
  1777. + using (var pool = new MemoryPool())
  1778. {
  1779. - var pipe = pipeFactory.Create();
  1780. + var pipe = new Pipe(new PipeOptions(pool));
  1781. // Read the first part of the message
  1782. await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(message1 + message2));
  1783. diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs
  1784. index ed5a2d00547..622996bef35 100644
  1785. --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs
  1786. +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs
  1787. @@ -1,4 +1,4 @@
  1788. -// Copyright (c) .NET Foundation. All rights reserved.
  1789. +// Copyright (c) .NET Foundation. All rights reserved.
  1790. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  1791. using System;
  1792. @@ -8,8 +8,9 @@ using System.Net.Http.Headers;
  1793. using System.Text;
  1794. using System.Threading;
  1795. using System.Threading.Tasks;
  1796. -using System.Threading.Tasks.Channels;
  1797. +using System.Threading.Channels;
  1798. using Microsoft.AspNetCore.Client.Tests;
  1799. +using Microsoft.AspNetCore.SignalR.Internal;
  1800. using Microsoft.AspNetCore.SignalR.Tests.Common;
  1801. using Microsoft.AspNetCore.Sockets;
  1802. using Microsoft.AspNetCore.Sockets.Client;
  1803. @@ -42,6 +43,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1804. mockStream
  1805. .Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
  1806. .Returns(copyToAsyncTcs.Task);
  1807. + mockStream.Setup(s => s.CanRead).Returns(true);
  1808. return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
  1809. });
  1810. @@ -83,12 +85,14 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1811. .Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
  1812. .Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
  1813. {
  1814. + await Task.Yield();
  1815. var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
  1816. while (!eventStreamCts.IsCancellationRequested)
  1817. {
  1818. await stream.WriteAsync(buffer, 0, buffer.Length);
  1819. }
  1820. });
  1821. + mockStream.Setup(s => s.CanRead).Returns(true);
  1822. return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
  1823. });
  1824. @@ -109,7 +113,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1825. transportActiveTask = sseTransport.Running;
  1826. Assert.False(transportActiveTask.IsCompleted);
  1827. - var message = await transportToConnection.In.ReadAsync().AsTask().OrTimeout();
  1828. + var message = await transportToConnection.Reader.ReadAsync().AsTask().OrTimeout();
  1829. Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
  1830. }
  1831. finally
  1832. @@ -140,6 +144,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1833. var buffer = Encoding.ASCII.GetBytes("data: 3:a");
  1834. await stream.WriteAsync(buffer, 0, buffer.Length);
  1835. });
  1836. + mockStream.Setup(s => s.CanRead).Returns(true);
  1837. return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
  1838. });
  1839. @@ -182,6 +187,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1840. mockStream
  1841. .Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
  1842. .Returns(copyToAsyncTcs.Task);
  1843. + mockStream.Setup(s => s.CanRead).Returns(true);
  1844. return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
  1845. }
  1846. @@ -201,7 +207,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1847. await eventStreamTcs.Task;
  1848. var sendTcs = new TaskCompletionSource<object>();
  1849. - Assert.True(connectionToTransport.Out.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs)));
  1850. + Assert.True(connectionToTransport.Writer.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs)));
  1851. var exception = await Assert.ThrowsAsync<HttpRequestException>(() => sendTcs.Task.OrTimeout());
  1852. Assert.Contains("500", exception.Message);
  1853. @@ -231,6 +237,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1854. mockStream
  1855. .Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
  1856. .Returns(copyToAsyncTcs.Task);
  1857. + mockStream.Setup(s => s.CanRead).Returns(true);
  1858. return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
  1859. });
  1860. @@ -246,7 +253,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1861. new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
  1862. await eventStreamTcs.Task.OrTimeout();
  1863. - connectionToTransport.Out.TryComplete(null);
  1864. + connectionToTransport.Writer.TryComplete(null);
  1865. await sseTransport.Running.OrTimeout();
  1866. }
  1867. @@ -274,7 +281,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1868. await sseTransport.StartAsync(
  1869. new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
  1870. - var message = await transportToConnection.In.ReadAsync().AsTask().OrTimeout();
  1871. + var message = await transportToConnection.Reader.ReadAsync().AsTask().OrTimeout();
  1872. Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
  1873. await sseTransport.Running.OrTimeout();
  1874. diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs
  1875. index d234931fd80..6e8c8be30d3 100644
  1876. --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs
  1877. +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs
  1878. @@ -7,8 +7,9 @@ using System.IO;
  1879. using System.Text;
  1880. using System.Threading;
  1881. using System.Threading.Tasks;
  1882. -using System.Threading.Tasks.Channels;
  1883. +using System.Threading.Channels;
  1884. using Microsoft.AspNetCore.Http.Features;
  1885. +using Microsoft.AspNetCore.SignalR.Internal;
  1886. using Microsoft.AspNetCore.SignalR.Internal.Formatters;
  1887. using Microsoft.AspNetCore.Sockets;
  1888. using Microsoft.AspNetCore.Sockets.Client;
  1889. @@ -34,8 +35,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1890. public Task Closed => _closeTcs.Task;
  1891. public Task Started => _started.Task;
  1892. public Task Disposed => _disposed.Task;
  1893. - public ReadableChannel<byte[]> SentMessages => _sentMessages.In;
  1894. - public WritableChannel<byte[]> ReceivedMessages => _receivedMessages.Out;
  1895. + public ChannelReader<byte[]> SentMessages => _sentMessages.Reader;
  1896. + public ChannelWriter<byte[]> ReceivedMessages => _receivedMessages.Writer;
  1897. private readonly List<ReceiveCallback> _callbacks = new List<ReceiveCallback>();
  1898. @@ -61,9 +62,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1899. throw new InvalidOperationException("Connection must be started before SendAsync can be called");
  1900. }
  1901. - while (await _sentMessages.Out.WaitToWriteAsync(cancellationToken))
  1902. + while (await _sentMessages.Writer.WaitToWriteAsync(cancellationToken))
  1903. {
  1904. - if (_sentMessages.Out.TryWrite(data))
  1905. + if (_sentMessages.Writer.TryWrite(data))
  1906. {
  1907. return;
  1908. }
  1909. @@ -100,7 +101,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1910. var json = JsonConvert.SerializeObject(jsonObject, Formatting.None);
  1911. var bytes = FormatMessageToArray(Encoding.UTF8.GetBytes(json));
  1912. - return _receivedMessages.Out.WriteAsync(bytes);
  1913. + return _receivedMessages.Writer.WriteAsync(bytes);
  1914. }
  1915. private byte[] FormatMessageToArray(byte[] message)
  1916. @@ -116,9 +117,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
  1917. {
  1918. while (!token.IsCancellationRequested)
  1919. {
  1920. - while (await _receivedMessages.In.WaitToReadAsync(token))
  1921. + while (await _receivedMessages.Reader.WaitToReadAsync(token))
  1922. {
  1923. - while (_receivedMessages.In.TryRead(out var message))
  1924. + while (_receivedMessages.Reader.TryRead(out var message))
  1925. {
  1926. ReceiveCallback[] callbackCopies;
  1927. lock (_callbacks)
  1928. diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Encoders/LengthPrefixedTextMessageParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Encoders/LengthPrefixedTextMessageParserTests.cs
  1929. index 78e296ce29a..923e80dd66c 100644
  1930. --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Encoders/LengthPrefixedTextMessageParserTests.cs
  1931. +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Encoders/LengthPrefixedTextMessageParserTests.cs
  1932. @@ -18,7 +18,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
  1933. [InlineData("12:Hello, World;", "Hello, World")]
  1934. public void ReadTextMessage(string encoded, string payload)
  1935. {
  1936. - ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
  1937. + ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
  1938. Assert.True(LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out var message));
  1939. Assert.Equal(0, buffer.Length);
  1940. @@ -29,7 +29,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
  1941. public void ReadMultipleMessages()
  1942. {
  1943. const string encoded = "0:;14:Hello,\r\nWorld!;";
  1944. - ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
  1945. + ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
  1946. var messages = new List<byte[]>();
  1947. while (LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out var message))
  1948. @@ -54,7 +54,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
  1949. [InlineData("5:ABCDE")]
  1950. public void ReadIncompleteMessages(string encoded)
  1951. {
  1952. - ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
  1953. + ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
  1954. Assert.False(LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out _));
  1955. }
  1956. @@ -66,7 +66,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
  1957. [InlineData("5:ABCDEF", "Missing delimiter ';' after payload")]
  1958. public void ReadInvalidMessages(string encoded, string expectedMessage)
  1959. {
  1960. - ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
  1961. + ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
  1962. var ex = Assert.Throws<FormatException>(() =>
  1963. {
  1964. LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out _);
  1965. @@ -79,7 +79,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
  1966. {
  1967. // Invalid because first character is a UTF-8 "continuation" character
  1968. // We need to include the ':' so that
  1969. - ReadOnlyBuffer<byte> buffer = new byte[] { 0x48, 0x65, 0x80, 0x6C, 0x6F, (byte)':' };
  1970. + ReadOnlyMemory<byte> buffer = new byte[] { 0x48, 0x65, 0x80, 0x6C, 0x6F, (byte)':' };
  1971. var ex = Assert.Throws<FormatException>(() =>
  1972. {
  1973. LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out _);
  1974. diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs
  1975. index 7a35a6e16e7..b727e8124d0 100644
  1976. --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs
  1977. +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs
  1978. @@ -109,7 +109,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters
  1979. using (var ms = new MemoryStream())
  1980. {
  1981. BinaryMessageFormatter.WriteMessage(payload, ms);
  1982. - var buffer = new ReadOnlyBuffer<byte>(ms.ToArray());
  1983. + var buffer = new ReadOnlyMemory<byte>(ms.ToArray());
  1984. Assert.True(BinaryMessageParser.TryParseMessage(ref buffer, out var roundtripped));
  1985. Assert.Equal(payload, roundtripped.ToArray());
  1986. }
  1987. diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs
  1988. index 58dfae9c993..7a42eca8439 100644
  1989. --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs
  1990. +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs
  1991. @@ -17,7 +17,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
  1992. [InlineData(new byte[] { 0x0B, 0x41, 0x0A, 0x52, 0x0D, 0x43, 0x0D, 0x0A, 0x3B, 0x44, 0x45, 0x46 }, "A\nR\rC\r\n;DEF")]
  1993. public void ReadMessage(byte[] encoded, string payload)
  1994. {
  1995. - ReadOnlyBuffer<byte> span = encoded;
  1996. + ReadOnlyMemory<byte> span = encoded;
  1997. Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message));
  1998. Assert.Equal(0, span.Length);
  1999. @@ -52,7 +52,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
  2000. })]
  2001. public void ReadBinaryMessage(byte[] encoded, byte[] payload)
  2002. {
  2003. - ReadOnlyBuffer<byte> span = encoded;
  2004. + ReadOnlyMemory<byte> span = encoded;
  2005. Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message));
  2006. Assert.Equal(0, span.Length);
  2007. Assert.Equal(payload, message.ToArray());
  2008. @@ -64,7 +64,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
  2009. [InlineData(new byte[] { 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF })]
  2010. public void BinaryMessageParserThrowsForMessagesOver2GB(byte[] payload)
  2011. {
  2012. - var buffer = new ReadOnlyBuffer<byte>(payload);
  2013. + var buffer = new ReadOnlyMemory<byte>(payload);
  2014. var ex = Assert.Throws<FormatException>(() => BinaryMessageParser.TryParseMessage(ref buffer, out var message));
  2015. Assert.Equal("Messages over 2GB in size are not supported.", ex.Message);
  2016. }
  2017. @@ -76,7 +76,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
  2018. [InlineData(new byte[] { 0x80 })] // size is cut
  2019. public void BinaryMessageParserReturnsFalseForPartialPayloads(byte[] payload)
  2020. {
  2021. - var buffer = new ReadOnlyBuffer<byte>(payload);
  2022. + var buffer = new ReadOnlyMemory<byte>(payload);
  2023. Assert.False(BinaryMessageParser.TryParseMessage(ref buffer, out var message));
  2024. }
  2025. @@ -90,7 +90,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
  2026. /* length: */ 0x0E,
  2027. /* body: */ 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x2C, 0x0D, 0x0A, 0x57, 0x6F, 0x72, 0x6C, 0x64, 0x21,
  2028. };
  2029. - ReadOnlyBuffer<byte> buffer = encoded;
  2030. + ReadOnlyMemory<byte> buffer = encoded;
  2031. var messages = new List<byte[]>();
  2032. while (BinaryMessageParser.TryParseMessage(ref buffer, out var message))
  2033. @@ -110,7 +110,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
  2034. [InlineData(new byte[] { 0x09, 0x00, 0x00 })] // Not enough data for payload
  2035. public void ReadIncompleteMessages(byte[] encoded)
  2036. {
  2037. - ReadOnlyBuffer<byte> buffer = encoded;
  2038. + ReadOnlyMemory<byte> buffer = encoded;
  2039. Assert.False(BinaryMessageParser.TryParseMessage(ref buffer, out var message));
  2040. Assert.Equal(encoded.Length, buffer.Length);
  2041. }
  2042. diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs
  2043. index e6d953cf87c..9dbc7b2866f 100644
  2044. --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs
  2045. +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs
  2046. @@ -13,7 +13,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
  2047. [Fact]
  2048. public void ReadMessage()
  2049. {
  2050. - var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC\u001e"));
  2051. + var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC\u001e"));
  2052. Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload));
  2053. Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray()));
  2054. @@ -23,14 +23,14 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
  2055. [Fact]
  2056. public void TryReadingIncompleteMessage()
  2057. {
  2058. - var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC"));
  2059. + var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC"));
  2060. Assert.False(TextMessageParser.TryParseMessage(ref message, out var payload));
  2061. }
  2062. [Fact]
  2063. public void TryReadingMultipleMessages()
  2064. {
  2065. - var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e"));
  2066. + var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e"));
  2067. Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload));
  2068. Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray()));
  2069. Assert.True(TextMessageParser.TryParseMessage(ref message, out payload));
  2070. @@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
  2071. [Fact]
  2072. public void IncompleteTrailingMessage()
  2073. {
  2074. - var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e123"));
  2075. + var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e123"));
  2076. Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload));
  2077. Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray()));
  2078. Assert.True(TextMessageParser.TryParseMessage(ref message, out payload));
  2079. diff --git a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs
  2080. index e7613093820..7dcc10a447e 100644
  2081. --- a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs
  2082. +++ b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs
  2083. @@ -4,7 +4,7 @@
  2084. using System;
  2085. using System.Threading;
  2086. using System.Threading.Tasks;
  2087. -using System.Threading.Tasks.Channels;
  2088. +using System.Threading.Channels;
  2089. using Microsoft.AspNetCore.SignalR.Internal.Protocol;
  2090. using Microsoft.AspNetCore.SignalR.Tests;
  2091. using Microsoft.AspNetCore.SignalR.Tests.Common;
  2092. @@ -70,7 +70,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2093. AssertMessage(output1);
  2094. - Assert.False(output2.In.TryRead(out var item));
  2095. + Assert.False(output2.Reader.TryRead(out var item));
  2096. }
  2097. }
  2098. @@ -100,7 +100,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2099. AssertMessage(output1);
  2100. - Assert.False(output2.In.TryRead(out var item));
  2101. + Assert.False(output2.Reader.TryRead(out var item));
  2102. }
  2103. }
  2104. @@ -201,7 +201,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2105. AssertMessage(output1);
  2106. - Assert.False(output2.In.TryRead(out var item));
  2107. + Assert.False(output2.Reader.TryRead(out var item));
  2108. }
  2109. }
  2110. @@ -286,7 +286,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2111. await manager.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
  2112. - Assert.False(output.In.TryRead(out var item));
  2113. + Assert.False(output.Reader.TryRead(out var item));
  2114. }
  2115. }
  2116. @@ -387,7 +387,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2117. await manager.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
  2118. AssertMessage(output);
  2119. - Assert.False(output.In.TryRead(out var item));
  2120. + Assert.False(output.Reader.TryRead(out var item));
  2121. }
  2122. }
  2123. @@ -417,7 +417,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2124. await manager2.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
  2125. AssertMessage(output);
  2126. - Assert.False(output.In.TryRead(out var item));
  2127. + Assert.False(output.Reader.TryRead(out var item));
  2128. }
  2129. }
  2130. @@ -451,7 +451,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2131. await manager2.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
  2132. - Assert.False(output.In.TryRead(out var item));
  2133. + Assert.False(output.Reader.TryRead(out var item));
  2134. }
  2135. }
  2136. @@ -480,7 +480,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2137. await manager1.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout();
  2138. AssertMessage(output);
  2139. - Assert.False(output.In.TryRead(out var item));
  2140. + Assert.False(output.Reader.TryRead(out var item));
  2141. }
  2142. }
  2143. @@ -499,10 +499,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2144. using (var client = new TestClient())
  2145. {
  2146. // Force an exception when writing to connection
  2147. - var output = new Mock<Channel<HubMessage>>();
  2148. - output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
  2149. + var writer = new Mock<ChannelWriter<HubMessage>>();
  2150. + writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
  2151. - var connection = new HubConnectionContext(output.Object, client.Connection);
  2152. + var connection = new HubConnectionContext(new MockChannel(writer.Object), client.Connection);
  2153. await manager2.OnConnectedAsync(connection).OrTimeout();
  2154. @@ -523,10 +523,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2155. using (var client = new TestClient())
  2156. {
  2157. // Force an exception when writing to connection
  2158. - var output = new Mock<Channel<HubMessage>>();
  2159. - output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
  2160. + var writer = new Mock<ChannelWriter<HubMessage>>();
  2161. + writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
  2162. - var connection = new HubConnectionContext(output.Object, client.Connection);
  2163. + var connection = new HubConnectionContext(new MockChannel(writer.Object), client.Connection);
  2164. await manager.OnConnectedAsync(connection).OrTimeout();
  2165. @@ -549,10 +549,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2166. var output2 = Channel.CreateUnbounded<HubMessage>();
  2167. // Force an exception when writing to connection
  2168. - var output = new Mock<Channel<HubMessage>>();
  2169. - output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
  2170. + var writer = new Mock<ChannelWriter<HubMessage>>();
  2171. + writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
  2172. - var connection1 = new HubConnectionContext(output.Object, client1.Connection);
  2173. + var connection1 = new HubConnectionContext(new MockChannel(writer.Object), client1.Connection);
  2174. var connection2 = new HubConnectionContext(output2, client2.Connection);
  2175. await manager.OnConnectedAsync(connection1).OrTimeout();
  2176. @@ -573,7 +573,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2177. private void AssertMessage(Channel<HubMessage> channel)
  2178. {
  2179. - Assert.True(channel.In.TryRead(out var item));
  2180. + Assert.True(channel.Reader.TryRead(out var item));
  2181. var message = Assert.IsType<InvocationMessage>(item);
  2182. Assert.Equal("Hello", message.Target);
  2183. Assert.Single(message.Arguments);
  2184. @@ -583,5 +583,13 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
  2185. private class MyHub : Hub
  2186. {
  2187. }
  2188. +
  2189. + private class MockChannel : Channel<HubMessage>
  2190. + {
  2191. + public MockChannel(ChannelWriter<HubMessage> writer = null)
  2192. + {
  2193. + Writer = writer;
  2194. + }
  2195. + }
  2196. }
  2197. }
  2198. diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs
  2199. index 81dd798b2a6..7065eadd9d6 100644
  2200. --- a/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs
  2201. +++ b/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs
  2202. @@ -1,7 +1,7 @@
  2203. -using System;
  2204. +using System;
  2205. using System.Threading;
  2206. using System.Threading.Tasks;
  2207. -using System.Threading.Tasks.Channels;
  2208. +using System.Threading.Channels;
  2209. using Microsoft.AspNetCore.SignalR.Internal.Protocol;
  2210. using Microsoft.AspNetCore.SignalR.Tests.Common;
  2211. using Moq;
  2212. @@ -29,13 +29,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2213. await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
  2214. - Assert.True(output1.In.TryRead(out var item));
  2215. + Assert.True(output1.Reader.TryRead(out var item));
  2216. var message = Assert.IsType<InvocationMessage>(item);
  2217. Assert.Equal("Hello", message.Target);
  2218. Assert.Single(message.Arguments);
  2219. Assert.Equal("World", (string)message.Arguments[0]);
  2220. - Assert.True(output2.In.TryRead(out item));
  2221. + Assert.True(output2.Reader.TryRead(out item));
  2222. message = Assert.IsType<InvocationMessage>(item);
  2223. Assert.Equal("Hello", message.Target);
  2224. Assert.Single(message.Arguments);
  2225. @@ -63,13 +63,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2226. await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
  2227. - Assert.True(output1.In.TryRead(out var item));
  2228. + Assert.True(output1.Reader.TryRead(out var item));
  2229. var message = Assert.IsType<InvocationMessage>(item);
  2230. Assert.Equal("Hello", message.Target);
  2231. Assert.Single(message.Arguments);
  2232. Assert.Equal("World", (string)message.Arguments[0]);
  2233. - Assert.False(output2.In.TryRead(out item));
  2234. + Assert.False(output2.Reader.TryRead(out item));
  2235. }
  2236. }
  2237. @@ -93,13 +93,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2238. await manager.InvokeGroupAsync("gunit", "Hello", new object[] { "World" }).OrTimeout();
  2239. - Assert.True(output1.In.TryRead(out var item));
  2240. + Assert.True(output1.Reader.TryRead(out var item));
  2241. var message = Assert.IsType<InvocationMessage>(item);
  2242. Assert.Equal("Hello", message.Target);
  2243. Assert.Single(message.Arguments);
  2244. Assert.Equal("World", (string)message.Arguments[0]);
  2245. - Assert.False(output2.In.TryRead(out item));
  2246. + Assert.False(output2.Reader.TryRead(out item));
  2247. }
  2248. }
  2249. @@ -116,7 +116,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2250. await manager.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout();
  2251. - Assert.True(output.In.TryRead(out var item));
  2252. + Assert.True(output.Reader.TryRead(out var item));
  2253. var message = Assert.IsType<InvocationMessage>(item);
  2254. Assert.Equal("Hello", message.Target);
  2255. Assert.Single(message.Arguments);
  2256. @@ -130,11 +130,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2257. using (var client = new TestClient())
  2258. {
  2259. // Force an exception when writing to connection
  2260. - var output = new Mock<Channel<HubMessage>>();
  2261. - output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
  2262. + var writer = new Mock<ChannelWriter<HubMessage>>();
  2263. + writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
  2264. var manager = new DefaultHubLifetimeManager<MyHub>();
  2265. - var connection = new HubConnectionContext(output.Object, client.Connection);
  2266. + var connection = new HubConnectionContext(new MockChannel(writer.Object), client.Connection);
  2267. await manager.OnConnectedAsync(connection).OrTimeout();
  2268. @@ -168,5 +168,14 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2269. {
  2270. }
  2271. +
  2272. + private class MockChannel: Channel<HubMessage>
  2273. + {
  2274. +
  2275. + public MockChannel(ChannelWriter<HubMessage> writer = null)
  2276. + {
  2277. + Writer = writer;
  2278. + }
  2279. + }
  2280. }
  2281. }
  2282. diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs b/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs
  2283. index 08c0a5d5b3f..034230b956d 100644
  2284. --- a/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs
  2285. +++ b/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs
  2286. @@ -2,6 +2,7 @@
  2287. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  2288. using System.Threading.Tasks;
  2289. +using Microsoft.AspNetCore.SignalR.Internal;
  2290. using Microsoft.AspNetCore.Sockets;
  2291. namespace Microsoft.AspNetCore.SignalR.Tests
  2292. @@ -10,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2293. {
  2294. public async override Task OnConnectedAsync(ConnectionContext connection)
  2295. {
  2296. - await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync());
  2297. + await connection.Transport.Writer.WriteAsync(await connection.Transport.Reader.ReadAsync());
  2298. }
  2299. }
  2300. }
  2301. diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs
  2302. index bcd31c4fbff..a9c806748fa 100644
  2303. --- a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs
  2304. +++ b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs
  2305. @@ -8,7 +8,7 @@ using System.Runtime.Serialization;
  2306. using System.Security.Claims;
  2307. using System.Threading;
  2308. using System.Threading.Tasks;
  2309. -using System.Threading.Tasks.Channels;
  2310. +using System.Threading.Channels;
  2311. using Microsoft.AspNetCore.Authorization;
  2312. using Microsoft.AspNetCore.Http;
  2313. using Microsoft.AspNetCore.SignalR.Internal;
  2314. @@ -259,7 +259,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2315. using (var client = new TestClient())
  2316. {
  2317. // TestClient automatically writes negotiate, for this test we want to assume negotiate never gets sent
  2318. - client.Connection.Transport.In.TryRead(out var item);
  2319. + client.Connection.Transport.Reader.TryRead(out var item);
  2320. var endPointTask = endPoint.OnConnectedAsync(client.Connection);
  2321. @@ -285,7 +285,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2322. using (var client = new TestClient())
  2323. {
  2324. // TestClient automatically writes negotiate, for this test we want to assume negotiate never gets sent
  2325. - client.Connection.Transport.In.TryRead(out var item);
  2326. + client.Connection.Transport.Reader.TryRead(out var item);
  2327. await endPoint.OnConnectedAsync(client.Connection).OrTimeout();
  2328. }
  2329. @@ -521,7 +521,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2330. await client.SendInvocationAsync(methodName, nonBlocking: true).OrTimeout();
  2331. // Nothing should have been written
  2332. - Assert.False(client.Application.In.TryRead(out var buffer));
  2333. + Assert.False(client.Application.Reader.TryRead(out var buffer));
  2334. // kill the connection
  2335. client.Dispose();
  2336. @@ -1595,7 +1595,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2337. return new CountingObservable(count);
  2338. }
  2339. - public ReadableChannel<string> CounterChannel(int count)
  2340. + public ChannelReader<string> CounterChannel(int count)
  2341. {
  2342. var channel = Channel.CreateUnbounded<string>();
  2343. @@ -1603,17 +1603,17 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2344. {
  2345. for (int i = 0; i < count; i++)
  2346. {
  2347. - await channel.Out.WriteAsync(i.ToString());
  2348. + await channel.Writer.WriteAsync(i.ToString());
  2349. }
  2350. - channel.Out.Complete();
  2351. + channel.Writer.Complete();
  2352. });
  2353. - return channel.In;
  2354. + return channel.Reader;
  2355. }
  2356. - public ReadableChannel<string> BlockingStream()
  2357. + public ChannelReader<string> BlockingStream()
  2358. {
  2359. - return Channel.CreateUnbounded<string>().In;
  2360. + return Channel.CreateUnbounded<string>().Reader;
  2361. }
  2362. private class CountingObservable : IObservable<string>
  2363. diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/Internal/DefaultHubProtocolResolverTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/Internal/DefaultHubProtocolResolverTests.cs
  2364. index b0596994cd8..dba1123dd3b 100644
  2365. --- a/test/Microsoft.AspNetCore.SignalR.Tests/Internal/DefaultHubProtocolResolverTests.cs
  2366. +++ b/test/Microsoft.AspNetCore.SignalR.Tests/Internal/DefaultHubProtocolResolverTests.cs
  2367. @@ -1,9 +1,9 @@
  2368. -// Copyright (c) .NET Foundation. All rights reserved.
  2369. +// Copyright (c) .NET Foundation. All rights reserved.
  2370. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  2371. using System;
  2372. using System.Collections.Generic;
  2373. -using System.Threading.Tasks.Channels;
  2374. +using System.Threading.Channels;
  2375. using Microsoft.AspNetCore.SignalR.Internal;
  2376. using Microsoft.AspNetCore.SignalR.Internal.Protocol;
  2377. using Microsoft.AspNetCore.Sockets;
  2378. @@ -20,7 +20,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Protocol.Tests
  2379. [MemberData(nameof(HubProtocols))]
  2380. public void DefaultHubProtocolResolverTestsCanCreateSupportedProtocols(IHubProtocol protocol)
  2381. {
  2382. - var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Out, new Mock<ConnectionContext>().Object);
  2383. + var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Writer, new Mock<ConnectionContext>().Object);
  2384. Assert.IsType(
  2385. protocol.GetType(),
  2386. new DefaultHubProtocolResolver(Options.Create(new HubOptions())).GetProtocol(protocol.Name, mockConnection.Object));
  2387. @@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Protocol.Tests
  2388. [InlineData("dummy")]
  2389. public void DefaultHubProtocolResolverThrowsForNotSupportedProtocol(string protocolName)
  2390. {
  2391. - var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Out, new Mock<ConnectionContext>().Object);
  2392. + var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Writer, new Mock<ConnectionContext>().Object);
  2393. var exception = Assert.Throws<NotSupportedException>(
  2394. () => new DefaultHubProtocolResolver(Options.Create(new HubOptions())).GetProtocol(protocolName, mockConnection.Object));
  2395. diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs
  2396. index b46f1757de7..edb9099fdbd 100644
  2397. --- a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs
  2398. +++ b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs
  2399. @@ -1,9 +1,9 @@
  2400. -// Copyright (c) .NET Foundation. All rights reserved.
  2401. +// Copyright (c) .NET Foundation. All rights reserved.
  2402. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  2403. using System;
  2404. using System.Threading.Tasks;
  2405. -using System.Threading.Tasks.Channels;
  2406. +using System.Threading.Channels;
  2407. using Microsoft.AspNetCore.SignalR.Tests.Common;
  2408. using Microsoft.AspNetCore.Sockets;
  2409. using Microsoft.AspNetCore.Sockets.Client;
  2410. @@ -61,7 +61,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2411. var webSocketsTransport = new WebSocketsTransport(loggerFactory);
  2412. await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection,
  2413. TransferMode.Binary, connectionId: string.Empty);
  2414. - connectionToTransport.Out.TryComplete();
  2415. + connectionToTransport.Writer.TryComplete();
  2416. await webSocketsTransport.Running.OrTimeout(TimeSpan.FromSeconds(10));
  2417. }
  2418. }
  2419. @@ -82,7 +82,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2420. await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection, transferMode, connectionId: string.Empty);
  2421. var sendTcs = new TaskCompletionSource<object>();
  2422. - connectionToTransport.Out.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs));
  2423. + connectionToTransport.Writer.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs));
  2424. try
  2425. {
  2426. await sendTcs.Task;
  2427. @@ -99,7 +99,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
  2428. // The echo endpoint closes the connection immediately after sending response which should stop the transport
  2429. await webSocketsTransport.Running.OrTimeout();
  2430. - Assert.True(transportToConnection.In.TryRead(out var buffer));
  2431. + Assert.True(transportToConnection.Reader.TryRead(out var buffer));
  2432. Assert.Equal(new byte[] { 0x42 }, buffer);
  2433. }
  2434. }
  2435. diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs
  2436. index 9f0c56a1a10..f2cb45f02c3 100644
  2437. --- a/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs
  2438. +++ b/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs
  2439. @@ -82,12 +82,12 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2440. connection.ApplicationTask = Task.Run(async () =>
  2441. {
  2442. - Assert.False(await connection.Transport.In.WaitToReadAsync());
  2443. + Assert.False(await connection.Transport.Reader.WaitToReadAsync());
  2444. });
  2445. connection.TransportTask = Task.Run(async () =>
  2446. {
  2447. - Assert.False(await connection.Application.In.WaitToReadAsync());
  2448. + Assert.False(await connection.Application.Reader.WaitToReadAsync());
  2449. });
  2450. connectionManager.CloseConnections();
  2451. @@ -197,7 +197,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2452. appLifetime.StopApplication();
  2453. // Connection should be disposed so this should complete immediately
  2454. - Assert.False(await connection.Application.Out.WaitToWriteAsync().OrTimeout());
  2455. + Assert.False(await connection.Application.Writer.WaitToWriteAsync().OrTimeout());
  2456. }
  2457. private static ConnectionManager CreateConnectionManager(IApplicationLifetime lifetime = null)
  2458. diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs
  2459. index b71067b8574..c0605de722f 100644
  2460. --- a/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs
  2461. +++ b/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs
  2462. @@ -1,4 +1,4 @@
  2463. -// Copyright (c) .NET Foundation. All rights reserved.
  2464. +// Copyright (c) .NET Foundation. All rights reserved.
  2465. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  2466. using System;
  2467. @@ -511,7 +511,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2468. var buffer = Encoding.UTF8.GetBytes("Hello World");
  2469. // Write to the transport so the poll yields
  2470. - await connection.Transport.Out.WriteAsync(buffer);
  2471. + await connection.Transport.Writer.WriteAsync(buffer);
  2472. await task;
  2473. @@ -543,7 +543,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2474. var buffer = Encoding.UTF8.GetBytes("Hello World");
  2475. // Write to the application
  2476. - await connection.Application.Out.WriteAsync(buffer);
  2477. + await connection.Application.Writer.WriteAsync(buffer);
  2478. await task;
  2479. @@ -573,7 +573,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2480. var buffer = Encoding.UTF8.GetBytes("Hello World");
  2481. // Write to the application
  2482. - await connection.Application.Out.WriteAsync(buffer);
  2483. + await connection.Application.Writer.WriteAsync(buffer);
  2484. await task;
  2485. @@ -606,7 +606,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2486. await task1.OrTimeout();
  2487. // Send a message from the app to complete Task 2
  2488. - await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World"));
  2489. + await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World"));
  2490. await task2.OrTimeout();
  2491. @@ -775,7 +775,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2492. context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") }));
  2493. var endPointTask = dispatcher.ExecuteAsync(context, options, app);
  2494. - await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
  2495. + await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
  2496. await endPointTask.OrTimeout();
  2497. @@ -853,7 +853,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2498. }));
  2499. var endPointTask = dispatcher.ExecuteAsync(context, options, app);
  2500. - await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
  2501. + await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
  2502. await endPointTask.OrTimeout();
  2503. @@ -907,7 +907,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2504. context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") }));
  2505. var endPointTask = dispatcher.ExecuteAsync(context, options, app);
  2506. - await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
  2507. + await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
  2508. await endPointTask.OrTimeout();
  2509. @@ -1110,7 +1110,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2510. {
  2511. public override Task OnConnectedAsync(ConnectionContext connection)
  2512. {
  2513. - connection.Transport.In.WaitToReadAsync().Wait();
  2514. + connection.Transport.Reader.WaitToReadAsync().Wait();
  2515. return Task.CompletedTask;
  2516. }
  2517. }
  2518. @@ -1135,7 +1135,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2519. {
  2520. public override async Task OnConnectedAsync(ConnectionContext connection)
  2521. {
  2522. - while (await connection.Transport.In.WaitToReadAsync())
  2523. + while (await connection.Transport.Reader.WaitToReadAsync())
  2524. {
  2525. }
  2526. }
  2527. diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs
  2528. index c851dfe7138..112314f6acd 100644
  2529. --- a/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs
  2530. +++ b/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs
  2531. @@ -1,11 +1,11 @@
  2532. -// Copyright (c) .NET Foundation. All rights reserved.
  2533. +// Copyright (c) .NET Foundation. All rights reserved.
  2534. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  2535. using System.IO;
  2536. using System.Text;
  2537. using System.Threading;
  2538. using System.Threading.Tasks;
  2539. -using System.Threading.Tasks.Channels;
  2540. +using System.Threading.Channels;
  2541. using Microsoft.AspNetCore.Http;
  2542. using Microsoft.AspNetCore.SignalR.Tests.Common;
  2543. using Microsoft.AspNetCore.Sockets.Internal.Transports;
  2544. @@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2545. var context = new DefaultHttpContext();
  2546. var poll = new LongPollingTransport(CancellationToken.None, channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
  2547. - Assert.True(channel.Out.TryComplete());
  2548. + Assert.True(channel.Writer.TryComplete());
  2549. await poll.ProcessRequestAsync(context, context.RequestAborted);
  2550. @@ -56,9 +56,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2551. var ms = new MemoryStream();
  2552. context.Response.Body = ms;
  2553. - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
  2554. + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
  2555. - Assert.True(channel.Out.TryComplete());
  2556. + Assert.True(channel.Writer.TryComplete());
  2557. await poll.ProcessRequestAsync(context, context.RequestAborted);
  2558. @@ -76,11 +76,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2559. var ms = new MemoryStream();
  2560. context.Response.Body = ms;
  2561. - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
  2562. - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes(" "));
  2563. - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes("World"));
  2564. + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
  2565. + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes(" "));
  2566. + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes("World"));
  2567. - Assert.True(channel.Out.TryComplete());
  2568. + Assert.True(channel.Writer.TryComplete());
  2569. await poll.ProcessRequestAsync(context, context.RequestAborted);
  2570. diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/MapEndPointTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/MapEndPointTests.cs
  2571. index 79a0bd9d345..da1ae0c0fa8 100644
  2572. --- a/test/Microsoft.AspNetCore.Sockets.Tests/MapEndPointTests.cs
  2573. +++ b/test/Microsoft.AspNetCore.Sockets.Tests/MapEndPointTests.cs
  2574. @@ -1,4 +1,4 @@
  2575. -// Copyright (c) .NET Foundation. All rights reserved.
  2576. +// Copyright (c) .NET Foundation. All rights reserved.
  2577. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  2578. using System;
  2579. @@ -13,12 +13,21 @@ using Microsoft.AspNetCore.Hosting.Server.Features;
  2580. using Microsoft.AspNetCore.SignalR.Tests.Common;
  2581. using Microsoft.AspNetCore.Testing.xunit;
  2582. using Microsoft.Extensions.DependencyInjection;
  2583. +using Microsoft.Extensions.Logging;
  2584. using Xunit;
  2585. +using Xunit.Abstractions;
  2586. namespace Microsoft.AspNetCore.Sockets.Tests
  2587. {
  2588. public class MapEndPointTests
  2589. {
  2590. + private ITestOutputHelper _output;
  2591. +
  2592. + public MapEndPointTests(ITestOutputHelper output)
  2593. + {
  2594. + _output = output;
  2595. + }
  2596. +
  2597. [Fact]
  2598. public void MapEndPointFindsAuthAttributeOnEndPoint()
  2599. {
  2600. @@ -40,6 +49,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2601. });
  2602. });
  2603. })
  2604. + .ConfigureLogging(factory =>
  2605. + {
  2606. + factory.AddXunit(_output, LogLevel.Trace);
  2607. + })
  2608. .Build();
  2609. Assert.Equal(1, authCount);
  2610. @@ -66,6 +79,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2611. });
  2612. });
  2613. })
  2614. + .ConfigureLogging(factory =>
  2615. + {
  2616. + factory.AddXunit(_output, LogLevel.Trace);
  2617. + })
  2618. .Build();
  2619. Assert.Equal(1, authCount);
  2620. @@ -92,6 +109,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2621. });
  2622. });
  2623. })
  2624. + .ConfigureLogging(factory =>
  2625. + {
  2626. + factory.AddXunit(_output, LogLevel.Trace);
  2627. + })
  2628. .Build();
  2629. Assert.Equal(2, authCount);
  2630. @@ -102,24 +123,28 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2631. public async Task MapEndPointWithWebSocketSubProtocolSetsProtocol()
  2632. {
  2633. var host = new WebHostBuilder()
  2634. - .UseUrls("http://127.0.0.1:0")
  2635. - .UseKestrel()
  2636. - .ConfigureServices(services =>
  2637. - {
  2638. - services.AddSockets();
  2639. - services.AddEndPoint<MyEndPoint>();
  2640. - })
  2641. - .Configure(app =>
  2642. + .UseUrls("http://127.0.0.1:0")
  2643. + .UseKestrel()
  2644. + .ConfigureServices(services =>
  2645. + {
  2646. + services.AddSockets();
  2647. + services.AddEndPoint<MyEndPoint>();
  2648. + })
  2649. + .Configure(app =>
  2650. + {
  2651. + app.UseSockets(routes =>
  2652. {
  2653. - app.UseSockets(routes =>
  2654. + routes.MapEndPoint<MyEndPoint>("socket", httpSocketOptions =>
  2655. {
  2656. - routes.MapEndPoint<MyEndPoint>("socket", httpSocketOptions =>
  2657. - {
  2658. - httpSocketOptions.WebSockets.SubProtocol = "protocol1";
  2659. - });
  2660. + httpSocketOptions.WebSockets.SubProtocol = "protocol1";
  2661. });
  2662. - })
  2663. - .Build();
  2664. + });
  2665. + })
  2666. + .ConfigureLogging(factory =>
  2667. + {
  2668. + factory.AddXunit(_output, LogLevel.Trace);
  2669. + })
  2670. + .Build();
  2671. await host.StartAsync();
  2672. @@ -140,7 +165,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2673. {
  2674. public override async Task OnConnectedAsync(ConnectionContext connection)
  2675. {
  2676. - while (!await connection.Transport.In.WaitToReadAsync())
  2677. + while (!await connection.Transport.Reader.WaitToReadAsync())
  2678. {
  2679. }
  2680. diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/Microsoft.AspNetCore.Sockets.Tests.csproj b/test/Microsoft.AspNetCore.Sockets.Tests/Microsoft.AspNetCore.Sockets.Tests.csproj
  2681. index c73a09bcc4b..432dfec514a 100644
  2682. --- a/test/Microsoft.AspNetCore.Sockets.Tests/Microsoft.AspNetCore.Sockets.Tests.csproj
  2683. +++ b/test/Microsoft.AspNetCore.Sockets.Tests/Microsoft.AspNetCore.Sockets.Tests.csproj
  2684. @@ -3,7 +3,7 @@
  2685. <PropertyGroup>
  2686. <TargetFrameworks>netcoreapp2.0;net461</TargetFrameworks>
  2687. <TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">netcoreapp2.0</TargetFrameworks>
  2688. -
  2689. +
  2690. <RuntimeIdentifier Condition="'$(TargetFramework)' != 'netcoreapp2.0'">win7-x64</RuntimeIdentifier>
  2691. </PropertyGroup>
  2692. @@ -21,6 +21,7 @@
  2693. <PackageReference Include="Microsoft.AspNetCore.Http" Version="$(MicrosoftAspNetCoreHttpPackageVersion)" />
  2694. <PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="$(MicrosoftAspNetCoreServerKestrelPackageVersion)" />
  2695. <PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
  2696. + <PackageReference Include="Microsoft.Extensions.Logging.Testing" Version="$(MicrosoftExtensionsLoggingTestingPackageVersion)" />
  2697. </ItemGroup>
  2698. </Project>
  2699. diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs
  2700. index fbdc4137b87..b7283440b43 100644
  2701. --- a/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs
  2702. +++ b/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs
  2703. @@ -1,10 +1,10 @@
  2704. -// Copyright (c) .NET Foundation. All rights reserved.
  2705. +// Copyright (c) .NET Foundation. All rights reserved.
  2706. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  2707. using System.IO;
  2708. using System.Text;
  2709. using System.Threading.Tasks;
  2710. -using System.Threading.Tasks.Channels;
  2711. +using System.Threading.Channels;
  2712. using Microsoft.AspNetCore.Http;
  2713. using Microsoft.AspNetCore.Http.Features;
  2714. using Microsoft.AspNetCore.SignalR.Tests.Common;
  2715. @@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2716. var context = new DefaultHttpContext();
  2717. var sse = new ServerSentEventsTransport(channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
  2718. - Assert.True(channel.Out.TryComplete());
  2719. + Assert.True(channel.Writer.TryComplete());
  2720. await sse.ProcessRequestAsync(context, context.RequestAborted);
  2721. @@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2722. context.Features.Set<IHttpBufferingFeature>(feature);
  2723. var sse = new ServerSentEventsTransport(channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
  2724. - Assert.True(channel.Out.TryComplete());
  2725. + Assert.True(channel.Writer.TryComplete());
  2726. await sse.ProcessRequestAsync(context, context.RequestAborted);
  2727. @@ -50,7 +50,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2728. [Fact]
  2729. public async Task SSEWritesMessages()
  2730. {
  2731. - var channel = Channel.CreateUnbounded<byte[]>(new ChannelOptimizations
  2732. + var channel = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions
  2733. {
  2734. AllowSynchronousContinuations = true
  2735. });
  2736. @@ -62,11 +62,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2737. var task = sse.ProcessRequestAsync(context, context.RequestAborted);
  2738. - await channel.Out.WriteAsync(Encoding.ASCII.GetBytes("Hello"));
  2739. + await channel.Writer.WriteAsync(Encoding.ASCII.GetBytes("Hello"));
  2740. Assert.Equal(":\r\ndata: Hello\r\n\r\n", Encoding.ASCII.GetString(ms.ToArray()));
  2741. - channel.Out.TryComplete();
  2742. + channel.Writer.TryComplete();
  2743. await task.OrTimeout();
  2744. }
  2745. @@ -83,9 +83,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2746. var ms = new MemoryStream();
  2747. context.Response.Body = ms;
  2748. - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes(message));
  2749. + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes(message));
  2750. - Assert.True(channel.Out.TryComplete());
  2751. + Assert.True(channel.Writer.TryComplete());
  2752. await sse.ProcessRequestAsync(context, context.RequestAborted);
  2753. diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/TestWebSocketConnectionFeature.cs b/test/Microsoft.AspNetCore.Sockets.Tests/TestWebSocketConnectionFeature.cs
  2754. index 29b5cac70f8..ea085b939f3 100644
  2755. --- a/test/Microsoft.AspNetCore.Sockets.Tests/TestWebSocketConnectionFeature.cs
  2756. +++ b/test/Microsoft.AspNetCore.Sockets.Tests/TestWebSocketConnectionFeature.cs
  2757. @@ -1,9 +1,9 @@
  2758. -using System;
  2759. +using System;
  2760. using System.Collections.Generic;
  2761. using System.Net.WebSockets;
  2762. using System.Threading;
  2763. using System.Threading.Tasks;
  2764. -using System.Threading.Tasks.Channels;
  2765. +using System.Threading.Channels;
  2766. using Microsoft.AspNetCore.Http;
  2767. using Microsoft.AspNetCore.Http.Features;
  2768. @@ -22,8 +22,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2769. var clientToServer = Channel.CreateUnbounded<WebSocketMessage>();
  2770. var serverToClient = Channel.CreateUnbounded<WebSocketMessage>();
  2771. - var clientSocket = new WebSocketChannel(serverToClient.In, clientToServer.Out);
  2772. - var serverSocket = new WebSocketChannel(clientToServer.In, serverToClient.Out);
  2773. + var clientSocket = new WebSocketChannel(serverToClient.Reader, clientToServer.Writer);
  2774. + var serverSocket = new WebSocketChannel(clientToServer.Reader, serverToClient.Writer);
  2775. Client = clientSocket;
  2776. return Task.FromResult<WebSocket>(serverSocket);
  2777. @@ -35,14 +35,14 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2778. public class WebSocketChannel : WebSocket
  2779. {
  2780. - private readonly ReadableChannel<WebSocketMessage> _input;
  2781. - private readonly WritableChannel<WebSocketMessage> _output;
  2782. + private readonly ChannelReader<WebSocketMessage> _input;
  2783. + private readonly ChannelWriter<WebSocketMessage> _output;
  2784. private WebSocketCloseStatus? _closeStatus;
  2785. private string _closeStatusDescription;
  2786. private WebSocketState _state;
  2787. - public WebSocketChannel(ReadableChannel<WebSocketMessage> input, WritableChannel<WebSocketMessage> output)
  2788. + public WebSocketChannel(ChannelReader<WebSocketMessage> input, ChannelWriter<WebSocketMessage> output)
  2789. {
  2790. _input = input;
  2791. _output = output;
  2792. @@ -209,4 +209,4 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2793. public string CloseStatusDescription { get; set; }
  2794. }
  2795. }
  2796. -}
  2797. \ No newline at end of file
  2798. +}
  2799. diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs
  2800. index ba696ff46e4..d93d653e9b1 100644
  2801. --- a/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs
  2802. +++ b/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs
  2803. @@ -1,4 +1,4 @@
  2804. -// Copyright (c) .NET Foundation. All rights reserved.
  2805. +// Copyright (c) .NET Foundation. All rights reserved.
  2806. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  2807. using System;
  2808. @@ -6,58 +6,68 @@ using System.Net.WebSockets;
  2809. using System.Text;
  2810. using System.Threading;
  2811. using System.Threading.Tasks;
  2812. -using System.Threading.Tasks.Channels;
  2813. +using System.Threading.Channels;
  2814. +using Microsoft.AspNetCore.SignalR.Internal;
  2815. using Microsoft.AspNetCore.SignalR.Tests.Common;
  2816. using Microsoft.AspNetCore.Sockets.Internal;
  2817. using Microsoft.AspNetCore.Sockets.Internal.Transports;
  2818. -using Microsoft.Extensions.Logging;
  2819. +using Microsoft.Extensions.Logging.Testing;
  2820. using Xunit;
  2821. +using Xunit.Abstractions;
  2822. namespace Microsoft.AspNetCore.Sockets.Tests
  2823. {
  2824. - public class WebSocketsTests
  2825. + public class WebSocketsTests : LoggedTest
  2826. {
  2827. + public WebSocketsTests(ITestOutputHelper output)
  2828. + : base(output)
  2829. + {
  2830. + }
  2831. +
  2832. [Theory]
  2833. [InlineData(WebSocketMessageType.Text)]
  2834. [InlineData(WebSocketMessageType.Binary)]
  2835. public async Task ReceivedFramesAreWrittenToChannel(WebSocketMessageType webSocketMessageType)
  2836. {
  2837. - var transportToApplication = Channel.CreateUnbounded<byte[]>();
  2838. - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  2839. -
  2840. - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  2841. - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  2842. - using (var feature = new TestWebSocketConnectionFeature())
  2843. + using (StartLog(out var loggerFactory))
  2844. {
  2845. - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  2846. - var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory: new LoggerFactory());
  2847. + var transportToApplication = Channel.CreateUnbounded<byte[]>();
  2848. + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  2849. - // Give the server socket to the transport and run it
  2850. - var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
  2851. + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  2852. + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  2853. + using (var feature = new TestWebSocketConnectionFeature())
  2854. + {
  2855. + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  2856. + var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
  2857. +
  2858. + // Give the server socket to the transport and run it
  2859. + var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
  2860. - // Run the client socket
  2861. - var client = feature.Client.ExecuteAndCaptureFramesAsync();
  2862. + // Run the client socket
  2863. + var client = feature.Client.ExecuteAndCaptureFramesAsync();
  2864. - // Send a frame, then close
  2865. - await feature.Client.SendAsync(
  2866. - buffer: new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello")),
  2867. - messageType: webSocketMessageType,
  2868. - endOfMessage: true,
  2869. - cancellationToken: CancellationToken.None);
  2870. - await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
  2871. + // Send a frame, then close
  2872. + await feature.Client.SendAsync(
  2873. + buffer: new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello")),
  2874. + messageType: webSocketMessageType,
  2875. + endOfMessage: true,
  2876. + cancellationToken: CancellationToken.None);
  2877. + await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
  2878. - var buffer = await applicationSide.In.ReadAsync();
  2879. - Assert.Equal("Hello", Encoding.UTF8.GetString(buffer));
  2880. + var buffer = await applicationSide.Reader.ReadAsync();
  2881. + Assert.Equal("Hello", Encoding.UTF8.GetString(buffer));
  2882. - Assert.True(applicationSide.Out.TryComplete());
  2883. + Assert.True(applicationSide.Writer.TryComplete());
  2884. - // The transport should finish now
  2885. - await transport;
  2886. + // The transport should finish now
  2887. + await transport;
  2888. - // The connection should close after this, which means the client will get a close frame.
  2889. - var clientSummary = await client;
  2890. + // The connection should close after this, which means the client will get a close frame.
  2891. + var clientSummary = await client;
  2892. - Assert.Equal(WebSocketCloseStatus.NormalClosure, clientSummary.CloseResult.CloseStatus);
  2893. + Assert.Equal(WebSocketCloseStatus.NormalClosure, clientSummary.CloseResult.CloseStatus);
  2894. + }
  2895. }
  2896. }
  2897. @@ -66,256 +76,276 @@ namespace Microsoft.AspNetCore.Sockets.Tests
  2898. [InlineData(TransferMode.Binary, WebSocketMessageType.Binary)]
  2899. public async Task WebSocketTransportSetsMessageTypeBasedOnTransferModeFeature(TransferMode transferMode, WebSocketMessageType expectedMessageType)
  2900. {
  2901. - var transportToApplication = Channel.CreateUnbounded<byte[]>();
  2902. - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  2903. -
  2904. - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  2905. - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  2906. - using (var feature = new TestWebSocketConnectionFeature())
  2907. + using (StartLog(out var loggerFactory))
  2908. {
  2909. - var connectionContext = new DefaultConnectionContext(string.Empty, null, null) { TransferMode = transferMode };
  2910. - var ws = new WebSocketsTransport(new WebSocketOptions(),
  2911. - transportSide, connectionContext, loggerFactory: new LoggerFactory());
  2912. + var transportToApplication = Channel.CreateUnbounded<byte[]>();
  2913. + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  2914. - // Give the server socket to the transport and run it
  2915. - var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
  2916. + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  2917. + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  2918. + using (var feature = new TestWebSocketConnectionFeature())
  2919. + {
  2920. + var connectionContext = new DefaultConnectionContext(string.Empty, null, null) { TransferMode = transferMode };
  2921. + var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
  2922. +
  2923. + // Give the server socket to the transport and run it
  2924. + var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
  2925. - // Run the client socket
  2926. - var client = feature.Client.ExecuteAndCaptureFramesAsync();
  2927. + // Run the client socket
  2928. + var client = feature.Client.ExecuteAndCaptureFramesAsync();
  2929. - // Write to the output channel, and then complete it
  2930. - await applicationSide.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
  2931. - Assert.True(applicationSide.Out.TryComplete());
  2932. + // Write to the output channel, and then complete it
  2933. + await applicationSide.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
  2934. + Assert.True(applicationSide.Writer.TryComplete());
  2935. - // The client should finish now, as should the server
  2936. - var clientSummary = await client;
  2937. - await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
  2938. - await transport;
  2939. + // The client should finish now, as should the server
  2940. + var clientSummary = await client;
  2941. + await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
  2942. + await transport;
  2943. - Assert.Equal(1, clientSummary.Received.Count);
  2944. - Assert.True(clientSummary.Received[0].EndOfMessage);
  2945. - Assert.Equal(expectedMessageType, clientSummary.Received[0].MessageType);
  2946. - Assert.Equal("Hello", Encoding.UTF8.GetString(clientSummary.Received[0].Buffer));
  2947. + Assert.Equal(1, clientSummary.Received.Count);
  2948. + Assert.True(clientSummary.Received[0].EndOfMessage);
  2949. + Assert.Equal(expectedMessageType, clientSummary.Received[0].MessageType);
  2950. + Assert.Equal("Hello", Encoding.UTF8.GetString(clientSummary.Received[0].Buffer));
  2951. + }
  2952. }
  2953. }
  2954. [Fact]
  2955. public async Task TransportFailsWhenClientDisconnectsAbnormally()
  2956. {
  2957. - var transportToApplication = Channel.CreateUnbounded<byte[]>();
  2958. - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  2959. -
  2960. - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  2961. - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  2962. - using (var feature = new TestWebSocketConnectionFeature())
  2963. + using (StartLog(out var loggerFactory))
  2964. {
  2965. - async Task CompleteApplicationAfterTransportCompletes()
  2966. + var transportToApplication = Channel.CreateUnbounded<byte[]>();
  2967. + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  2968. +
  2969. + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  2970. + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  2971. + using (var feature = new TestWebSocketConnectionFeature())
  2972. {
  2973. - // Wait until the transport completes so that we can end the application
  2974. - await applicationSide.In.WaitToReadAsync();
  2975. + async Task CompleteApplicationAfterTransportCompletes()
  2976. + {
  2977. + // Wait until the transport completes so that we can end the application
  2978. + await applicationSide.Reader.WaitToReadAsync();
  2979. - // Complete the application so that the connection unwinds without aborting
  2980. - applicationSide.Out.TryComplete();
  2981. - }
  2982. + // Complete the application so that the connection unwinds without aborting
  2983. + applicationSide.Writer.TryComplete();
  2984. + }
  2985. - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  2986. - var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory: new LoggerFactory());
  2987. + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  2988. + var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
  2989. - // Give the server socket to the transport and run it
  2990. - var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
  2991. + // Give the server socket to the transport and run it
  2992. + var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
  2993. - // Run the client socket
  2994. - var client = feature.Client.ExecuteAndCaptureFramesAsync();
  2995. + // Run the client socket
  2996. + var client = feature.Client.ExecuteAndCaptureFramesAsync();
  2997. - // When the close frame is received, we complete the application so the send
  2998. - // loop unwinds
  2999. - _ = CompleteApplicationAfterTransportCompletes();
  3000. + // When the close frame is received, we complete the application so the send
  3001. + // loop unwinds
  3002. + _ = CompleteApplicationAfterTransportCompletes();
  3003. - // Terminate the client to server channel with an exception
  3004. - feature.Client.SendAbort();
  3005. + // Terminate the client to server channel with an exception
  3006. + feature.Client.SendAbort();
  3007. - // Wait for the transport
  3008. - await Assert.ThrowsAsync<WebSocketException>(() => transport).OrTimeout();
  3009. + // Wait for the transport
  3010. + await Assert.ThrowsAsync<WebSocketException>(() => transport).OrTimeout();
  3011. - var summary = await client.OrTimeout();
  3012. - Assert.Equal(WebSocketCloseStatus.InternalServerError, summary.CloseResult.CloseStatus);
  3013. + var summary = await client.OrTimeout();
  3014. + Assert.Equal(WebSocketCloseStatus.InternalServerError, summary.CloseResult.CloseStatus);
  3015. + }
  3016. }
  3017. }
  3018. [Fact]
  3019. public async Task ClientReceivesInternalServerErrorWhenTheApplicationFails()
  3020. {
  3021. - var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3022. - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3023. -
  3024. - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3025. - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3026. - using (var feature = new TestWebSocketConnectionFeature())
  3027. + using (StartLog(out var loggerFactory))
  3028. {
  3029. - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3030. - var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory: new LoggerFactory());
  3031. + var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3032. + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3033. - // Give the server socket to the transport and run it
  3034. - var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
  3035. + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3036. + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3037. + using (var feature = new TestWebSocketConnectionFeature())
  3038. + {
  3039. + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3040. + var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
  3041. +
  3042. + // Give the server socket to the transport and run it
  3043. + var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
  3044. - // Run the client socket
  3045. - var client = feature.Client.ExecuteAndCaptureFramesAsync();
  3046. + // Run the client socket
  3047. + var client = feature.Client.ExecuteAndCaptureFramesAsync();
  3048. - // Fail in the app
  3049. - Assert.True(applicationSide.Out.TryComplete(new InvalidOperationException("Catastrophic failure.")));
  3050. - var clientSummary = await client.OrTimeout();
  3051. - Assert.Equal(WebSocketCloseStatus.InternalServerError, clientSummary.CloseResult.CloseStatus);
  3052. + // Fail in the app
  3053. + Assert.True(applicationSide.Writer.TryComplete(new InvalidOperationException("Catastrophic failure.")));
  3054. + var clientSummary = await client.OrTimeout();
  3055. + Assert.Equal(WebSocketCloseStatus.InternalServerError, clientSummary.CloseResult.CloseStatus);
  3056. - // Close from the client
  3057. - await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
  3058. + // Close from the client
  3059. + await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
  3060. - var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => transport.OrTimeout());
  3061. - Assert.Equal("Catastrophic failure.", ex.Message);
  3062. + var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => transport.OrTimeout());
  3063. + Assert.Equal("Catastrophic failure.", ex.Message);
  3064. + }
  3065. }
  3066. }
  3067. [Fact]
  3068. public async Task TransportClosesOnCloseTimeoutIfClientDoesNotSendCloseFrame()
  3069. {
  3070. - var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3071. - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3072. -
  3073. - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3074. - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3075. - using (var feature = new TestWebSocketConnectionFeature())
  3076. + using (StartLog(out var loggerFactory))
  3077. {
  3078. - var options = new WebSocketOptions()
  3079. + var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3080. + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3081. +
  3082. + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3083. + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3084. + using (var feature = new TestWebSocketConnectionFeature())
  3085. {
  3086. - CloseTimeout = TimeSpan.FromSeconds(1)
  3087. - };
  3088. + var options = new WebSocketOptions()
  3089. + {
  3090. + CloseTimeout = TimeSpan.FromSeconds(1)
  3091. + };
  3092. - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3093. - var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
  3094. + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3095. + var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
  3096. - var serverSocket = await feature.AcceptAsync();
  3097. - // Give the server socket to the transport and run it
  3098. - var transport = ws.ProcessSocketAsync(serverSocket);
  3099. + var serverSocket = await feature.AcceptAsync();
  3100. + // Give the server socket to the transport and run it
  3101. + var transport = ws.ProcessSocketAsync(serverSocket);
  3102. - // End the app
  3103. - applicationSide.Dispose();
  3104. + // End the app
  3105. + applicationSide.Dispose();
  3106. - await transport.OrTimeout(TimeSpan.FromSeconds(10));
  3107. + await transport.OrTimeout(TimeSpan.FromSeconds(10));
  3108. - // Now we're closed
  3109. - Assert.Equal(WebSocketState.Aborted, serverSocket.State);
  3110. + // Now we're closed
  3111. + Assert.Equal(WebSocketState.Aborted, serverSocket.State);
  3112. - serverSocket.Dispose();
  3113. + serverSocket.Dispose();
  3114. + }
  3115. }
  3116. }
  3117. [Fact]
  3118. public async Task TransportFailsOnTimeoutWithErrorWhenApplicationFailsAndClientDoesNotSendCloseFrame()
  3119. {
  3120. - var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3121. - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3122. -
  3123. - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3124. - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3125. - using (var feature = new TestWebSocketConnectionFeature())
  3126. + using (StartLog(out var loggerFactory))
  3127. {
  3128. - var options = new WebSocketOptions
  3129. + var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3130. + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3131. +
  3132. + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3133. + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3134. + using (var feature = new TestWebSocketConnectionFeature())
  3135. {
  3136. - CloseTimeout = TimeSpan.FromSeconds(1)
  3137. - };
  3138. + var options = new WebSocketOptions
  3139. + {
  3140. + CloseTimeout = TimeSpan.FromSeconds(1)
  3141. + };
  3142. - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3143. - var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
  3144. + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3145. + var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
  3146. - var serverSocket = await feature.AcceptAsync();
  3147. - // Give the server socket to the transport and run it
  3148. - var transport = ws.ProcessSocketAsync(serverSocket);
  3149. + var serverSocket = await feature.AcceptAsync();
  3150. + // Give the server socket to the transport and run it
  3151. + var transport = ws.ProcessSocketAsync(serverSocket);
  3152. - // Run the client socket
  3153. - var client = feature.Client.ExecuteAndCaptureFramesAsync();
  3154. + // Run the client socket
  3155. + var client = feature.Client.ExecuteAndCaptureFramesAsync();
  3156. - // fail the client to server channel
  3157. - applicationToTransport.Out.TryComplete(new Exception());
  3158. + // fail the client to server channel
  3159. + applicationToTransport.Writer.TryComplete(new Exception());
  3160. - await Assert.ThrowsAsync<Exception>(() => transport).OrTimeout();
  3161. + await Assert.ThrowsAsync<Exception>(() => transport).OrTimeout();
  3162. - Assert.Equal(WebSocketState.Aborted, serverSocket.State);
  3163. + Assert.Equal(WebSocketState.Aborted, serverSocket.State);
  3164. + }
  3165. }
  3166. }
  3167. [Fact]
  3168. public async Task ServerGracefullyClosesWhenApplicationEndsThenClientSendsCloseFrame()
  3169. {
  3170. - var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3171. - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3172. -
  3173. - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3174. - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3175. - using (var feature = new TestWebSocketConnectionFeature())
  3176. + using (StartLog(out var loggerFactory))
  3177. {
  3178. - var options = new WebSocketOptions
  3179. + var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3180. + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3181. +
  3182. + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3183. + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3184. + using (var feature = new TestWebSocketConnectionFeature())
  3185. {
  3186. - // We want to verify behavior without timeout affecting it
  3187. - CloseTimeout = TimeSpan.FromSeconds(20)
  3188. - };
  3189. + var options = new WebSocketOptions
  3190. + {
  3191. + // We want to verify behavior without timeout affecting it
  3192. + CloseTimeout = TimeSpan.FromSeconds(20)
  3193. + };
  3194. - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3195. - var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
  3196. + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3197. + var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
  3198. - var serverSocket = await feature.AcceptAsync();
  3199. - // Give the server socket to the transport and run it
  3200. - var transport = ws.ProcessSocketAsync(serverSocket);
  3201. + var serverSocket = await feature.AcceptAsync();
  3202. + // Give the server socket to the transport and run it
  3203. + var transport = ws.ProcessSocketAsync(serverSocket);
  3204. - // Run the client socket
  3205. - var client = feature.Client.ExecuteAndCaptureFramesAsync();
  3206. + // Run the client socket
  3207. + var client = feature.Client.ExecuteAndCaptureFramesAsync();
  3208. - // close the client to server channel
  3209. - applicationToTransport.Out.TryComplete();
  3210. + // close the client to server channel
  3211. + applicationToTransport.Writer.TryComplete();
  3212. - _ = await client.OrTimeout();
  3213. + _ = await client.OrTimeout();
  3214. - await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
  3215. + await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
  3216. - await transport.OrTimeout();
  3217. + await transport.OrTimeout();
  3218. - Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
  3219. + Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
  3220. + }
  3221. }
  3222. }
  3223. [Fact]
  3224. public async Task ServerGracefullyClosesWhenClientSendsCloseFrameThenApplicationEnds()
  3225. {
  3226. - var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3227. - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3228. -
  3229. - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3230. - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3231. - using (var feature = new TestWebSocketConnectionFeature())
  3232. + using (StartLog(out var loggerFactory))
  3233. {
  3234. - var options = new WebSocketOptions
  3235. + var transportToApplication = Channel.CreateUnbounded<byte[]>();
  3236. + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
  3237. +
  3238. + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
  3239. + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
  3240. + using (var feature = new TestWebSocketConnectionFeature())
  3241. {
  3242. - // We want to verify behavior without timeout affecting it
  3243. - CloseTimeout = TimeSpan.FromSeconds(20)
  3244. - };
  3245. - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3246. - var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
  3247. + var options = new WebSocketOptions
  3248. + {
  3249. + // We want to verify behavior without timeout affecting it
  3250. + CloseTimeout = TimeSpan.FromSeconds(20)
  3251. + };
  3252. + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
  3253. + var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
  3254. - var serverSocket = await feature.AcceptAsync();
  3255. - // Give the server socket to the transport and run it
  3256. - var transport = ws.ProcessSocketAsync(serverSocket);
  3257. + var serverSocket = await feature.AcceptAsync();
  3258. + // Give the server socket to the transport and run it
  3259. + var transport = ws.ProcessSocketAsync(serverSocket);
  3260. - // Run the client socket
  3261. - var client = feature.Client.ExecuteAndCaptureFramesAsync();
  3262. + // Run the client socket
  3263. + var client = feature.Client.ExecuteAndCaptureFramesAsync();
  3264. - await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
  3265. + await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
  3266. - // close the client to server channel
  3267. - applicationToTransport.Out.TryComplete();
  3268. + // close the client to server channel
  3269. + applicationToTransport.Writer.TryComplete();
  3270. - _ = await client.OrTimeout();
  3271. + _ = await client.OrTimeout();
  3272. - await transport.OrTimeout();
  3273. + await transport.OrTimeout();
  3274. - Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
  3275. + Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
  3276. + }
  3277. }
  3278. }
  3279. }