| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703 |
- commit 792745ad98f9c6314406ab9ce87e255ee0ba3e4a
- Author: BrennanConroy <[email protected]>
- Date: Mon Nov 13 15:05:35 2017 -0800
- React to CoreFxLab packages (#998)
- diff --git a/Directory.Build.props b/Directory.Build.props
- index b51ed601331..a391978b7de 100644
- --- a/Directory.Build.props
- +++ b/Directory.Build.props
- @@ -11,6 +11,11 @@
- <SignAssembly>true</SignAssembly>
- <PublicSign Condition="'$(OS)' != 'Windows_NT'">true</PublicSign>
- <TreatWarningsAsErrors>true</TreatWarningsAsErrors>
- + <LangVersion>latest</LangVersion>
- </PropertyGroup>
-
- + <ItemGroup>
- + <!-- This is an experimental version of the compiler. See https://github.com/dotnet/csharplang/issues/666 for more details. -->
- + <PackageReference Include="Microsoft.NETCore.Compilers" Version="$(MicrosoftNETCoreCompilersPackageVersion)" PrivateAssets="All" />
- + </ItemGroup>
- </Project>
- diff --git a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs
- index a9b99c63e1b..0661272907c 100644
- --- a/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs
- +++ b/benchmarks/Microsoft.AspNetCore.SignalR.Microbenchmarks/MessageParserBenchmark.cs
- @@ -9,8 +9,8 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks
- public class MessageParserBenchmark
- {
- private static readonly Random Random = new Random();
- - private ReadOnlyBuffer<byte> _binaryInput;
- - private ReadOnlyBuffer<byte> _textInput;
- + private ReadOnlyMemory<byte> _binaryInput;
- + private ReadOnlyMemory<byte> _textInput;
-
- [Params(32, 64)]
- public int ChunkSize { get; set; }
- diff --git a/build/dependencies.props b/build/dependencies.props
- index 8608bcaf2e3..ddbe310bd48 100644
- --- a/build/dependencies.props
- +++ b/build/dependencies.props
- @@ -50,21 +50,23 @@
- <MicrosoftExtensionsOptionsPackageVersion>2.1.0-preview1-27475</MicrosoftExtensionsOptionsPackageVersion>
- <MicrosoftExtensionsSecurityHelperSourcesPackageVersion>2.1.0-preview1-27475</MicrosoftExtensionsSecurityHelperSourcesPackageVersion>
- <MicrosoftNETCoreApp20PackageVersion>2.0.0</MicrosoftNETCoreApp20PackageVersion>
- + <MicrosoftNETCoreCompilersPackageVersion>2.6.0-beta2-62211-02</MicrosoftNETCoreCompilersPackageVersion>
- <MicrosoftNETTestSdkPackageVersion>15.3.0</MicrosoftNETTestSdkPackageVersion>
- <MoqPackageVersion>4.7.49</MoqPackageVersion>
- <MsgPackCliPackageVersion>0.9.0-beta2</MsgPackCliPackageVersion>
- <NewtonsoftJsonPackageVersion>10.0.1</NewtonsoftJsonPackageVersion>
- <StackExchangeRedisStrongNamePackageVersion>1.2.4</StackExchangeRedisStrongNamePackageVersion>
- - <SystemBinaryPackageVersion>0.1.0-e170811-6</SystemBinaryPackageVersion>
- - <SystemBuffersPrimitivesPackageVersion>0.1.0-e170811-6</SystemBuffersPrimitivesPackageVersion>
- - <SystemIOPipelinesExtensionsPackageVersion>0.1.0-e170811-6</SystemIOPipelinesExtensionsPackageVersion>
- - <SystemIOPipelinesPackageVersion>0.1.0-e170811-6</SystemIOPipelinesPackageVersion>
- - <SystemMemoryPackageVersion>4.4.0-preview3-25519-03</SystemMemoryPackageVersion>
- - <SystemNumericsVectorsPackageVersion>4.4.0</SystemNumericsVectorsPackageVersion>
- + <SystemBinaryPackageVersion>0.1.0-alpha-002</SystemBinaryPackageVersion>
- + <SystemBuffersPrimitivesPackageVersion>0.1.0-alpha-002</SystemBuffersPrimitivesPackageVersion>
- + <SystemIOPipelinesExtensionsPackageVersion>0.1.0-alpha-002</SystemIOPipelinesExtensionsPackageVersion>
- + <SystemIOPipelinesPackageVersion>0.1.0-alpha-002</SystemIOPipelinesPackageVersion>
- + <SystemMemoryPackageVersion>4.5.0-preview1-25902-08</SystemMemoryPackageVersion>
- + <SystemNumericsVectorsPackageVersion>4.5.0-preview1-25902-08</SystemNumericsVectorsPackageVersion>
- <SystemReactiveLinqPackageVersion>3.1.1</SystemReactiveLinqPackageVersion>
- <SystemReflectionEmitPackageVersion>4.3.0</SystemReflectionEmitPackageVersion>
- - <SystemRuntimeCompilerServicesUnsafePackageVersion>4.4.0</SystemRuntimeCompilerServicesUnsafePackageVersion>
- - <SystemThreadingTasksChannelsPackageVersion>0.1.0-e170811-6</SystemThreadingTasksChannelsPackageVersion>
- + <SystemRuntimeCompilerServicesUnsafePackageVersion>4.5.0-preview1-25902-08</SystemRuntimeCompilerServicesUnsafePackageVersion>
- + <SystemThreadingChannelsPackageVersion>4.5.0-preview1-25902-08</SystemThreadingChannelsPackageVersion>
- + <SystemThreadingTasksExtensionsPackageVersion>4.4.0</SystemThreadingTasksExtensionsPackageVersion>
- <XunitPackageVersion>2.3.0</XunitPackageVersion>
- <XunitRunnerVisualStudioPackageVersion>2.3.0</XunitRunnerVisualStudioPackageVersion>
- </PropertyGroup>
- diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs
- index e08b972bd11..433532b432e 100644
- --- a/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs
- +++ b/client-ts/Microsoft.AspNetCore.SignalR.Test.Server/EchoEndPoint.cs
- @@ -1,8 +1,9 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- using System.Threading.Tasks;
- +using Microsoft.AspNetCore.SignalR.Internal;
- using Microsoft.AspNetCore.Sockets;
-
- namespace Microsoft.AspNetCore.SignalR.Test.Server
- @@ -11,7 +12,7 @@ namespace Microsoft.AspNetCore.SignalR.Test.Server
- {
- public async override Task OnConnectedAsync(ConnectionContext connection)
- {
- - await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync());
- + await connection.Transport.Writer.WriteAsync(await connection.Transport.Reader.ReadAsync());
- }
- }
- }
- diff --git a/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs b/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs
- index 87a6fbb9c3f..f0af31544a5 100644
- --- a/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs
- +++ b/samples/SocialWeather/PersistentConnectionLifeTimeManager.cs
- @@ -40,7 +40,7 @@ namespace SocialWeather
- var ms = new MemoryStream();
- await formatter.WriteAsync(data, ms);
-
- - connection.Transport.Out.TryWrite(ms.ToArray());
- + connection.Transport.Writer.TryWrite(ms.ToArray());
- }
- }
-
- diff --git a/samples/SocialWeather/SocialWeatherEndPoint.cs b/samples/SocialWeather/SocialWeatherEndPoint.cs
- index e412cfafebf..17889ec1aa6 100644
- --- a/samples/SocialWeather/SocialWeatherEndPoint.cs
- +++ b/samples/SocialWeather/SocialWeatherEndPoint.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System.IO;
- @@ -34,9 +34,9 @@ namespace SocialWeather
- var formatter = _formatterResolver.GetFormatter<WeatherReport>(
- (string)connection.Metadata["formatType"]);
-
- - while (await connection.Transport.In.WaitToReadAsync())
- + while (await connection.Transport.Reader.WaitToReadAsync())
- {
- - if (connection.Transport.In.TryRead(out var buffer))
- + if (connection.Transport.Reader.TryRead(out var buffer))
- {
- var stream = new MemoryStream();
- await stream.WriteAsync(buffer, 0, buffer.Length);
- diff --git a/samples/SocketsSample/EndPoints/MessagesEndPoint.cs b/samples/SocketsSample/EndPoints/MessagesEndPoint.cs
- index 5559e565184..a17cb4624a9 100644
- --- a/samples/SocketsSample/EndPoints/MessagesEndPoint.cs
- +++ b/samples/SocketsSample/EndPoints/MessagesEndPoint.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System.Collections.Generic;
- @@ -20,9 +20,9 @@ namespace SocketsSample.EndPoints
-
- try
- {
- - while (await connection.Transport.In.WaitToReadAsync())
- + while (await connection.Transport.Reader.WaitToReadAsync())
- {
- - if (connection.Transport.In.TryRead(out var buffer))
- + if (connection.Transport.Reader.TryRead(out var buffer))
- {
- // We can avoid the copy here but we'll deal with that later
- var text = Encoding.UTF8.GetString(buffer);
- @@ -50,7 +50,7 @@ namespace SocketsSample.EndPoints
-
- foreach (var c in Connections)
- {
- - tasks.Add(c.Transport.Out.WriteAsync(payload));
- + tasks.Add(c.Transport.Writer.WriteAsync(payload));
- }
-
- return Task.WhenAll(tasks);
- diff --git a/samples/SocketsSample/Hubs/Streaming.cs b/samples/SocketsSample/Hubs/Streaming.cs
- index 63fa8b71e19..cee2c42cdf0 100644
- --- a/samples/SocketsSample/Hubs/Streaming.cs
- +++ b/samples/SocketsSample/Hubs/Streaming.cs
- @@ -1,7 +1,7 @@
- using System;
- using System.Reactive.Linq;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR;
-
- namespace SocketsSample.Hubs
- @@ -15,7 +15,7 @@ namespace SocketsSample.Hubs
- .Take(count);
- }
-
- - public ReadableChannel<int> ChannelCounter(int count, int delay)
- + public ChannelReader<int> ChannelCounter(int count, int delay)
- {
- var channel = Channel.CreateUnbounded<int>();
-
- @@ -23,14 +23,14 @@ namespace SocketsSample.Hubs
- {
- for (var i = 0; i < count; i++)
- {
- - await channel.Out.WriteAsync(i);
- + await channel.Writer.WriteAsync(i);
- await Task.Delay(delay);
- }
-
- - channel.Out.TryComplete();
- + channel.Writer.TryComplete();
- });
-
- - return channel.In;
- + return channel.Reader;
- }
- }
- }
- diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
- index 5294bcd93d7..9caa61a92b9 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
- @@ -8,7 +8,7 @@ using System.Diagnostics;
- using System.IO;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Client.Internal;
- using Microsoft.AspNetCore.SignalR.Internal;
- using Microsoft.AspNetCore.SignalR.Internal.Encoders;
- @@ -145,12 +145,12 @@ namespace Microsoft.AspNetCore.SignalR.Client
- return new Subscription(invocationHandler, invocationList);
- }
-
- - public async Task<ReadableChannel<object>> StreamAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
- + public async Task<ChannelReader<object>> StreamAsync(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
- {
- return await StreamAsyncCore(methodName, returnType, args, cancellationToken).ForceAsync();
- }
-
- - private async Task<ReadableChannel<object>> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
- + private async Task<ChannelReader<object>> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
- {
- if (!_startCalled)
- {
- diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.StreamAsync.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.StreamAsync.cs
- index b0821d97e9d..81eec099747 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.StreamAsync.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.StreamAsync.cs
- @@ -1,71 +1,71 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
-
- namespace Microsoft.AspNetCore.SignalR.Client
- {
- public static partial class HubConnectionExtensions
- {
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, Array.Empty<object>(), cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1 }, cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2 }, cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3 }, cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4 }, cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5 }, cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6 }, cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7 }, cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8 }, cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9 }, cancellationToken);
- }
-
- - public static Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
- + public static Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object arg1, object arg2, object arg3, object arg4, object arg5, object arg6, object arg7, object arg8, object arg9, object arg10, CancellationToken cancellationToken = default)
- {
- return hubConnection.StreamAsync<TResult>(methodName, new object[] { arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10 }, cancellationToken);
- }
-
- - public static async Task<ReadableChannel<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
- + public static async Task<ChannelReader<TResult>> StreamAsync<TResult>(this HubConnection hubConnection, string methodName, object[] args, CancellationToken cancellationToken = default)
- {
- if (hubConnection == null)
- {
- @@ -85,9 +85,9 @@ namespace Microsoft.AspNetCore.SignalR.Client
- {
- while (inputChannel.TryRead(out var item))
- {
- - while (!outputChannel.Out.TryWrite((TResult)item))
- + while (!outputChannel.Writer.TryWrite((TResult)item))
- {
- - if (!await outputChannel.Out.WaitToWriteAsync())
- + if (!await outputChannel.Writer.WaitToWriteAsync())
- {
- // Failed to write to the output channel because it was closed. Nothing really we can do but abort here.
- return;
- @@ -101,18 +101,18 @@ namespace Microsoft.AspNetCore.SignalR.Client
- }
- catch (Exception ex)
- {
- - outputChannel.Out.TryComplete(ex);
- + outputChannel.Writer.TryComplete(ex);
- }
- finally
- {
- // This will safely no-op if the catch block above ran.
- - outputChannel.Out.TryComplete();
- + outputChannel.Writer.TryComplete();
- }
- }
-
- _ = RunChannel();
-
- - return outputChannel.In;
- + return outputChannel.Reader;
- }
- }
- }
- diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs
- index edf00f1f836..5588a97b4a9 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnectionExtensions.cs
- @@ -4,7 +4,7 @@
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
-
- namespace Microsoft.AspNetCore.SignalR.Client
- {
- diff --git a/src/Microsoft.AspNetCore.SignalR.Client.Core/InvocationRequest.cs b/src/Microsoft.AspNetCore.SignalR.Client.Core/InvocationRequest.cs
- index 40afc4cf02d..1018c22b980 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Client.Core/InvocationRequest.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Client.Core/InvocationRequest.cs
- @@ -4,7 +4,7 @@
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Client.Internal;
- using Microsoft.AspNetCore.SignalR.Internal.Protocol;
- using Microsoft.Extensions.Logging;
- @@ -43,7 +43,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
- }
-
- public static InvocationRequest Stream(CancellationToken cancellationToken, Type resultType, string invocationId,
- - ILoggerFactory loggerFactory, HubConnection hubConnection, out ReadableChannel<object> result)
- + ILoggerFactory loggerFactory, HubConnection hubConnection, out ChannelReader<object> result)
- {
- var req = new Streaming(cancellationToken, resultType, invocationId, loggerFactory, hubConnection);
- result = req.Result;
- @@ -75,7 +75,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
- {
- }
-
- - public ReadableChannel<object> Result => _channel.In;
- + public ChannelReader<object> Result => _channel.Reader;
-
- public override void Complete(CompletionMessage completionMessage)
- {
- @@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
- if (completionMessage.Result != null)
- {
- Logger.ReceivedUnexpectedComplete(InvocationId);
- - _channel.Out.TryComplete(new InvalidOperationException("Server provided a result in a completion response to a streamed invocation."));
- + _channel.Writer.TryComplete(new InvalidOperationException("Server provided a result in a completion response to a streamed invocation."));
- }
-
- if (!string.IsNullOrEmpty(completionMessage.Error))
- @@ -92,22 +92,22 @@ namespace Microsoft.AspNetCore.SignalR.Client
- return;
- }
-
- - _channel.Out.TryComplete();
- + _channel.Writer.TryComplete();
- }
-
- public override void Fail(Exception exception)
- {
- Logger.InvocationFailed(InvocationId);
- - _channel.Out.TryComplete(exception);
- + _channel.Writer.TryComplete(exception);
- }
-
- public override async ValueTask<bool> StreamItem(object item)
- {
- try
- {
- - while (!_channel.Out.TryWrite(item))
- + while (!_channel.Writer.TryWrite(item))
- {
- - if (!await _channel.Out.WaitToWriteAsync())
- + if (!await _channel.Writer.WaitToWriteAsync())
- {
- return false;
- }
- @@ -122,7 +122,7 @@ namespace Microsoft.AspNetCore.SignalR.Client
-
- protected override void Cancel()
- {
- - _channel.Out.TryComplete(new OperationCanceledException("Invocation terminated"));
- + _channel.Writer.TryComplete(new OperationCanceledException("Invocation terminated"));
- }
- }
-
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/Base64Encoder.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/Base64Encoder.cs
- index f5a8c044b0c..881b05112d3 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/Base64Encoder.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/Base64Encoder.cs
- @@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Encoders
- {
- public byte[] Decode(byte[] payload)
- {
- - var buffer = new ReadOnlyBuffer<byte>(payload);
- + var buffer = new ReadOnlyMemory<byte>(payload);
- LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out var message);
-
- return Convert.FromBase64String(Encoding.UTF8.GetString(message.ToArray()));
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/LengthPrefixedTextMessageParser.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/LengthPrefixedTextMessageParser.cs
- index 25419fbd988..686add4a158 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/LengthPrefixedTextMessageParser.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Encoders/LengthPrefixedTextMessageParser.cs
- @@ -14,20 +14,18 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Encoders
- /// Attempts to parse a message from the buffer. Returns 'false' if there is not enough data to complete a message. Throws an
- /// exception if there is a format error in the provided data.
- /// </summary>
- - public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
- + public static bool TryParseMessage(ref ReadOnlyMemory<byte> buffer, out ReadOnlyMemory<byte> payload)
- {
- - payload = default;
- - var span = buffer.Span;
- + payload = default(ReadOnlyMemory<byte>);
-
- - if (!TryReadLength(span, out var index, out var length))
- + if (!TryReadLength(buffer.Span, out var index, out var length))
- {
- return false;
- }
-
- var remaining = buffer.Slice(index);
- - span = remaining.Span;
-
- - if (!TryReadDelimiter(span, LengthPrefixedTextMessageWriter.FieldDelimiter, "length"))
- + if (!TryReadDelimiter(remaining.Span, LengthPrefixedTextMessageWriter.FieldDelimiter, "length"))
- {
- return false;
- }
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs
- index 8eb5936e923..44115592f69 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageFormatter.cs
- @@ -33,7 +33,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
- var buffer = ArrayPool<byte>.Shared.Rent(lenNumBytes + payload.Length);
- var bufferSpan = buffer.AsSpan();
-
- - new Span<byte>(lenBuffer, lenNumBytes).CopyTo(bufferSpan);
- + new ReadOnlySpan<byte>(lenBuffer, lenNumBytes).CopyTo(bufferSpan);
- bufferSpan = bufferSpan.Slice(lenNumBytes);
- payload.CopyTo(bufferSpan);
- output.Write(buffer, 0, lenNumBytes + payload.Length);
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs
- index 1835fa34aed..4889ea33f44 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/BinaryMessageParser.cs
- @@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
- private static int[] _numBitsToShift = new[] { 0, 7, 14, 21, 28 };
- private const int MaxLengthPrefixSize = 5;
-
- - public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
- + public static bool TryParseMessage(ref ReadOnlyMemory<byte> buffer, out ReadOnlyMemory<byte> payload)
- {
- payload = default;
-
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs
- index 1a1c55bef48..fac697290ee 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Formatters/TextMessageParser.cs
- @@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Formatters
- {
- public static class TextMessageParser
- {
- - public static bool TryParseMessage(ref ReadOnlyBuffer<byte> buffer, out ReadOnlyBuffer<byte> payload)
- + public static bool TryParseMessage(ref ReadOnlyMemory<byte> buffer, out ReadOnlyMemory<byte> payload)
- {
- payload = default;
-
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs
- index 4ccb27e0c20..c02cea455b9 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/IHubProtocol.cs
- @@ -13,7 +13,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
-
- ProtocolType Type { get; }
-
- - bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages);
- + bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages);
-
- void WriteMessage(HubMessage message, Stream output);
- }
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs
- index 9624302aac2..4efa8a3e6b6 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/JsonHubProtocol.cs
- @@ -61,7 +61,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
-
- public ProtocolType Type => ProtocolType.Text;
-
- - public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
- + public bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
- {
- messages = new List<HubMessage>();
-
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/MessagePackHubProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/MessagePackHubProtocol.cs
- index 06401379300..30c85781c71 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/MessagePackHubProtocol.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/MessagePackHubProtocol.cs
- @@ -38,7 +38,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
- _serializationContext = serializationContext;
- }
-
- - public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
- + public bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
- {
- messages = new List<HubMessage>();
-
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs
- index 7f6abe5b2c0..c886a10ec2a 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/NegotiationProtocol.cs
- @@ -29,7 +29,7 @@ namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
- }
- }
-
- - public static bool TryParseMessage(ReadOnlyBuffer<byte> input, out NegotiationMessage negotiationMessage)
- + public static bool TryParseMessage(ReadOnlyMemory<byte> input, out NegotiationMessage negotiationMessage)
- {
- if (!TextMessageParser.TryParseMessage(ref input, out var payload))
- {
- diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj b/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj
- index 74d6d7217a9..0ab94fe4e04 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj
- +++ b/src/Microsoft.AspNetCore.SignalR.Common/Microsoft.AspNetCore.SignalR.Common.csproj
- @@ -10,7 +10,6 @@
- <ItemGroup>
- <PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
- <PackageReference Include="MsgPack.Cli" Version="$(MsgPackCliPackageVersion)" />
- - <PackageReference Include="System.Binary" Version="$(SystemBinaryPackageVersion)" />
- <PackageReference Include="System.Buffers.Primitives" Version="$(SystemBuffersPrimitivesPackageVersion)" />
- <PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
- <PackageReference Include="System.Numerics.Vectors" Version="$(SystemNumericsVectorsPackageVersion)" />
- diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs
- index 56f118dd494..61e9b3e0f22 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubConnectionContext.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -8,7 +8,7 @@ using System.Runtime.ExceptionServices;
- using System.Security.Claims;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http.Features;
- using Microsoft.AspNetCore.SignalR.Features;
- using Microsoft.AspNetCore.SignalR.Internal;
- @@ -22,12 +22,12 @@ namespace Microsoft.AspNetCore.SignalR
- {
- private static Action<object> _abortedCallback = AbortConnection;
-
- - private readonly WritableChannel<HubMessage> _output;
- + private readonly ChannelWriter<HubMessage> _output;
- private readonly ConnectionContext _connectionContext;
- private readonly CancellationTokenSource _connectionAbortedTokenSource = new CancellationTokenSource();
- private readonly TaskCompletionSource<object> _abortCompletedTcs = new TaskCompletionSource<object>();
-
- - public HubConnectionContext(WritableChannel<HubMessage> output, ConnectionContext connectionContext)
- + public HubConnectionContext(ChannelWriter<HubMessage> output, ConnectionContext connectionContext)
- {
- _output = output;
- _connectionContext = connectionContext;
- @@ -37,7 +37,7 @@ namespace Microsoft.AspNetCore.SignalR
- private IHubFeature HubFeature => Features.Get<IHubFeature>();
-
- // Used by the HubEndPoint only
- - internal ReadableChannel<byte[]> Input => _connectionContext.Transport;
- + internal ChannelReader<byte[]> Input => _connectionContext.Transport;
-
- internal ExceptionDispatchInfo AbortException { get; private set; }
-
- @@ -53,7 +53,7 @@ namespace Microsoft.AspNetCore.SignalR
-
- public virtual HubProtocolReaderWriter ProtocolReaderWriter { get; set; }
-
- - public virtual WritableChannel<HubMessage> Output => _output;
- + public virtual ChannelWriter<HubMessage> Output => _output;
-
- // Currently used only for streaming methods
- internal ConcurrentDictionary<string, CancellationTokenSource> ActiveRequestCancellationSources { get; } = new ConcurrentDictionary<string, CancellationTokenSource>();
- diff --git a/src/Microsoft.AspNetCore.SignalR.Core/HubEndPoint.cs b/src/Microsoft.AspNetCore.SignalR.Core/HubEndPoint.cs
- index bdb32153b0a..f6c572ee37b 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Core/HubEndPoint.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Core/HubEndPoint.cs
- @@ -9,7 +9,7 @@ using System.Reflection;
- using System.Security.Claims;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Authorization;
- using Microsoft.AspNetCore.SignalR.Core;
- using Microsoft.AspNetCore.SignalR.Core.Internal;
- @@ -84,14 +84,14 @@ namespace Microsoft.AspNetCore.SignalR
- {
- try
- {
- - while (await output.In.WaitToReadAsync())
- + while (await output.Reader.WaitToReadAsync())
- {
- - while (output.In.TryRead(out var hubMessage))
- + while (output.Reader.TryRead(out var hubMessage))
- {
- var buffer = protocolReaderWriter.WriteMessage(hubMessage);
- - while (await connection.Transport.Out.WaitToWriteAsync())
- + while (await connection.Transport.Writer.WaitToWriteAsync())
- {
- - if (connection.Transport.Out.TryWrite(buffer))
- + if (connection.Transport.Writer.TryWrite(buffer))
- {
- break;
- }
- @@ -117,7 +117,7 @@ namespace Microsoft.AspNetCore.SignalR
- await _lifetimeManager.OnDisconnectedAsync(connectionContext);
-
- // Nothing should be writing to the HubConnectionContext
- - output.Out.TryComplete();
- + output.Writer.TryComplete();
-
- // This should unwind once we complete the output
- await writingOutputTask;
- @@ -461,7 +461,7 @@ namespace Microsoft.AspNetCore.SignalR
-
- private static bool IsChannel(Type type, out Type payloadType)
- {
- - var channelType = type.AllBaseTypes().FirstOrDefault(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ReadableChannel<>));
- + var channelType = type.AllBaseTypes().FirstOrDefault(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ChannelReader<>));
- if (channelType == null)
- {
- payloadType = null;
- diff --git a/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs b/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs
- index f066e180489..870b06eaf25 100644
- --- a/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs
- +++ b/src/Microsoft.AspNetCore.SignalR.Core/Internal/AsyncEnumeratorAdapters.cs
- @@ -6,7 +6,7 @@ using System.Linq;
- using System.Reflection;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
-
- namespace Microsoft.AspNetCore.SignalR.Internal
- {
- @@ -21,6 +21,10 @@ namespace Microsoft.AspNetCore.SignalR.Internal
- .GetRuntimeMethods()
- .Single(m => m.Name.Equals(nameof(FromObservable)) && m.IsGenericMethod);
-
- + private static readonly MethodInfo _getAsyncEnumeratorMethod = typeof(AsyncEnumeratorAdapters)
- + .GetRuntimeMethods()
- + .Single(m => m.Name.Equals(nameof(GetAsyncEnumerator)) && m.IsGenericMethod);
- +
- public static IAsyncEnumerator<object> FromObservable(object observable, Type observableInterface, CancellationToken cancellationToken)
- {
- // TODO: Cache expressions by observable.GetType()?
- @@ -34,20 +38,19 @@ namespace Microsoft.AspNetCore.SignalR.Internal
- // TODO: Allow bounding and optimizations?
- var channel = Channel.CreateUnbounded<object>();
-
- - var subscription = observable.Subscribe(new ChannelObserver<T>(channel.Out, cancellationToken));
- + var subscription = observable.Subscribe(new ChannelObserver<T>(channel.Writer, cancellationToken));
-
- // Dispose the subscription when the token is cancelled
- cancellationToken.Register(state => ((IDisposable)state).Dispose(), subscription);
-
- - return channel.In.GetAsyncEnumerator(cancellationToken);
- + return GetAsyncEnumerator(channel.Reader, cancellationToken);
- }
-
- public static IAsyncEnumerator<object> FromChannel(object readableChannelOfT, Type payloadType, CancellationToken cancellationToken)
- {
- - var enumerator = readableChannelOfT
- - .GetType()
- - .GetRuntimeMethod("GetAsyncEnumerator", new[] { typeof(CancellationToken) })
- - .Invoke(readableChannelOfT, new object[] { cancellationToken });
- + var enumerator = _getAsyncEnumeratorMethod
- + .MakeGenericMethod(payloadType)
- + .Invoke(null, new object[] { readableChannelOfT, cancellationToken });
-
- if (payloadType.IsValueType)
- {
- @@ -68,10 +71,10 @@ namespace Microsoft.AspNetCore.SignalR.Internal
-
- private class ChannelObserver<T> : IObserver<T>
- {
- - private WritableChannel<object> _output;
- + private ChannelWriter<object> _output;
- private CancellationToken _cancellationToken;
-
- - public ChannelObserver(WritableChannel<object> output, CancellationToken cancellationToken)
- + public ChannelObserver(ChannelWriter<object> output, CancellationToken cancellationToken)
- {
- _output = output;
- _cancellationToken = cancellationToken;
- @@ -125,5 +128,66 @@ namespace Microsoft.AspNetCore.SignalR.Internal
- public object Current => _input.Current;
- public Task<bool> MoveNextAsync() => _input.MoveNextAsync();
- }
- +
- + public static IAsyncEnumerator<T> GetAsyncEnumerator<T>(ChannelReader<T> channel, CancellationToken cancellationToken = default(CancellationToken))
- + {
- + return new AsyncEnumerator<T>(channel, cancellationToken);
- + }
- +
- + /// <summary>Provides an async enumerator for the data in a channel.</summary>
- + internal class AsyncEnumerator<T> : IAsyncEnumerator<T>
- + {
- + /// <summary>The channel being enumerated.</summary>
- + private readonly ChannelReader<T> _channel;
- + /// <summary>Cancellation token used to cancel the enumeration.</summary>
- + private readonly CancellationToken _cancellationToken;
- + /// <summary>The current element of the enumeration.</summary>
- + private T _current;
- +
- + internal AsyncEnumerator(ChannelReader<T> channel, CancellationToken cancellationToken)
- + {
- + _channel = channel;
- + _cancellationToken = cancellationToken;
- + }
- +
- + public T Current => _current;
- +
- + public Task<bool> MoveNextAsync()
- + {
- + ValueTask<T> result = _channel.ReadAsync(_cancellationToken);
- +
- + if (result.IsCompletedSuccessfully)
- + {
- + _current = result.Result;
- + return Task.FromResult(true);
- + }
- +
- + return result.AsTask().ContinueWith((t, s) =>
- + {
- + var thisRef = (AsyncEnumerator<T>)s;
- + if (t.IsFaulted && t.Exception.InnerException is ChannelClosedException cce && cce.InnerException == null)
- + {
- + return false;
- + }
- + thisRef._current = t.GetAwaiter().GetResult();
- + return true;
- + }, this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.NotOnCanceled, TaskScheduler.Default);
- + }
- + }
- + }
- +
- + /// <summary>Represents an enumerator accessed asynchronously.</summary>
- + /// <typeparam name="T">Specifies the type of the data enumerated.</typeparam>
- + internal interface IAsyncEnumerator<out T>
- + {
- + /// <summary>Asynchronously move the enumerator to the next element.</summary>
- + /// <returns>
- + /// A task that returns true if the enumerator was successfully advanced to the next item,
- + /// or false if no more data was available in the collection.
- + /// </returns>
- + Task<bool> MoveNextAsync();
- +
- + /// <summary>Gets the current element being enumerated.</summary>
- + T Current { get; }
- }
- }
- diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs
- index 101d988e5ed..5065cb9d3d8 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelConnection.cs
- @@ -1,8 +1,8 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
-
- namespace Microsoft.AspNetCore.Sockets.Internal
- {
- @@ -24,20 +24,19 @@ namespace Microsoft.AspNetCore.Sockets.Internal
- public Channel<T> Input { get; }
- public Channel<T> Output { get; }
-
- - public override ReadableChannel<T> In => Input;
- -
- - public override WritableChannel<T> Out => Output;
- -
- public ChannelConnection(Channel<T> input, Channel<T> output)
- {
- + Reader = input.Reader;
- Input = input;
- +
- + Writer = output.Writer;
- Output = output;
- }
-
- public void Dispose()
- {
- - Input.Out.TryComplete();
- - Output.Out.TryComplete();
- + Input.Writer.TryComplete();
- + Output.Writer.TryComplete();
- }
- }
-
- @@ -46,20 +45,19 @@ namespace Microsoft.AspNetCore.Sockets.Internal
- public Channel<TIn> Input { get; }
- public Channel<TOut> Output { get; }
-
- - public override ReadableChannel<TIn> In => Input;
- -
- - public override WritableChannel<TOut> Out => Output;
- -
- public ChannelConnection(Channel<TIn> input, Channel<TOut> output)
- {
- + Reader = input.Reader;
- Input = input;
- +
- + Writer = output.Writer;
- Output = output;
- }
-
- public void Dispose()
- {
- - Input.Out.TryComplete();
- - Output.Out.TryComplete();
- + Input.Writer.TryComplete();
- + Output.Writer.TryComplete();
- }
- }
- }
- diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelReaderExtensions.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelReaderExtensions.cs
- new file mode 100644
- index 00000000000..e437a1a5945
- --- /dev/null
- +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/ChannelReaderExtensions.cs
- @@ -0,0 +1,47 @@
- +// Copyright (c) .NET Foundation. All rights reserved.
- +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
- +
- +using System;
- +using System.Threading;
- +using System.Threading.Channels;
- +using System.Threading.Tasks;
- +
- +namespace Microsoft.AspNetCore.SignalR.Internal
- +{
- + public static class ChannelReaderExtensions
- + {
- + /// <summary>Asynchronously reads an item from the channel.</summary>
- + /// <param name="channel">The channel</param>
- + /// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the read operation.</param>
- + /// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous read operation.</returns>
- + public static ValueTask<T> ReadAsync<T>(this ChannelReader<T> channel, CancellationToken cancellationToken = default)
- + {
- + try
- + {
- + return
- + cancellationToken.IsCancellationRequested
- + ? new ValueTask<T>(Task.FromCanceled<T>(cancellationToken))
- + : channel.TryRead(out T item)
- + ? new ValueTask<T>(item)
- + : ReadAsyncCore(cancellationToken);
- + }
- + catch (Exception e)
- + {
- + return new ValueTask<T>(Task.FromException<T>(e));
- + }
- +
- + async ValueTask<T> ReadAsyncCore(CancellationToken ct)
- + {
- + while (await channel.WaitToReadAsync(ct).ConfigureAwait(false))
- + {
- + if (channel.TryRead(out T item))
- + {
- + return item;
- + }
- + }
- +
- + throw new ChannelClosedException();
- + }
- + }
- + }
- +}
- diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs
- index 7fa72781309..8f4c799a162 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/ConnectionContext.cs
- @@ -2,7 +2,7 @@
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System.Collections.Generic;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http.Features;
-
- namespace Microsoft.AspNetCore.Sockets
- diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/Features/IConnectionTransportFeature.cs b/src/Microsoft.AspNetCore.Sockets.Abstractions/Features/IConnectionTransportFeature.cs
- index d29718fa317..e851b49bcc0 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/Features/IConnectionTransportFeature.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/Features/IConnectionTransportFeature.cs
- @@ -1,7 +1,7 @@
- // Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
-
- namespace Microsoft.AspNetCore.Sockets.Features
- {
- diff --git a/src/Microsoft.AspNetCore.Sockets.Abstractions/Microsoft.AspNetCore.Sockets.Abstractions.csproj b/src/Microsoft.AspNetCore.Sockets.Abstractions/Microsoft.AspNetCore.Sockets.Abstractions.csproj
- index c2dba911f3b..56b275ea06b 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Abstractions/Microsoft.AspNetCore.Sockets.Abstractions.csproj
- +++ b/src/Microsoft.AspNetCore.Sockets.Abstractions/Microsoft.AspNetCore.Sockets.Abstractions.csproj
- @@ -7,7 +7,8 @@
-
- <ItemGroup>
- <PackageReference Include="Microsoft.AspNetCore.Http.Features" Version="$(MicrosoftAspNetCoreHttpFeaturesPackageVersion)" />
- - <PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
- + <PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
- + <PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsPackageVersion)" />
- </ItemGroup>
-
- </Project>
- diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs
- index c2c27f5fe25..1a4776ec452 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -8,7 +8,7 @@ using System.IO;
- using System.Net.Http;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http.Features;
- using Microsoft.AspNetCore.Sockets.Client.Http;
- using Microsoft.AspNetCore.Sockets.Client.Internal;
- @@ -39,8 +39,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
- private readonly ITransportFactory _transportFactory;
- private string _connectionId;
- private readonly TimeSpan _eventQueueDrainTimeout = TimeSpan.FromSeconds(5);
- - private ReadableChannel<byte[]> Input => _transportChannel.In;
- - private WritableChannel<SendMessage> Output => _transportChannel.Out;
- + private ChannelReader<byte[]> Input => _transportChannel.Input;
- + private ChannelWriter<SendMessage> Output => _transportChannel.Output;
- private readonly List<ReceiveCallback> _callbacks = new List<ReceiveCallback>();
- private readonly TransportType _requestedTransportType = TransportType.All;
-
- diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs
- index 7591183e339..784400db856 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs
- @@ -1,9 +1,9 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
-
- namespace Microsoft.AspNetCore.Sockets.Client
- {
- diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs
- index 9229219a521..a059a81230f 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -6,7 +6,7 @@ using System.Net;
- using System.Net.Http;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Sockets.Client.Http;
- using Microsoft.AspNetCore.Sockets.Client.Internal;
- using Microsoft.Extensions.Logging;
- @@ -59,7 +59,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
- Running = Task.WhenAll(_sender, _poller).ContinueWith(t =>
- {
- _logger.TransportStopped(_connectionId, t.Exception?.InnerException);
- - _application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
- + _application.Writer.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
- return t;
- }).Unwrap();
-
- @@ -123,9 +123,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
- var payload = await response.Content.ReadAsByteArrayAsync();
- if (payload.Length > 0)
- {
- - while (!_application.Out.TryWrite(payload))
- + while (!_application.Writer.TryWrite(payload))
- {
- - if (cancellationToken.IsCancellationRequested || !await _application.Out.WaitToWriteAsync(cancellationToken))
- + if (cancellationToken.IsCancellationRequested || !await _application.Writer.WaitToWriteAsync(cancellationToken))
- {
- return;
- }
- diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj b/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj
- index d09d5db9a5e..3a4980df1f3 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj
- +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/Microsoft.AspNetCore.Sockets.Client.Http.csproj
- @@ -22,7 +22,7 @@
- <PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
- <PackageReference Include="System.Numerics.Vectors" Version="$(SystemNumericsVectorsPackageVersion)" />
- <PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="$(SystemRuntimeCompilerServicesUnsafePackageVersion)" />
- - <PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
- + <PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
- </ItemGroup>
-
- </Project>
- diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs
- index a95c59a3533..af05b120606 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/SendUtils.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -7,7 +7,7 @@ using System.IO;
- using System.Net.Http;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Sockets.Client.Http;
- using Microsoft.AspNetCore.Sockets.Client.Internal;
- using Microsoft.Extensions.Logging;
- @@ -23,11 +23,11 @@ namespace Microsoft.AspNetCore.Sockets.Client
- IList<SendMessage> messages = null;
- try
- {
- - while (await application.In.WaitToReadAsync(transportCts.Token))
- + while (await application.Reader.WaitToReadAsync(transportCts.Token))
- {
- // Grab as many messages as we can from the channel
- messages = new List<SendMessage>();
- - while (!transportCts.IsCancellationRequested && application.In.TryRead(out SendMessage message))
- + while (!transportCts.IsCancellationRequested && application.Reader.TryRead(out SendMessage message))
- {
- messages.Add(message);
- }
- diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs
- index bc4713320df..4ad011babc9 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsMessageParser.cs
- @@ -146,7 +146,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
- }
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- - private Span<byte> ConvertBufferToSpan(ReadableBuffer buffer)
- + private ReadOnlySpan<byte> ConvertBufferToSpan(ReadableBuffer buffer)
- {
- if (buffer.IsSingleSpan)
- {
- diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs
- index 9018659dddc..62e92d12c29 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs
- @@ -1,13 +1,14 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- +using System.Buffers;
- using System.IO.Pipelines;
- using System.Net.Http;
- using System.Net.Http.Headers;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Sockets.Client.Internal;
- using Microsoft.AspNetCore.Sockets.Internal.Formatters;
- using Microsoft.Extensions.Logging;
- @@ -17,6 +18,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
- {
- public class ServerSentEventsTransport : ITransport
- {
- + private static readonly MemoryPool _memoryPool = new MemoryPool();
- private readonly HttpClient _httpClient;
- private readonly ILogger _logger;
- private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
- @@ -64,7 +66,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
- {
- _logger.TransportStopped(_connectionId, t.Exception?.InnerException);
-
- - _application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
- + _application.Writer.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
- return t;
- }).Unwrap();
-
- @@ -80,7 +82,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
- var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
-
- var stream = await response.Content.ReadAsStreamAsync();
- - var pipelineReader = stream.AsPipelineReader(cancellationToken);
- + var pipelineReader = StreamPipeConnection.CreateReader(new PipeOptions(_memoryPool), stream);
- var readCancellationRegistration = cancellationToken.Register(
- reader => ((IPipeReader)reader).CancelPendingRead(), pipelineReader);
- try
- @@ -105,7 +107,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
- switch (parseResult)
- {
- case ServerSentEventsMessageParser.ParseResult.Completed:
- - _application.Out.TryWrite(buffer);
- + _application.Writer.TryWrite(buffer);
- _parser.Reset();
- break;
- case ServerSentEventsMessageParser.ParseResult.Incomplete:
- @@ -139,7 +141,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
- {
- _logger.TransportStopping(_connectionId);
- _transportCts.Cancel();
- - _application.Out.TryComplete();
- + _application.Writer.TryComplete();
-
- try
- {
- diff --git a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs
- index 2249c44bc9a..5b8874b6c57 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -7,7 +7,7 @@ using System.Diagnostics;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Sockets.Client.Internal;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Logging.Abstractions;
- @@ -70,8 +70,8 @@ namespace Microsoft.AspNetCore.Sockets.Client
- {
- _webSocket.Dispose();
- _logger.TransportStopped(_connectionId, t.Exception?.InnerException);
- - _application.Out.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
- - return t;
- + _application.Writer.TryComplete(t.IsFaulted ? t.Exception.InnerException : null);
- + return t;
- }).Unwrap();
- }
-
- @@ -97,7 +97,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
- {
- _logger.WebSocketClosed(_connectionId, receiveResult.CloseStatus);
-
- - _application.Out.Complete(
- + _application.Writer.Complete(
- receiveResult.CloseStatus == WebSocketCloseStatus.NormalClosure
- ? null
- : new InvalidOperationException(
- @@ -135,9 +135,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
- if (!_transportCts.Token.IsCancellationRequested)
- {
- _logger.MessageToApp(_connectionId, messageBuffer.Length);
- - while (await _application.Out.WaitToWriteAsync(_transportCts.Token))
- + while (await _application.Writer.WaitToWriteAsync(_transportCts.Token))
- {
- - if (_application.Out.TryWrite(messageBuffer))
- + if (_application.Writer.TryWrite(messageBuffer))
- {
- incomingMessage.Clear();
- break;
- @@ -173,9 +173,9 @@ namespace Microsoft.AspNetCore.Sockets.Client
-
- try
- {
- - while (await _application.In.WaitToReadAsync(_transportCts.Token))
- + while (await _application.Reader.WaitToReadAsync(_transportCts.Token))
- {
- - while (_application.In.TryRead(out SendMessage message))
- + while (_application.Reader.TryRead(out SendMessage message))
- {
- try
- {
- diff --git a/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs b/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs
- index 1bcca138ef1..996605a4567 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Http/HttpConnectionDispatcher.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -93,7 +93,7 @@ namespace Microsoft.AspNetCore.Sockets
- connection.TransportCapabilities = TransferMode.Text;
-
- // We only need to provide the Input channel since writing to the application is handled through /send.
- - var sse = new ServerSentEventsTransport(connection.Application.In, connection.ConnectionId, _loggerFactory);
- + var sse = new ServerSentEventsTransport(connection.Application.Reader, connection.ConnectionId, _loggerFactory);
-
- await DoPersistentConnection(socketDelegate, sse, context, connection);
- }
- @@ -194,7 +194,7 @@ namespace Microsoft.AspNetCore.Sockets
- context.Response.RegisterForDispose(timeoutSource);
- context.Response.RegisterForDispose(tokenSource);
-
- - var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.In, connection.ConnectionId, _loggerFactory);
- + var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Reader, connection.ConnectionId, _loggerFactory);
-
- // Start the transport
- connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
- @@ -215,7 +215,7 @@ namespace Microsoft.AspNetCore.Sockets
- if (resultTask == connection.ApplicationTask)
- {
- // Complete the transport (notifying it of the application error if there is one)
- - connection.Transport.Out.TryComplete(connection.ApplicationTask.Exception);
- + connection.Transport.Writer.TryComplete(connection.ApplicationTask.Exception);
-
- // Wait for the transport to run
- await connection.TransportTask;
- @@ -408,9 +408,9 @@ namespace Microsoft.AspNetCore.Sockets
- }
-
- _logger.ReceivedBytes(connection.ConnectionId, buffer.Length);
- - while (!connection.Application.Out.TryWrite(buffer))
- + while (!connection.Application.Writer.TryWrite(buffer))
- {
- - if (!await connection.Application.Out.WaitToWriteAsync())
- + if (!await connection.Application.Writer.WaitToWriteAsync())
- {
- return;
- }
- diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs
- index 033efc6fe20..a9ae71af2ae 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/LongPollingTransport.cs
- @@ -1,11 +1,11 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http;
- using Microsoft.Extensions.Logging;
-
- @@ -13,12 +13,12 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
- {
- public class LongPollingTransport : IHttpTransport
- {
- - private readonly ReadableChannel<byte[]> _application;
- + private readonly ChannelReader<byte[]> _application;
- private readonly ILogger _logger;
- private readonly CancellationToken _timeoutToken;
- private readonly string _connectionId;
-
- - public LongPollingTransport(CancellationToken timeoutToken, ReadableChannel<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
- + public LongPollingTransport(CancellationToken timeoutToken, ChannelReader<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
- {
- _timeoutToken = timeoutToken;
- _application = application;
- diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs
- index 5a04cc76ead..21b079c2638 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsMessageFormatter.cs
- @@ -65,7 +65,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Formatters
-
- if (nextSliceStart >= payload.Length)
- {
- - payload = Span<byte>.Empty;
- + payload = ReadOnlySpan<byte>.Empty;
- }
- else
- {
- diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs
- index 3ff1c1b756f..19bda2e3907 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/ServerSentEventsTransport.cs
- @@ -1,11 +1,11 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- using System.IO;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http;
- using Microsoft.AspNetCore.Http.Features;
- using Microsoft.AspNetCore.Sockets.Internal.Formatters;
- @@ -15,11 +15,11 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
- {
- public class ServerSentEventsTransport : IHttpTransport
- {
- - private readonly ReadableChannel<byte[]> _application;
- + private readonly ChannelReader<byte[]> _application;
- private readonly string _connectionId;
- private readonly ILogger _logger;
-
- - public ServerSentEventsTransport(ReadableChannel<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
- + public ServerSentEventsTransport(ChannelReader<byte[]> application, string connectionId, ILoggerFactory loggerFactory)
- {
- _application = application;
- _connectionId = connectionId;
- diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs
- index 5adc0ab245f..97756e36130 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs
- +++ b/src/Microsoft.AspNetCore.Sockets.Http/Internal/Transports/WebSocketsTransport.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -7,7 +7,7 @@ using System.Diagnostics;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http;
- using Microsoft.Extensions.Logging;
-
- @@ -87,7 +87,7 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
- }
-
- // We're done writing
- - _application.Out.TryComplete();
- + _application.Writer.TryComplete();
-
- await socket.CloseOutputAsync(failed ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
-
- @@ -160,9 +160,9 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
- }
-
- _logger.MessageToApplication(_connection.ConnectionId, messageBuffer.Length);
- - while (await _application.Out.WaitToWriteAsync())
- + while (await _application.Writer.WaitToWriteAsync())
- {
- - if (_application.Out.TryWrite(messageBuffer))
- + if (_application.Writer.TryWrite(messageBuffer))
- {
- incomingMessage.Clear();
- break;
- @@ -173,10 +173,10 @@ namespace Microsoft.AspNetCore.Sockets.Internal.Transports
-
- private async Task StartSending(WebSocket ws)
- {
- - while (await _application.In.WaitToReadAsync())
- + while (await _application.Reader.WaitToReadAsync())
- {
- // Get a frame from the application
- - while (_application.In.TryRead(out var buffer))
- + while (_application.Reader.TryRead(out var buffer))
- {
- if (buffer.Length > 0)
- {
- diff --git a/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj b/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj
- index 1f7ff58d7a6..7cbc3f60f84 100644
- --- a/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj
- +++ b/src/Microsoft.AspNetCore.Sockets.Http/Microsoft.AspNetCore.Sockets.Http.csproj
- @@ -16,7 +16,7 @@
- <PackageReference Include="Microsoft.AspNetCore.Routing" Version="$(MicrosoftAspNetCoreRoutingPackageVersion)" />
- <PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="$(MicrosoftAspNetCoreWebSocketsPackageVersion)" />
- <PackageReference Include="Microsoft.Extensions.SecurityHelper.Sources" PrivateAssets="All" Version="$(MicrosoftExtensionsSecurityHelperSourcesPackageVersion)" />
- - <PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
- + <PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
- <PackageReference Include="System.Memory" Version="$(SystemMemoryPackageVersion)" />
- <PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
- </ItemGroup>
- diff --git a/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs b/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs
- index 0d23bc71614..9baa5580451 100644
- --- a/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs
- +++ b/src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs
- @@ -9,7 +9,7 @@ using System.IO;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Hosting;
- using Microsoft.AspNetCore.Sockets.Internal;
- using Microsoft.Extensions.Logging;
- diff --git a/src/Microsoft.AspNetCore.Sockets/DefaultConnectionContext.cs b/src/Microsoft.AspNetCore.Sockets/DefaultConnectionContext.cs
- index 3acb6bb0c18..990827767b2 100644
- --- a/src/Microsoft.AspNetCore.Sockets/DefaultConnectionContext.cs
- +++ b/src/Microsoft.AspNetCore.Sockets/DefaultConnectionContext.cs
- @@ -6,7 +6,7 @@ using System.Collections.Generic;
- using System.Security.Claims;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http.Features;
- using Microsoft.AspNetCore.Sockets.Features;
-
- @@ -86,21 +86,21 @@ namespace Microsoft.AspNetCore.Sockets
- // If the application task is faulted, propagate the error to the transport
- if (ApplicationTask?.IsFaulted == true)
- {
- - Transport.Out.TryComplete(ApplicationTask.Exception.InnerException);
- + Transport.Writer.TryComplete(ApplicationTask.Exception.InnerException);
- }
- else
- {
- - Transport.Out.TryComplete();
- + Transport.Writer.TryComplete();
- }
-
- // If the transport task is faulted, propagate the error to the application
- if (TransportTask?.IsFaulted == true)
- {
- - Application.Out.TryComplete(TransportTask.Exception.InnerException);
- + Application.Writer.TryComplete(TransportTask.Exception.InnerException);
- }
- else
- {
- - Application.Out.TryComplete();
- + Application.Writer.TryComplete();
- }
-
- var applicationTask = ApplicationTask ?? Task.CompletedTask;
- diff --git a/src/Microsoft.AspNetCore.Sockets/Microsoft.AspNetCore.Sockets.csproj b/src/Microsoft.AspNetCore.Sockets/Microsoft.AspNetCore.Sockets.csproj
- index 0de655a7316..27dd5e95745 100644
- --- a/src/Microsoft.AspNetCore.Sockets/Microsoft.AspNetCore.Sockets.csproj
- +++ b/src/Microsoft.AspNetCore.Sockets/Microsoft.AspNetCore.Sockets.csproj
- @@ -13,7 +13,7 @@
- <PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="$(MicrosoftAspNetCoreHostingAbstractionsPackageVersion)" />
- <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftExtensionsLoggingAbstractionsPackageVersion)" />
- <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="$(MicrosoftExtensionsDependencyInjectionAbstractionsPackageVersion)" />
- - <PackageReference Include="System.Threading.Tasks.Channels" Version="$(SystemThreadingTasksChannelsPackageVersion)" />
- + <PackageReference Include="System.Threading.Channels" Version="$(SystemThreadingChannelsPackageVersion)" />
- </ItemGroup>
-
- </Project>
- diff --git a/test/Common/ChannelExtensions.cs b/test/Common/ChannelExtensions.cs
- index 2502886317b..fd032253794 100644
- --- a/test/Common/ChannelExtensions.cs
- +++ b/test/Common/ChannelExtensions.cs
- @@ -1,10 +1,11 @@
- using System.Collections.Generic;
- +using System.Threading.Tasks;
-
- -namespace System.Threading.Tasks.Channels
- +namespace System.Threading.Channels
- {
- internal static class ChannelExtensions
- {
- - public static async Task<List<T>> ReadAllAsync<T>(this ReadableChannel<T> channel)
- + public static async Task<List<T>> ReadAllAsync<T>(this ChannelReader<T> channel)
- {
- var list = new List<T>();
- while (await channel.WaitToReadAsync())
- diff --git a/test/Common/TestClient.cs b/test/Common/TestClient.cs
- index a4d903120f4..42a7f95ba39 100644
- --- a/test/Common/TestClient.cs
- +++ b/test/Common/TestClient.cs
- @@ -7,7 +7,7 @@ using System.IO;
- using System.Security.Claims;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Internal;
- using Microsoft.AspNetCore.SignalR.Internal.Encoders;
- using Microsoft.AspNetCore.SignalR.Internal.Protocol;
- @@ -32,7 +32,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
-
- public TestClient(bool synchronousCallbacks = false, IHubProtocol protocol = null, IInvocationBinder invocationBinder = null, bool addClaimId = false)
- {
- - var options = new ChannelOptimizations { AllowSynchronousContinuations = synchronousCallbacks };
- + var options = new UnboundedChannelOptions { AllowSynchronousContinuations = synchronousCallbacks };
- var transportToApplication = Channel.CreateUnbounded<byte[]>(options);
- var applicationToTransport = Channel.CreateUnbounded<byte[]>(options);
-
- @@ -60,7 +60,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- using (var memoryStream = new MemoryStream())
- {
- NegotiationProtocol.WriteMessage(new NegotiationMessage(protocol.Name), memoryStream);
- - Application.Out.TryWrite(memoryStream.ToArray());
- + Application.Writer.TryWrite(memoryStream.ToArray());
- }
- }
-
- @@ -149,7 +149,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- public async Task<string> SendHubMessageAsync(HubMessage message)
- {
- var payload = _protocolReaderWriter.WriteMessage(message);
- - await Application.Out.WriteAsync(payload);
- + await Application.Writer.WriteAsync(payload);
- return message.InvocationId;
- }
-
- @@ -161,7 +161,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
-
- if (message == null)
- {
- - if (!await Application.In.WaitToReadAsync())
- + if (!await Application.Reader.WaitToReadAsync())
- {
- return null;
- }
- @@ -175,7 +175,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
-
- public HubMessage TryRead()
- {
- - if (Application.In.TryRead(out var buffer) &&
- + if (Application.Reader.TryRead(out var buffer) &&
- _protocolReaderWriter.ReadMessages(buffer, _invocationBinder, out var messages))
- {
- return messages[0];
- @@ -208,4 +208,4 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- }
- }
- }
- -}
- \ No newline at end of file
- +}
- diff --git a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs
- index 8e3c2bfc5ed..e110cc26a03 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/HubConnectionTests.cs
- @@ -5,7 +5,7 @@ using System;
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Internal.Protocol;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Microsoft.AspNetCore.Sockets;
- diff --git a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs
- index 02c2a875356..37b3e44dda2 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Client.FunctionalTests/Hubs.cs
- @@ -1,11 +1,11 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- using System.Linq;
- using System.Reactive.Linq;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
-
- namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
- {
- @@ -17,9 +17,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
-
- public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
-
- - public ReadableChannel<int> StreamException() => TestHubMethodsImpl.StreamException();
- + public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
-
- - public ReadableChannel<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
- + public ChannelReader<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
-
- public async Task CallEcho(string message)
- {
- @@ -40,9 +40,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
-
- public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
-
- - public ReadableChannel<int> StreamException() => TestHubMethodsImpl.StreamException();
- + public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
-
- - public ReadableChannel<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
- + public ChannelReader<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
-
- public async Task CallEcho(string message)
- {
- @@ -63,9 +63,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
-
- public IObservable<int> Stream(int count) => TestHubMethodsImpl.Stream(count);
-
- - public ReadableChannel<int> StreamException() => TestHubMethodsImpl.StreamException();
- + public ChannelReader<int> StreamException() => TestHubMethodsImpl.StreamException();
-
- - public ReadableChannel<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
- + public ChannelReader<string> StreamBroken() => TestHubMethodsImpl.StreamBroken();
-
- public async Task CallEcho(string message)
- {
- @@ -97,12 +97,12 @@ namespace Microsoft.AspNetCore.SignalR.Client.FunctionalTests
- .Take(count);
- }
-
- - public static ReadableChannel<int> StreamException()
- + public static ChannelReader<int> StreamException()
- {
- throw new InvalidOperationException("Error occurred while streaming.");
- }
-
- - public static ReadableChannel<string> StreamBroken() => null;
- + public static ChannelReader<string> StreamBroken() => null;
- }
-
- public interface ITestHub
- diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs
- index 58a20ef28b5..5a613c7a708 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs
- @@ -7,7 +7,7 @@ using System.Net.Http;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Client.Tests;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Microsoft.AspNetCore.Sockets.Features;
- @@ -268,8 +268,8 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
- {
- // The connection is now in the Disconnected state so the Received event for
- // this message should not be raised
- - channel.Out.TryWrite(Array.Empty<byte>());
- - channel.Out.TryComplete();
- + channel.Writer.TryWrite(Array.Empty<byte>());
- + channel.Writer.TryComplete();
- return Task.CompletedTask;
- });
- mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
- @@ -313,7 +313,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
- mockTransport.Setup(t => t.StopAsync())
- .Returns(() =>
- {
- - channel.Out.TryComplete();
- + channel.Writer.TryComplete();
- return Task.CompletedTask;
- });
- mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
- @@ -330,14 +330,14 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
- });
-
- await connection.StartAsync();
- - channel.Out.TryWrite(Array.Empty<byte>());
- + channel.Writer.TryWrite(Array.Empty<byte>());
-
- // Ensure that the Received callback has been called before attempting the second write
- await callbackInvokedTcs.Task.OrTimeout();
- - channel.Out.TryWrite(Array.Empty<byte>());
- + channel.Writer.TryWrite(Array.Empty<byte>());
-
- // Ensure that SignalR isn't blocked by the receive callback
- - Assert.False(channel.In.TryRead(out var message));
- + Assert.False(channel.Reader.TryRead(out var message));
-
- closedTcs.SetResult(null);
-
- @@ -369,7 +369,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
- mockTransport.Setup(t => t.StopAsync())
- .Returns(() =>
- {
- - channel.Out.TryComplete();
- + channel.Writer.TryComplete();
- return Task.CompletedTask;
- });
- mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
- @@ -380,10 +380,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
- connection.OnReceived(_ => blockReceiveCallbackTcs.Task);
-
- await connection.StartAsync();
- - channel.Out.TryWrite(Array.Empty<byte>());
- + channel.Writer.TryWrite(Array.Empty<byte>());
-
- // Ensure that SignalR isn't blocked by the receive callback
- - Assert.False(channel.In.TryRead(out var message));
- + Assert.False(channel.Reader.TryRead(out var message));
-
- await connection.DisposeAsync();
- }
- @@ -413,7 +413,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
- mockTransport.Setup(t => t.StopAsync())
- .Returns(() =>
- {
- - channel.Out.TryComplete();
- + channel.Writer.TryComplete();
- return Task.CompletedTask;
- });
- mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text);
- @@ -427,10 +427,10 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
- });
-
- await connection.StartAsync();
- - channel.Out.TryWrite(Array.Empty<byte>());
- + channel.Writer.TryWrite(Array.Empty<byte>());
-
- // Ensure that SignalR isn't blocked by the receive callback
- - Assert.False(channel.In.TryRead(out var message));
- + Assert.False(channel.Reader.TryRead(out var message));
-
- await connection.DisposeAsync();
- }
- @@ -909,7 +909,7 @@ namespace Microsoft.AspNetCore.Sockets.Client.Tests
- mockTransport.Setup(t => t.StopAsync())
- .Returns(() =>
- {
- - channel.Out.TryComplete();
- + channel.Writer.TryComplete();
- return Task.CompletedTask;
- });
- mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Binary);
- diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs
- index 6aec848eb02..1c6e121f53b 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionProtocolTests.cs
- @@ -6,7 +6,7 @@ using System.Globalization;
- using System.IO;
- using System.Text;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Internal.Protocol;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Microsoft.AspNetCore.Sockets;
- diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs
- index 1114955fc07..13da4d90ec1 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HubConnectionTests.cs
- @@ -215,7 +215,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
-
- public ProtocolType Type => ProtocolType.Binary;
-
- - public bool TryParseMessages(ReadOnlyBuffer<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
- + public bool TryParseMessages(ReadOnlyMemory<byte> input, IInvocationBinder binder, out IList<HubMessage> messages)
- {
- messages = new List<HubMessage>();
-
- diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs
- index 1892508f13f..5f14541613f 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/LongPollingTransportTests.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -8,7 +8,7 @@ using System.Net.Http;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Microsoft.AspNetCore.Sockets;
- using Microsoft.AspNetCore.Sockets.Client;
- @@ -83,7 +83,7 @@ namespace Microsoft.AspNetCore.Client.Tests
- await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
-
- await longPollingTransport.Running.OrTimeout();
- - Assert.True(transportToConnection.In.Completion.IsCompleted);
- + Assert.True(transportToConnection.Reader.Completion.IsCompleted);
- }
- finally
- {
- @@ -135,9 +135,9 @@ namespace Microsoft.AspNetCore.Client.Tests
- var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
- await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
-
- - var data = await transportToConnection.In.ReadAllAsync().OrTimeout();
- + var data = await transportToConnection.Reader.ReadAllAsync().OrTimeout();
- await longPollingTransport.Running.OrTimeout();
- - Assert.True(transportToConnection.In.Completion.IsCompleted);
- + Assert.True(transportToConnection.Reader.Completion.IsCompleted);
- Assert.Equal(2, data.Count);
- Assert.Equal(Encoding.UTF8.GetBytes("Hello"), data[0]);
- Assert.Equal(Encoding.UTF8.GetBytes("World"), data[1]);
- @@ -172,7 +172,7 @@ namespace Microsoft.AspNetCore.Client.Tests
- await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
-
- var exception =
- - await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion.OrTimeout());
- + await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.Reader.Completion.OrTimeout());
- Assert.Contains(" 500 ", exception.Message);
- }
- finally
- @@ -207,16 +207,16 @@ namespace Microsoft.AspNetCore.Client.Tests
- var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
- await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
-
- - await connectionToTransport.Out.WriteAsync(new SendMessage());
- + await connectionToTransport.Writer.WriteAsync(new SendMessage());
-
- await Assert.ThrowsAsync<HttpRequestException>(async () => await longPollingTransport.Running.OrTimeout());
-
- // The channel needs to be drained for the Completion task to be completed
- - while (transportToConnection.In.TryRead(out var message))
- + while (transportToConnection.Reader.TryRead(out var message))
- {
- }
-
- - var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.In.Completion);
- + var exception = await Assert.ThrowsAsync<HttpRequestException>(async () => await transportToConnection.Reader.Completion);
- Assert.Contains(" 500 ", exception.Message);
- }
- finally
- @@ -248,12 +248,12 @@ namespace Microsoft.AspNetCore.Client.Tests
- var channelConnection = new ChannelConnection<SendMessage, byte[]>(connectionToTransport, transportToConnection);
- await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
-
- - connectionToTransport.Out.Complete();
- + connectionToTransport.Writer.Complete();
-
- await longPollingTransport.Running.OrTimeout();
-
- await longPollingTransport.Running.OrTimeout();
- - await connectionToTransport.In.Completion.OrTimeout();
- + await connectionToTransport.Reader.Completion.OrTimeout();
- }
- finally
- {
- @@ -304,9 +304,9 @@ namespace Microsoft.AspNetCore.Client.Tests
-
- // Pull Messages out of the channel
- var messages = new List<byte[]>();
- - while (await transportToConnection.In.WaitToReadAsync())
- + while (await transportToConnection.Reader.WaitToReadAsync())
- {
- - while (transportToConnection.In.TryRead(out var message))
- + while (transportToConnection.Reader.TryRead(out var message))
- {
- messages.Add(message);
- }
- @@ -358,16 +358,16 @@ namespace Microsoft.AspNetCore.Client.Tests
- var tcs2 = new TaskCompletionSource<object>();
-
- // Pre-queue some messages
- - await connectionToTransport.Out.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("Hello"), tcs1)).OrTimeout();
- - await connectionToTransport.Out.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("World"), tcs2)).OrTimeout();
- + await connectionToTransport.Writer.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("Hello"), tcs1)).OrTimeout();
- + await connectionToTransport.Writer.WriteAsync(new SendMessage(Encoding.UTF8.GetBytes("World"), tcs2)).OrTimeout();
-
- // Start the transport
- await longPollingTransport.StartAsync(new Uri("http://fakeuri.org"), channelConnection, TransferMode.Binary, connectionId: string.Empty);
-
- - connectionToTransport.Out.Complete();
- + connectionToTransport.Writer.Complete();
-
- await longPollingTransport.Running.OrTimeout();
- - await connectionToTransport.In.Completion.OrTimeout();
- + await connectionToTransport.Reader.Completion.OrTimeout();
-
- Assert.Single(sentRequests);
- Assert.Equal(new byte[] { (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o', (byte)'W', (byte)'o', (byte)'r', (byte)'l', (byte)'d'
- diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsParserTests.cs
- index ffe4031546d..4e9d1bfae40 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsParserTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsParserTests.cs
- @@ -2,6 +2,7 @@
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- +using System.Buffers;
- using System.Collections.Generic;
- using System.IO.Pipelines;
- using System.Text;
- @@ -106,10 +107,10 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- [InlineData(new[] { "data: Hello, World\r\n", ":comment\r\n", "\r\n" }, "Hello, World")]
- public async Task ParseMessageAcrossMultipleReadsSuccess(string[] messageParts, string expectedMessage)
- {
- - using (var pipeFactory = new PipeFactory())
- + var parser = new ServerSentEventsMessageParser();
- + using (var pool = new MemoryPool())
- {
- - var parser = new ServerSentEventsMessageParser();
- - var pipe = pipeFactory.Create();
- + var pipe = new Pipe(new PipeOptions(pool));
-
- byte[] message = null;
- ReadCursor consumed = default, examined = default;
- @@ -152,9 +153,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- [InlineData("data: B\r\ndata: SGVs", "bG8sIFdvcmxk\r\n\n\n", "There was an error in the frame format")]
- public async Task ParseMessageAcrossMultipleReadsFailure(string encodedMessagePart1, string encodedMessagePart2, string expectedMessage)
- {
- - using (var pipeFactory = new PipeFactory())
- + using (var pool = new MemoryPool())
- {
- - var pipe = pipeFactory.Create();
- + var pipe = new Pipe(new PipeOptions(pool));
-
- // Read the first part of the message
- await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(encodedMessagePart1));
- @@ -173,7 +174,6 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
-
- var ex = Assert.Throws<FormatException>(() => parser.ParseMessage(result.Buffer, out consumed, out examined, out buffer));
- Assert.Equal(expectedMessage, ex.Message);
- -
- }
- }
-
- @@ -181,9 +181,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- [InlineData("data: foo\r\n\r\n", "data: bar\r\n\r\n")]
- public async Task ParseMultipleMessagesText(string message1, string message2)
- {
- - using (var pipeFactory = new PipeFactory())
- + using (var pool = new MemoryPool())
- {
- - var pipe = pipeFactory.Create();
- + var pipe = new Pipe(new PipeOptions(pool));
-
- // Read the first part of the message
- await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(message1 + message2));
- diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs
- index ed5a2d00547..622996bef35 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ServerSentEventsTransportTests.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -8,8 +8,9 @@ using System.Net.Http.Headers;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Client.Tests;
- +using Microsoft.AspNetCore.SignalR.Internal;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Microsoft.AspNetCore.Sockets;
- using Microsoft.AspNetCore.Sockets.Client;
- @@ -42,6 +43,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- mockStream
- .Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
- .Returns(copyToAsyncTcs.Task);
- + mockStream.Setup(s => s.CanRead).Returns(true);
- return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
- });
-
- @@ -83,12 +85,14 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- .Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
- .Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
- {
- + await Task.Yield();
- var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
- while (!eventStreamCts.IsCancellationRequested)
- {
- await stream.WriteAsync(buffer, 0, buffer.Length);
- }
- });
- + mockStream.Setup(s => s.CanRead).Returns(true);
-
- return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
- });
- @@ -109,7 +113,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
-
- transportActiveTask = sseTransport.Running;
- Assert.False(transportActiveTask.IsCompleted);
- - var message = await transportToConnection.In.ReadAsync().AsTask().OrTimeout();
- + var message = await transportToConnection.Reader.ReadAsync().AsTask().OrTimeout();
- Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
- }
- finally
- @@ -140,6 +144,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- var buffer = Encoding.ASCII.GetBytes("data: 3:a");
- await stream.WriteAsync(buffer, 0, buffer.Length);
- });
- + mockStream.Setup(s => s.CanRead).Returns(true);
-
- return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
- });
- @@ -182,6 +187,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- mockStream
- .Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
- .Returns(copyToAsyncTcs.Task);
- + mockStream.Setup(s => s.CanRead).Returns(true);
- return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
- }
-
- @@ -201,7 +207,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- await eventStreamTcs.Task;
-
- var sendTcs = new TaskCompletionSource<object>();
- - Assert.True(connectionToTransport.Out.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs)));
- + Assert.True(connectionToTransport.Writer.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs)));
-
- var exception = await Assert.ThrowsAsync<HttpRequestException>(() => sendTcs.Task.OrTimeout());
- Assert.Contains("500", exception.Message);
- @@ -231,6 +237,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- mockStream
- .Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
- .Returns(copyToAsyncTcs.Task);
- + mockStream.Setup(s => s.CanRead).Returns(true);
- return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
- });
-
- @@ -246,7 +253,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
- await eventStreamTcs.Task.OrTimeout();
-
- - connectionToTransport.Out.TryComplete(null);
- + connectionToTransport.Writer.TryComplete(null);
-
- await sseTransport.Running.OrTimeout();
- }
- @@ -274,7 +281,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- await sseTransport.StartAsync(
- new Uri("http://fakeuri.org"), channelConnection, TransferMode.Text, connectionId: string.Empty).OrTimeout();
-
- - var message = await transportToConnection.In.ReadAsync().AsTask().OrTimeout();
- + var message = await transportToConnection.Reader.ReadAsync().AsTask().OrTimeout();
- Assert.Equal("3:abc", Encoding.ASCII.GetString(message));
-
- await sseTransport.Running.OrTimeout();
- diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs
- index d234931fd80..6e8c8be30d3 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestConnection.cs
- @@ -7,8 +7,9 @@ using System.IO;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http.Features;
- +using Microsoft.AspNetCore.SignalR.Internal;
- using Microsoft.AspNetCore.SignalR.Internal.Formatters;
- using Microsoft.AspNetCore.Sockets;
- using Microsoft.AspNetCore.Sockets.Client;
- @@ -34,8 +35,8 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- public Task Closed => _closeTcs.Task;
- public Task Started => _started.Task;
- public Task Disposed => _disposed.Task;
- - public ReadableChannel<byte[]> SentMessages => _sentMessages.In;
- - public WritableChannel<byte[]> ReceivedMessages => _receivedMessages.Out;
- + public ChannelReader<byte[]> SentMessages => _sentMessages.Reader;
- + public ChannelWriter<byte[]> ReceivedMessages => _receivedMessages.Writer;
-
- private readonly List<ReceiveCallback> _callbacks = new List<ReceiveCallback>();
-
- @@ -61,9 +62,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- throw new InvalidOperationException("Connection must be started before SendAsync can be called");
- }
-
- - while (await _sentMessages.Out.WaitToWriteAsync(cancellationToken))
- + while (await _sentMessages.Writer.WaitToWriteAsync(cancellationToken))
- {
- - if (_sentMessages.Out.TryWrite(data))
- + if (_sentMessages.Writer.TryWrite(data))
- {
- return;
- }
- @@ -100,7 +101,7 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- var json = JsonConvert.SerializeObject(jsonObject, Formatting.None);
- var bytes = FormatMessageToArray(Encoding.UTF8.GetBytes(json));
-
- - return _receivedMessages.Out.WriteAsync(bytes);
- + return _receivedMessages.Writer.WriteAsync(bytes);
- }
-
- private byte[] FormatMessageToArray(byte[] message)
- @@ -116,9 +117,9 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
- {
- while (!token.IsCancellationRequested)
- {
- - while (await _receivedMessages.In.WaitToReadAsync(token))
- + while (await _receivedMessages.Reader.WaitToReadAsync(token))
- {
- - while (_receivedMessages.In.TryRead(out var message))
- + while (_receivedMessages.Reader.TryRead(out var message))
- {
- ReceiveCallback[] callbackCopies;
- lock (_callbacks)
- diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Encoders/LengthPrefixedTextMessageParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Encoders/LengthPrefixedTextMessageParserTests.cs
- index 78e296ce29a..923e80dd66c 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Encoders/LengthPrefixedTextMessageParserTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Encoders/LengthPrefixedTextMessageParserTests.cs
- @@ -18,7 +18,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
- [InlineData("12:Hello, World;", "Hello, World")]
- public void ReadTextMessage(string encoded, string payload)
- {
- - ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
- + ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
-
- Assert.True(LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out var message));
- Assert.Equal(0, buffer.Length);
- @@ -29,7 +29,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
- public void ReadMultipleMessages()
- {
- const string encoded = "0:;14:Hello,\r\nWorld!;";
- - ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
- + ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
-
- var messages = new List<byte[]>();
- while (LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out var message))
- @@ -54,7 +54,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
- [InlineData("5:ABCDE")]
- public void ReadIncompleteMessages(string encoded)
- {
- - ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
- + ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
- Assert.False(LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out _));
- }
-
- @@ -66,7 +66,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
- [InlineData("5:ABCDEF", "Missing delimiter ';' after payload")]
- public void ReadInvalidMessages(string encoded, string expectedMessage)
- {
- - ReadOnlyBuffer<byte> buffer = Encoding.UTF8.GetBytes(encoded);
- + ReadOnlyMemory<byte> buffer = Encoding.UTF8.GetBytes(encoded);
- var ex = Assert.Throws<FormatException>(() =>
- {
- LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out _);
- @@ -79,7 +79,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Encoders
- {
- // Invalid because first character is a UTF-8 "continuation" character
- // We need to include the ':' so that
- - ReadOnlyBuffer<byte> buffer = new byte[] { 0x48, 0x65, 0x80, 0x6C, 0x6F, (byte)':' };
- + ReadOnlyMemory<byte> buffer = new byte[] { 0x48, 0x65, 0x80, 0x6C, 0x6F, (byte)':' };
- var ex = Assert.Throws<FormatException>(() =>
- {
- LengthPrefixedTextMessageParser.TryParseMessage(ref buffer, out _);
- diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs
- index 7a35a6e16e7..b727e8124d0 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageFormatterTests.cs
- @@ -109,7 +109,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests.Internal.Formatters
- using (var ms = new MemoryStream())
- {
- BinaryMessageFormatter.WriteMessage(payload, ms);
- - var buffer = new ReadOnlyBuffer<byte>(ms.ToArray());
- + var buffer = new ReadOnlyMemory<byte>(ms.ToArray());
- Assert.True(BinaryMessageParser.TryParseMessage(ref buffer, out var roundtripped));
- Assert.Equal(payload, roundtripped.ToArray());
- }
- diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs
- index 58dfae9c993..7a42eca8439 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/BinaryMessageParserTests.cs
- @@ -17,7 +17,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
- [InlineData(new byte[] { 0x0B, 0x41, 0x0A, 0x52, 0x0D, 0x43, 0x0D, 0x0A, 0x3B, 0x44, 0x45, 0x46 }, "A\nR\rC\r\n;DEF")]
- public void ReadMessage(byte[] encoded, string payload)
- {
- - ReadOnlyBuffer<byte> span = encoded;
- + ReadOnlyMemory<byte> span = encoded;
- Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message));
- Assert.Equal(0, span.Length);
-
- @@ -52,7 +52,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
- })]
- public void ReadBinaryMessage(byte[] encoded, byte[] payload)
- {
- - ReadOnlyBuffer<byte> span = encoded;
- + ReadOnlyMemory<byte> span = encoded;
- Assert.True(BinaryMessageParser.TryParseMessage(ref span, out var message));
- Assert.Equal(0, span.Length);
- Assert.Equal(payload, message.ToArray());
- @@ -64,7 +64,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
- [InlineData(new byte[] { 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF })]
- public void BinaryMessageParserThrowsForMessagesOver2GB(byte[] payload)
- {
- - var buffer = new ReadOnlyBuffer<byte>(payload);
- + var buffer = new ReadOnlyMemory<byte>(payload);
- var ex = Assert.Throws<FormatException>(() => BinaryMessageParser.TryParseMessage(ref buffer, out var message));
- Assert.Equal("Messages over 2GB in size are not supported.", ex.Message);
- }
- @@ -76,7 +76,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
- [InlineData(new byte[] { 0x80 })] // size is cut
- public void BinaryMessageParserReturnsFalseForPartialPayloads(byte[] payload)
- {
- - var buffer = new ReadOnlyBuffer<byte>(payload);
- + var buffer = new ReadOnlyMemory<byte>(payload);
- Assert.False(BinaryMessageParser.TryParseMessage(ref buffer, out var message));
- }
-
- @@ -90,7 +90,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
- /* length: */ 0x0E,
- /* body: */ 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x2C, 0x0D, 0x0A, 0x57, 0x6F, 0x72, 0x6C, 0x64, 0x21,
- };
- - ReadOnlyBuffer<byte> buffer = encoded;
- + ReadOnlyMemory<byte> buffer = encoded;
-
- var messages = new List<byte[]>();
- while (BinaryMessageParser.TryParseMessage(ref buffer, out var message))
- @@ -110,7 +110,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
- [InlineData(new byte[] { 0x09, 0x00, 0x00 })] // Not enough data for payload
- public void ReadIncompleteMessages(byte[] encoded)
- {
- - ReadOnlyBuffer<byte> buffer = encoded;
- + ReadOnlyMemory<byte> buffer = encoded;
- Assert.False(BinaryMessageParser.TryParseMessage(ref buffer, out var message));
- Assert.Equal(encoded.Length, buffer.Length);
- }
- diff --git a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs
- index e6d953cf87c..9dbc7b2866f 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Common.Tests/Internal/Formatters/TextMessageParserTests.cs
- @@ -13,7 +13,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
- [Fact]
- public void ReadMessage()
- {
- - var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC\u001e"));
- + var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC\u001e"));
-
- Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload));
- Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray()));
- @@ -23,14 +23,14 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
- [Fact]
- public void TryReadingIncompleteMessage()
- {
- - var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC"));
- + var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC"));
- Assert.False(TextMessageParser.TryParseMessage(ref message, out var payload));
- }
-
- [Fact]
- public void TryReadingMultipleMessages()
- {
- - var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e"));
- + var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e"));
- Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload));
- Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray()));
- Assert.True(TextMessageParser.TryParseMessage(ref message, out payload));
- @@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Tests.Internal.Formatters
- [Fact]
- public void IncompleteTrailingMessage()
- {
- - var message = new ReadOnlyBuffer<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e123"));
- + var message = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("ABC\u001eXYZ\u001e123"));
- Assert.True(TextMessageParser.TryParseMessage(ref message, out var payload));
- Assert.Equal("ABC", Encoding.UTF8.GetString(payload.ToArray()));
- Assert.True(TextMessageParser.TryParseMessage(ref message, out payload));
- diff --git a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs
- index e7613093820..7dcc10a447e 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Redis.Tests/RedisHubLifetimeManagerTests.cs
- @@ -4,7 +4,7 @@
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Internal.Protocol;
- using Microsoft.AspNetCore.SignalR.Tests;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- @@ -70,7 +70,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
-
- AssertMessage(output1);
-
- - Assert.False(output2.In.TryRead(out var item));
- + Assert.False(output2.Reader.TryRead(out var item));
- }
- }
-
- @@ -100,7 +100,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
-
- AssertMessage(output1);
-
- - Assert.False(output2.In.TryRead(out var item));
- + Assert.False(output2.Reader.TryRead(out var item));
- }
- }
-
- @@ -201,7 +201,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
-
- AssertMessage(output1);
-
- - Assert.False(output2.In.TryRead(out var item));
- + Assert.False(output2.Reader.TryRead(out var item));
- }
- }
-
- @@ -286,7 +286,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
-
- await manager.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
-
- - Assert.False(output.In.TryRead(out var item));
- + Assert.False(output.Reader.TryRead(out var item));
- }
- }
-
- @@ -387,7 +387,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
- await manager.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
-
- AssertMessage(output);
- - Assert.False(output.In.TryRead(out var item));
- + Assert.False(output.Reader.TryRead(out var item));
- }
- }
-
- @@ -417,7 +417,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
- await manager2.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
-
- AssertMessage(output);
- - Assert.False(output.In.TryRead(out var item));
- + Assert.False(output.Reader.TryRead(out var item));
- }
- }
-
- @@ -451,7 +451,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
-
- await manager2.InvokeGroupAsync("name", "Hello", new object[] { "World" }).OrTimeout();
-
- - Assert.False(output.In.TryRead(out var item));
- + Assert.False(output.Reader.TryRead(out var item));
- }
- }
-
- @@ -480,7 +480,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
- await manager1.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout();
-
- AssertMessage(output);
- - Assert.False(output.In.TryRead(out var item));
- + Assert.False(output.Reader.TryRead(out var item));
- }
- }
-
- @@ -499,10 +499,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
- using (var client = new TestClient())
- {
- // Force an exception when writing to connection
- - var output = new Mock<Channel<HubMessage>>();
- - output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
- + var writer = new Mock<ChannelWriter<HubMessage>>();
- + writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
-
- - var connection = new HubConnectionContext(output.Object, client.Connection);
- + var connection = new HubConnectionContext(new MockChannel(writer.Object), client.Connection);
-
- await manager2.OnConnectedAsync(connection).OrTimeout();
-
- @@ -523,10 +523,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
- using (var client = new TestClient())
- {
- // Force an exception when writing to connection
- - var output = new Mock<Channel<HubMessage>>();
- - output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
- + var writer = new Mock<ChannelWriter<HubMessage>>();
- + writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
-
- - var connection = new HubConnectionContext(output.Object, client.Connection);
- + var connection = new HubConnectionContext(new MockChannel(writer.Object), client.Connection);
-
- await manager.OnConnectedAsync(connection).OrTimeout();
-
- @@ -549,10 +549,10 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
- var output2 = Channel.CreateUnbounded<HubMessage>();
-
- // Force an exception when writing to connection
- - var output = new Mock<Channel<HubMessage>>();
- - output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
- + var writer = new Mock<ChannelWriter<HubMessage>>();
- + writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception());
-
- - var connection1 = new HubConnectionContext(output.Object, client1.Connection);
- + var connection1 = new HubConnectionContext(new MockChannel(writer.Object), client1.Connection);
- var connection2 = new HubConnectionContext(output2, client2.Connection);
-
- await manager.OnConnectedAsync(connection1).OrTimeout();
- @@ -573,7 +573,7 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
-
- private void AssertMessage(Channel<HubMessage> channel)
- {
- - Assert.True(channel.In.TryRead(out var item));
- + Assert.True(channel.Reader.TryRead(out var item));
- var message = Assert.IsType<InvocationMessage>(item);
- Assert.Equal("Hello", message.Target);
- Assert.Single(message.Arguments);
- @@ -583,5 +583,13 @@ namespace Microsoft.AspNetCore.SignalR.Redis.Tests
- private class MyHub : Hub
- {
- }
- +
- + private class MockChannel : Channel<HubMessage>
- + {
- + public MockChannel(ChannelWriter<HubMessage> writer = null)
- + {
- + Writer = writer;
- + }
- + }
- }
- }
- diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs
- index 81dd798b2a6..7065eadd9d6 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs
- @@ -1,7 +1,7 @@
- -using System;
- +using System;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Internal.Protocol;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Moq;
- @@ -29,13 +29,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
-
- await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
-
- - Assert.True(output1.In.TryRead(out var item));
- + Assert.True(output1.Reader.TryRead(out var item));
- var message = Assert.IsType<InvocationMessage>(item);
- Assert.Equal("Hello", message.Target);
- Assert.Single(message.Arguments);
- Assert.Equal("World", (string)message.Arguments[0]);
-
- - Assert.True(output2.In.TryRead(out item));
- + Assert.True(output2.Reader.TryRead(out item));
- message = Assert.IsType<InvocationMessage>(item);
- Assert.Equal("Hello", message.Target);
- Assert.Single(message.Arguments);
- @@ -63,13 +63,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
-
- await manager.InvokeAllAsync("Hello", new object[] { "World" }).OrTimeout();
-
- - Assert.True(output1.In.TryRead(out var item));
- + Assert.True(output1.Reader.TryRead(out var item));
- var message = Assert.IsType<InvocationMessage>(item);
- Assert.Equal("Hello", message.Target);
- Assert.Single(message.Arguments);
- Assert.Equal("World", (string)message.Arguments[0]);
-
- - Assert.False(output2.In.TryRead(out item));
- + Assert.False(output2.Reader.TryRead(out item));
- }
- }
-
- @@ -93,13 +93,13 @@ namespace Microsoft.AspNetCore.SignalR.Tests
-
- await manager.InvokeGroupAsync("gunit", "Hello", new object[] { "World" }).OrTimeout();
-
- - Assert.True(output1.In.TryRead(out var item));
- + Assert.True(output1.Reader.TryRead(out var item));
- var message = Assert.IsType<InvocationMessage>(item);
- Assert.Equal("Hello", message.Target);
- Assert.Single(message.Arguments);
- Assert.Equal("World", (string)message.Arguments[0]);
-
- - Assert.False(output2.In.TryRead(out item));
- + Assert.False(output2.Reader.TryRead(out item));
- }
- }
-
- @@ -116,7 +116,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
-
- await manager.InvokeConnectionAsync(connection.ConnectionId, "Hello", new object[] { "World" }).OrTimeout();
-
- - Assert.True(output.In.TryRead(out var item));
- + Assert.True(output.Reader.TryRead(out var item));
- var message = Assert.IsType<InvocationMessage>(item);
- Assert.Equal("Hello", message.Target);
- Assert.Single(message.Arguments);
- @@ -130,11 +130,11 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- using (var client = new TestClient())
- {
- // Force an exception when writing to connection
- - var output = new Mock<Channel<HubMessage>>();
- - output.Setup(o => o.Out.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
- + var writer = new Mock<ChannelWriter<HubMessage>>();
- + writer.Setup(o => o.WaitToWriteAsync(It.IsAny<CancellationToken>())).Throws(new Exception("Message"));
-
- var manager = new DefaultHubLifetimeManager<MyHub>();
- - var connection = new HubConnectionContext(output.Object, client.Connection);
- + var connection = new HubConnectionContext(new MockChannel(writer.Object), client.Connection);
-
- await manager.OnConnectedAsync(connection).OrTimeout();
-
- @@ -168,5 +168,14 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- {
-
- }
- +
- + private class MockChannel: Channel<HubMessage>
- + {
- +
- + public MockChannel(ChannelWriter<HubMessage> writer = null)
- + {
- + Writer = writer;
- + }
- + }
- }
- }
- diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs b/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs
- index 08c0a5d5b3f..034230b956d 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Tests/EchoEndPoint.cs
- @@ -2,6 +2,7 @@
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System.Threading.Tasks;
- +using Microsoft.AspNetCore.SignalR.Internal;
- using Microsoft.AspNetCore.Sockets;
-
- namespace Microsoft.AspNetCore.SignalR.Tests
- @@ -10,7 +11,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- {
- public async override Task OnConnectedAsync(ConnectionContext connection)
- {
- - await connection.Transport.Out.WriteAsync(await connection.Transport.In.ReadAsync());
- + await connection.Transport.Writer.WriteAsync(await connection.Transport.Reader.ReadAsync());
- }
- }
- }
- diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs
- index bcd31c4fbff..a9c806748fa 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs
- @@ -8,7 +8,7 @@ using System.Runtime.Serialization;
- using System.Security.Claims;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Authorization;
- using Microsoft.AspNetCore.Http;
- using Microsoft.AspNetCore.SignalR.Internal;
- @@ -259,7 +259,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- using (var client = new TestClient())
- {
- // TestClient automatically writes negotiate, for this test we want to assume negotiate never gets sent
- - client.Connection.Transport.In.TryRead(out var item);
- + client.Connection.Transport.Reader.TryRead(out var item);
-
- var endPointTask = endPoint.OnConnectedAsync(client.Connection);
-
- @@ -285,7 +285,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- using (var client = new TestClient())
- {
- // TestClient automatically writes negotiate, for this test we want to assume negotiate never gets sent
- - client.Connection.Transport.In.TryRead(out var item);
- + client.Connection.Transport.Reader.TryRead(out var item);
-
- await endPoint.OnConnectedAsync(client.Connection).OrTimeout();
- }
- @@ -521,7 +521,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- await client.SendInvocationAsync(methodName, nonBlocking: true).OrTimeout();
-
- // Nothing should have been written
- - Assert.False(client.Application.In.TryRead(out var buffer));
- + Assert.False(client.Application.Reader.TryRead(out var buffer));
-
- // kill the connection
- client.Dispose();
- @@ -1595,7 +1595,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- return new CountingObservable(count);
- }
-
- - public ReadableChannel<string> CounterChannel(int count)
- + public ChannelReader<string> CounterChannel(int count)
- {
- var channel = Channel.CreateUnbounded<string>();
-
- @@ -1603,17 +1603,17 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- {
- for (int i = 0; i < count; i++)
- {
- - await channel.Out.WriteAsync(i.ToString());
- + await channel.Writer.WriteAsync(i.ToString());
- }
- - channel.Out.Complete();
- + channel.Writer.Complete();
- });
-
- - return channel.In;
- + return channel.Reader;
- }
-
- - public ReadableChannel<string> BlockingStream()
- + public ChannelReader<string> BlockingStream()
- {
- - return Channel.CreateUnbounded<string>().In;
- + return Channel.CreateUnbounded<string>().Reader;
- }
-
- private class CountingObservable : IObservable<string>
- diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/Internal/DefaultHubProtocolResolverTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/Internal/DefaultHubProtocolResolverTests.cs
- index b0596994cd8..dba1123dd3b 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Tests/Internal/DefaultHubProtocolResolverTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Tests/Internal/DefaultHubProtocolResolverTests.cs
- @@ -1,9 +1,9 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- using System.Collections.Generic;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Internal;
- using Microsoft.AspNetCore.SignalR.Internal.Protocol;
- using Microsoft.AspNetCore.Sockets;
- @@ -20,7 +20,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Protocol.Tests
- [MemberData(nameof(HubProtocols))]
- public void DefaultHubProtocolResolverTestsCanCreateSupportedProtocols(IHubProtocol protocol)
- {
- - var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Out, new Mock<ConnectionContext>().Object);
- + var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Writer, new Mock<ConnectionContext>().Object);
- Assert.IsType(
- protocol.GetType(),
- new DefaultHubProtocolResolver(Options.Create(new HubOptions())).GetProtocol(protocol.Name, mockConnection.Object));
- @@ -31,7 +31,7 @@ namespace Microsoft.AspNetCore.SignalR.Common.Protocol.Tests
- [InlineData("dummy")]
- public void DefaultHubProtocolResolverThrowsForNotSupportedProtocol(string protocolName)
- {
- - var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Out, new Mock<ConnectionContext>().Object);
- + var mockConnection = new Mock<HubConnectionContext>(Channel.CreateUnbounded<HubMessage>().Writer, new Mock<ConnectionContext>().Object);
- var exception = Assert.Throws<NotSupportedException>(
- () => new DefaultHubProtocolResolver(Options.Create(new HubOptions())).GetProtocol(protocolName, mockConnection.Object));
-
- diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs
- index b46f1757de7..edb9099fdbd 100644
- --- a/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs
- +++ b/test/Microsoft.AspNetCore.SignalR.Tests/WebSocketsTransportTests.cs
- @@ -1,9 +1,9 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Microsoft.AspNetCore.Sockets;
- using Microsoft.AspNetCore.Sockets.Client;
- @@ -61,7 +61,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- var webSocketsTransport = new WebSocketsTransport(loggerFactory);
- await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection,
- TransferMode.Binary, connectionId: string.Empty);
- - connectionToTransport.Out.TryComplete();
- + connectionToTransport.Writer.TryComplete();
- await webSocketsTransport.Running.OrTimeout(TimeSpan.FromSeconds(10));
- }
- }
- @@ -82,7 +82,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- await webSocketsTransport.StartAsync(new Uri(_serverFixture.WebSocketsUrl + "/echo"), channelConnection, transferMode, connectionId: string.Empty);
-
- var sendTcs = new TaskCompletionSource<object>();
- - connectionToTransport.Out.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs));
- + connectionToTransport.Writer.TryWrite(new SendMessage(new byte[] { 0x42 }, sendTcs));
- try
- {
- await sendTcs.Task;
- @@ -99,7 +99,7 @@ namespace Microsoft.AspNetCore.SignalR.Tests
- // The echo endpoint closes the connection immediately after sending response which should stop the transport
- await webSocketsTransport.Running.OrTimeout();
-
- - Assert.True(transportToConnection.In.TryRead(out var buffer));
- + Assert.True(transportToConnection.Reader.TryRead(out var buffer));
- Assert.Equal(new byte[] { 0x42 }, buffer);
- }
- }
- diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs
- index 9f0c56a1a10..f2cb45f02c3 100644
- --- a/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs
- +++ b/test/Microsoft.AspNetCore.Sockets.Tests/ConnectionManagerTests.cs
- @@ -82,12 +82,12 @@ namespace Microsoft.AspNetCore.Sockets.Tests
-
- connection.ApplicationTask = Task.Run(async () =>
- {
- - Assert.False(await connection.Transport.In.WaitToReadAsync());
- + Assert.False(await connection.Transport.Reader.WaitToReadAsync());
- });
-
- connection.TransportTask = Task.Run(async () =>
- {
- - Assert.False(await connection.Application.In.WaitToReadAsync());
- + Assert.False(await connection.Application.Reader.WaitToReadAsync());
- });
-
- connectionManager.CloseConnections();
- @@ -197,7 +197,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- appLifetime.StopApplication();
-
- // Connection should be disposed so this should complete immediately
- - Assert.False(await connection.Application.Out.WaitToWriteAsync().OrTimeout());
- + Assert.False(await connection.Application.Writer.WaitToWriteAsync().OrTimeout());
- }
-
- private static ConnectionManager CreateConnectionManager(IApplicationLifetime lifetime = null)
- diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs
- index b71067b8574..c0605de722f 100644
- --- a/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs
- +++ b/test/Microsoft.AspNetCore.Sockets.Tests/HttpConnectionDispatcherTests.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -511,7 +511,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- var buffer = Encoding.UTF8.GetBytes("Hello World");
-
- // Write to the transport so the poll yields
- - await connection.Transport.Out.WriteAsync(buffer);
- + await connection.Transport.Writer.WriteAsync(buffer);
-
- await task;
-
- @@ -543,7 +543,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- var buffer = Encoding.UTF8.GetBytes("Hello World");
-
- // Write to the application
- - await connection.Application.Out.WriteAsync(buffer);
- + await connection.Application.Writer.WriteAsync(buffer);
-
- await task;
-
- @@ -573,7 +573,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- var buffer = Encoding.UTF8.GetBytes("Hello World");
-
- // Write to the application
- - await connection.Application.Out.WriteAsync(buffer);
- + await connection.Application.Writer.WriteAsync(buffer);
-
- await task;
-
- @@ -606,7 +606,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- await task1.OrTimeout();
-
- // Send a message from the app to complete Task 2
- - await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World"));
- + await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World"));
-
- await task2.OrTimeout();
-
- @@ -775,7 +775,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") }));
-
- var endPointTask = dispatcher.ExecuteAsync(context, options, app);
- - await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
- + await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
-
- await endPointTask.OrTimeout();
-
- @@ -853,7 +853,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- }));
-
- var endPointTask = dispatcher.ExecuteAsync(context, options, app);
- - await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
- + await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
-
- await endPointTask.OrTimeout();
-
- @@ -907,7 +907,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- context.User = new ClaimsPrincipal(new ClaimsIdentity(new[] { new Claim(ClaimTypes.NameIdentifier, "name") }));
-
- var endPointTask = dispatcher.ExecuteAsync(context, options, app);
- - await connection.Transport.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
- + await connection.Transport.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello, World")).OrTimeout();
-
- await endPointTask.OrTimeout();
-
- @@ -1110,7 +1110,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- {
- public override Task OnConnectedAsync(ConnectionContext connection)
- {
- - connection.Transport.In.WaitToReadAsync().Wait();
- + connection.Transport.Reader.WaitToReadAsync().Wait();
- return Task.CompletedTask;
- }
- }
- @@ -1135,7 +1135,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- {
- public override async Task OnConnectedAsync(ConnectionContext connection)
- {
- - while (await connection.Transport.In.WaitToReadAsync())
- + while (await connection.Transport.Reader.WaitToReadAsync())
- {
- }
- }
- diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs
- index c851dfe7138..112314f6acd 100644
- --- a/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs
- +++ b/test/Microsoft.AspNetCore.Sockets.Tests/LongPollingTests.cs
- @@ -1,11 +1,11 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System.IO;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Microsoft.AspNetCore.Sockets.Internal.Transports;
- @@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- var context = new DefaultHttpContext();
- var poll = new LongPollingTransport(CancellationToken.None, channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
-
- - Assert.True(channel.Out.TryComplete());
- + Assert.True(channel.Writer.TryComplete());
-
- await poll.ProcessRequestAsync(context, context.RequestAborted);
-
- @@ -56,9 +56,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- var ms = new MemoryStream();
- context.Response.Body = ms;
-
- - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
- + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello World"));
-
- - Assert.True(channel.Out.TryComplete());
- + Assert.True(channel.Writer.TryComplete());
-
- await poll.ProcessRequestAsync(context, context.RequestAborted);
-
- @@ -76,11 +76,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- var ms = new MemoryStream();
- context.Response.Body = ms;
-
- - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
- - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes(" "));
- - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes("World"));
- + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
- + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes(" "));
- + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes("World"));
-
- - Assert.True(channel.Out.TryComplete());
- + Assert.True(channel.Writer.TryComplete());
-
- await poll.ProcessRequestAsync(context, context.RequestAborted);
-
- diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/MapEndPointTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/MapEndPointTests.cs
- index 79a0bd9d345..da1ae0c0fa8 100644
- --- a/test/Microsoft.AspNetCore.Sockets.Tests/MapEndPointTests.cs
- +++ b/test/Microsoft.AspNetCore.Sockets.Tests/MapEndPointTests.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -13,12 +13,21 @@ using Microsoft.AspNetCore.Hosting.Server.Features;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Microsoft.AspNetCore.Testing.xunit;
- using Microsoft.Extensions.DependencyInjection;
- +using Microsoft.Extensions.Logging;
- using Xunit;
- +using Xunit.Abstractions;
-
- namespace Microsoft.AspNetCore.Sockets.Tests
- {
- public class MapEndPointTests
- {
- + private ITestOutputHelper _output;
- +
- + public MapEndPointTests(ITestOutputHelper output)
- + {
- + _output = output;
- + }
- +
- [Fact]
- public void MapEndPointFindsAuthAttributeOnEndPoint()
- {
- @@ -40,6 +49,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- });
- });
- })
- + .ConfigureLogging(factory =>
- + {
- + factory.AddXunit(_output, LogLevel.Trace);
- + })
- .Build();
-
- Assert.Equal(1, authCount);
- @@ -66,6 +79,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- });
- });
- })
- + .ConfigureLogging(factory =>
- + {
- + factory.AddXunit(_output, LogLevel.Trace);
- + })
- .Build();
-
- Assert.Equal(1, authCount);
- @@ -92,6 +109,10 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- });
- });
- })
- + .ConfigureLogging(factory =>
- + {
- + factory.AddXunit(_output, LogLevel.Trace);
- + })
- .Build();
-
- Assert.Equal(2, authCount);
- @@ -102,24 +123,28 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- public async Task MapEndPointWithWebSocketSubProtocolSetsProtocol()
- {
- var host = new WebHostBuilder()
- - .UseUrls("http://127.0.0.1:0")
- - .UseKestrel()
- - .ConfigureServices(services =>
- - {
- - services.AddSockets();
- - services.AddEndPoint<MyEndPoint>();
- - })
- - .Configure(app =>
- + .UseUrls("http://127.0.0.1:0")
- + .UseKestrel()
- + .ConfigureServices(services =>
- + {
- + services.AddSockets();
- + services.AddEndPoint<MyEndPoint>();
- + })
- + .Configure(app =>
- + {
- + app.UseSockets(routes =>
- {
- - app.UseSockets(routes =>
- + routes.MapEndPoint<MyEndPoint>("socket", httpSocketOptions =>
- {
- - routes.MapEndPoint<MyEndPoint>("socket", httpSocketOptions =>
- - {
- - httpSocketOptions.WebSockets.SubProtocol = "protocol1";
- - });
- + httpSocketOptions.WebSockets.SubProtocol = "protocol1";
- });
- - })
- - .Build();
- + });
- + })
- + .ConfigureLogging(factory =>
- + {
- + factory.AddXunit(_output, LogLevel.Trace);
- + })
- + .Build();
-
- await host.StartAsync();
-
- @@ -140,7 +165,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- {
- public override async Task OnConnectedAsync(ConnectionContext connection)
- {
- - while (!await connection.Transport.In.WaitToReadAsync())
- + while (!await connection.Transport.Reader.WaitToReadAsync())
- {
-
- }
- diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/Microsoft.AspNetCore.Sockets.Tests.csproj b/test/Microsoft.AspNetCore.Sockets.Tests/Microsoft.AspNetCore.Sockets.Tests.csproj
- index c73a09bcc4b..432dfec514a 100644
- --- a/test/Microsoft.AspNetCore.Sockets.Tests/Microsoft.AspNetCore.Sockets.Tests.csproj
- +++ b/test/Microsoft.AspNetCore.Sockets.Tests/Microsoft.AspNetCore.Sockets.Tests.csproj
- @@ -3,7 +3,7 @@
- <PropertyGroup>
- <TargetFrameworks>netcoreapp2.0;net461</TargetFrameworks>
- <TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">netcoreapp2.0</TargetFrameworks>
- -
- +
- <RuntimeIdentifier Condition="'$(TargetFramework)' != 'netcoreapp2.0'">win7-x64</RuntimeIdentifier>
- </PropertyGroup>
-
- @@ -21,6 +21,7 @@
- <PackageReference Include="Microsoft.AspNetCore.Http" Version="$(MicrosoftAspNetCoreHttpPackageVersion)" />
- <PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="$(MicrosoftAspNetCoreServerKestrelPackageVersion)" />
- <PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
- + <PackageReference Include="Microsoft.Extensions.Logging.Testing" Version="$(MicrosoftExtensionsLoggingTestingPackageVersion)" />
- </ItemGroup>
-
- </Project>
- diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs
- index fbdc4137b87..b7283440b43 100644
- --- a/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs
- +++ b/test/Microsoft.AspNetCore.Sockets.Tests/ServerSentEventsTests.cs
- @@ -1,10 +1,10 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System.IO;
- using System.Text;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http;
- using Microsoft.AspNetCore.Http.Features;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- @@ -23,7 +23,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- var context = new DefaultHttpContext();
- var sse = new ServerSentEventsTransport(channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
-
- - Assert.True(channel.Out.TryComplete());
- + Assert.True(channel.Writer.TryComplete());
-
- await sse.ProcessRequestAsync(context, context.RequestAborted);
-
- @@ -40,7 +40,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- context.Features.Set<IHttpBufferingFeature>(feature);
- var sse = new ServerSentEventsTransport(channel, connectionId: string.Empty, loggerFactory: new LoggerFactory());
-
- - Assert.True(channel.Out.TryComplete());
- + Assert.True(channel.Writer.TryComplete());
-
- await sse.ProcessRequestAsync(context, context.RequestAborted);
-
- @@ -50,7 +50,7 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- [Fact]
- public async Task SSEWritesMessages()
- {
- - var channel = Channel.CreateUnbounded<byte[]>(new ChannelOptimizations
- + var channel = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions
- {
- AllowSynchronousContinuations = true
- });
- @@ -62,11 +62,11 @@ namespace Microsoft.AspNetCore.Sockets.Tests
-
- var task = sse.ProcessRequestAsync(context, context.RequestAborted);
-
- - await channel.Out.WriteAsync(Encoding.ASCII.GetBytes("Hello"));
- + await channel.Writer.WriteAsync(Encoding.ASCII.GetBytes("Hello"));
-
- Assert.Equal(":\r\ndata: Hello\r\n\r\n", Encoding.ASCII.GetString(ms.ToArray()));
-
- - channel.Out.TryComplete();
- + channel.Writer.TryComplete();
-
- await task.OrTimeout();
- }
- @@ -83,9 +83,9 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- var ms = new MemoryStream();
- context.Response.Body = ms;
-
- - await channel.Out.WriteAsync(Encoding.UTF8.GetBytes(message));
- + await channel.Writer.WriteAsync(Encoding.UTF8.GetBytes(message));
-
- - Assert.True(channel.Out.TryComplete());
- + Assert.True(channel.Writer.TryComplete());
-
- await sse.ProcessRequestAsync(context, context.RequestAborted);
-
- diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/TestWebSocketConnectionFeature.cs b/test/Microsoft.AspNetCore.Sockets.Tests/TestWebSocketConnectionFeature.cs
- index 29b5cac70f8..ea085b939f3 100644
- --- a/test/Microsoft.AspNetCore.Sockets.Tests/TestWebSocketConnectionFeature.cs
- +++ b/test/Microsoft.AspNetCore.Sockets.Tests/TestWebSocketConnectionFeature.cs
- @@ -1,9 +1,9 @@
- -using System;
- +using System;
- using System.Collections.Generic;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- using Microsoft.AspNetCore.Http;
- using Microsoft.AspNetCore.Http.Features;
-
- @@ -22,8 +22,8 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- var clientToServer = Channel.CreateUnbounded<WebSocketMessage>();
- var serverToClient = Channel.CreateUnbounded<WebSocketMessage>();
-
- - var clientSocket = new WebSocketChannel(serverToClient.In, clientToServer.Out);
- - var serverSocket = new WebSocketChannel(clientToServer.In, serverToClient.Out);
- + var clientSocket = new WebSocketChannel(serverToClient.Reader, clientToServer.Writer);
- + var serverSocket = new WebSocketChannel(clientToServer.Reader, serverToClient.Writer);
-
- Client = clientSocket;
- return Task.FromResult<WebSocket>(serverSocket);
- @@ -35,14 +35,14 @@ namespace Microsoft.AspNetCore.Sockets.Tests
-
- public class WebSocketChannel : WebSocket
- {
- - private readonly ReadableChannel<WebSocketMessage> _input;
- - private readonly WritableChannel<WebSocketMessage> _output;
- + private readonly ChannelReader<WebSocketMessage> _input;
- + private readonly ChannelWriter<WebSocketMessage> _output;
-
- private WebSocketCloseStatus? _closeStatus;
- private string _closeStatusDescription;
- private WebSocketState _state;
-
- - public WebSocketChannel(ReadableChannel<WebSocketMessage> input, WritableChannel<WebSocketMessage> output)
- + public WebSocketChannel(ChannelReader<WebSocketMessage> input, ChannelWriter<WebSocketMessage> output)
- {
- _input = input;
- _output = output;
- @@ -209,4 +209,4 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- public string CloseStatusDescription { get; set; }
- }
- }
- -}
- \ No newline at end of file
- +}
- diff --git a/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs b/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs
- index ba696ff46e4..d93d653e9b1 100644
- --- a/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs
- +++ b/test/Microsoft.AspNetCore.Sockets.Tests/WebSocketsTests.cs
- @@ -1,4 +1,4 @@
- -// Copyright (c) .NET Foundation. All rights reserved.
- +// Copyright (c) .NET Foundation. All rights reserved.
- // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
-
- using System;
- @@ -6,58 +6,68 @@ using System.Net.WebSockets;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- -using System.Threading.Tasks.Channels;
- +using System.Threading.Channels;
- +using Microsoft.AspNetCore.SignalR.Internal;
- using Microsoft.AspNetCore.SignalR.Tests.Common;
- using Microsoft.AspNetCore.Sockets.Internal;
- using Microsoft.AspNetCore.Sockets.Internal.Transports;
- -using Microsoft.Extensions.Logging;
- +using Microsoft.Extensions.Logging.Testing;
- using Xunit;
- +using Xunit.Abstractions;
-
- namespace Microsoft.AspNetCore.Sockets.Tests
- {
- - public class WebSocketsTests
- + public class WebSocketsTests : LoggedTest
- {
- + public WebSocketsTests(ITestOutputHelper output)
- + : base(output)
- + {
- + }
- +
- [Theory]
- [InlineData(WebSocketMessageType.Text)]
- [InlineData(WebSocketMessageType.Binary)]
- public async Task ReceivedFramesAreWrittenToChannel(WebSocketMessageType webSocketMessageType)
- {
- - var transportToApplication = Channel.CreateUnbounded<byte[]>();
- - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- -
- - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- - using (var feature = new TestWebSocketConnectionFeature())
- + using (StartLog(out var loggerFactory))
- {
- - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- - var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory: new LoggerFactory());
- + var transportToApplication = Channel.CreateUnbounded<byte[]>();
- + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
-
- - // Give the server socket to the transport and run it
- - var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
- + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- + using (var feature = new TestWebSocketConnectionFeature())
- + {
- + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- + var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
- +
- + // Give the server socket to the transport and run it
- + var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
-
- - // Run the client socket
- - var client = feature.Client.ExecuteAndCaptureFramesAsync();
- + // Run the client socket
- + var client = feature.Client.ExecuteAndCaptureFramesAsync();
-
- - // Send a frame, then close
- - await feature.Client.SendAsync(
- - buffer: new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello")),
- - messageType: webSocketMessageType,
- - endOfMessage: true,
- - cancellationToken: CancellationToken.None);
- - await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
- + // Send a frame, then close
- + await feature.Client.SendAsync(
- + buffer: new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello")),
- + messageType: webSocketMessageType,
- + endOfMessage: true,
- + cancellationToken: CancellationToken.None);
- + await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
-
- - var buffer = await applicationSide.In.ReadAsync();
- - Assert.Equal("Hello", Encoding.UTF8.GetString(buffer));
- + var buffer = await applicationSide.Reader.ReadAsync();
- + Assert.Equal("Hello", Encoding.UTF8.GetString(buffer));
-
- - Assert.True(applicationSide.Out.TryComplete());
- + Assert.True(applicationSide.Writer.TryComplete());
-
- - // The transport should finish now
- - await transport;
- + // The transport should finish now
- + await transport;
-
- - // The connection should close after this, which means the client will get a close frame.
- - var clientSummary = await client;
- + // The connection should close after this, which means the client will get a close frame.
- + var clientSummary = await client;
-
- - Assert.Equal(WebSocketCloseStatus.NormalClosure, clientSummary.CloseResult.CloseStatus);
- + Assert.Equal(WebSocketCloseStatus.NormalClosure, clientSummary.CloseResult.CloseStatus);
- + }
- }
- }
-
- @@ -66,256 +76,276 @@ namespace Microsoft.AspNetCore.Sockets.Tests
- [InlineData(TransferMode.Binary, WebSocketMessageType.Binary)]
- public async Task WebSocketTransportSetsMessageTypeBasedOnTransferModeFeature(TransferMode transferMode, WebSocketMessageType expectedMessageType)
- {
- - var transportToApplication = Channel.CreateUnbounded<byte[]>();
- - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- -
- - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- - using (var feature = new TestWebSocketConnectionFeature())
- + using (StartLog(out var loggerFactory))
- {
- - var connectionContext = new DefaultConnectionContext(string.Empty, null, null) { TransferMode = transferMode };
- - var ws = new WebSocketsTransport(new WebSocketOptions(),
- - transportSide, connectionContext, loggerFactory: new LoggerFactory());
- + var transportToApplication = Channel.CreateUnbounded<byte[]>();
- + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
-
- - // Give the server socket to the transport and run it
- - var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
- + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- + using (var feature = new TestWebSocketConnectionFeature())
- + {
- + var connectionContext = new DefaultConnectionContext(string.Empty, null, null) { TransferMode = transferMode };
- + var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
- +
- + // Give the server socket to the transport and run it
- + var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
-
- - // Run the client socket
- - var client = feature.Client.ExecuteAndCaptureFramesAsync();
- + // Run the client socket
- + var client = feature.Client.ExecuteAndCaptureFramesAsync();
-
- - // Write to the output channel, and then complete it
- - await applicationSide.Out.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
- - Assert.True(applicationSide.Out.TryComplete());
- + // Write to the output channel, and then complete it
- + await applicationSide.Writer.WriteAsync(Encoding.UTF8.GetBytes("Hello"));
- + Assert.True(applicationSide.Writer.TryComplete());
-
- - // The client should finish now, as should the server
- - var clientSummary = await client;
- - await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
- - await transport;
- + // The client should finish now, as should the server
- + var clientSummary = await client;
- + await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
- + await transport;
-
- - Assert.Equal(1, clientSummary.Received.Count);
- - Assert.True(clientSummary.Received[0].EndOfMessage);
- - Assert.Equal(expectedMessageType, clientSummary.Received[0].MessageType);
- - Assert.Equal("Hello", Encoding.UTF8.GetString(clientSummary.Received[0].Buffer));
- + Assert.Equal(1, clientSummary.Received.Count);
- + Assert.True(clientSummary.Received[0].EndOfMessage);
- + Assert.Equal(expectedMessageType, clientSummary.Received[0].MessageType);
- + Assert.Equal("Hello", Encoding.UTF8.GetString(clientSummary.Received[0].Buffer));
- + }
- }
- }
-
- [Fact]
- public async Task TransportFailsWhenClientDisconnectsAbnormally()
- {
- - var transportToApplication = Channel.CreateUnbounded<byte[]>();
- - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- -
- - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- - using (var feature = new TestWebSocketConnectionFeature())
- + using (StartLog(out var loggerFactory))
- {
- - async Task CompleteApplicationAfterTransportCompletes()
- + var transportToApplication = Channel.CreateUnbounded<byte[]>();
- + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- +
- + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- + using (var feature = new TestWebSocketConnectionFeature())
- {
- - // Wait until the transport completes so that we can end the application
- - await applicationSide.In.WaitToReadAsync();
- + async Task CompleteApplicationAfterTransportCompletes()
- + {
- + // Wait until the transport completes so that we can end the application
- + await applicationSide.Reader.WaitToReadAsync();
-
- - // Complete the application so that the connection unwinds without aborting
- - applicationSide.Out.TryComplete();
- - }
- + // Complete the application so that the connection unwinds without aborting
- + applicationSide.Writer.TryComplete();
- + }
-
- - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- - var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory: new LoggerFactory());
- + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- + var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
-
- - // Give the server socket to the transport and run it
- - var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
- + // Give the server socket to the transport and run it
- + var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
-
- - // Run the client socket
- - var client = feature.Client.ExecuteAndCaptureFramesAsync();
- + // Run the client socket
- + var client = feature.Client.ExecuteAndCaptureFramesAsync();
-
- - // When the close frame is received, we complete the application so the send
- - // loop unwinds
- - _ = CompleteApplicationAfterTransportCompletes();
- + // When the close frame is received, we complete the application so the send
- + // loop unwinds
- + _ = CompleteApplicationAfterTransportCompletes();
-
- - // Terminate the client to server channel with an exception
- - feature.Client.SendAbort();
- + // Terminate the client to server channel with an exception
- + feature.Client.SendAbort();
-
- - // Wait for the transport
- - await Assert.ThrowsAsync<WebSocketException>(() => transport).OrTimeout();
- + // Wait for the transport
- + await Assert.ThrowsAsync<WebSocketException>(() => transport).OrTimeout();
-
- - var summary = await client.OrTimeout();
- - Assert.Equal(WebSocketCloseStatus.InternalServerError, summary.CloseResult.CloseStatus);
- + var summary = await client.OrTimeout();
- + Assert.Equal(WebSocketCloseStatus.InternalServerError, summary.CloseResult.CloseStatus);
- + }
- }
- }
-
- [Fact]
- public async Task ClientReceivesInternalServerErrorWhenTheApplicationFails()
- {
- - var transportToApplication = Channel.CreateUnbounded<byte[]>();
- - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- -
- - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- - using (var feature = new TestWebSocketConnectionFeature())
- + using (StartLog(out var loggerFactory))
- {
- - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- - var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory: new LoggerFactory());
- + var transportToApplication = Channel.CreateUnbounded<byte[]>();
- + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
-
- - // Give the server socket to the transport and run it
- - var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
- + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- + using (var feature = new TestWebSocketConnectionFeature())
- + {
- + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- + var ws = new WebSocketsTransport(new WebSocketOptions(), transportSide, connectionContext, loggerFactory);
- +
- + // Give the server socket to the transport and run it
- + var transport = ws.ProcessSocketAsync(await feature.AcceptAsync());
-
- - // Run the client socket
- - var client = feature.Client.ExecuteAndCaptureFramesAsync();
- + // Run the client socket
- + var client = feature.Client.ExecuteAndCaptureFramesAsync();
-
- - // Fail in the app
- - Assert.True(applicationSide.Out.TryComplete(new InvalidOperationException("Catastrophic failure.")));
- - var clientSummary = await client.OrTimeout();
- - Assert.Equal(WebSocketCloseStatus.InternalServerError, clientSummary.CloseResult.CloseStatus);
- + // Fail in the app
- + Assert.True(applicationSide.Writer.TryComplete(new InvalidOperationException("Catastrophic failure.")));
- + var clientSummary = await client.OrTimeout();
- + Assert.Equal(WebSocketCloseStatus.InternalServerError, clientSummary.CloseResult.CloseStatus);
-
- - // Close from the client
- - await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
- + // Close from the client
- + await feature.Client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
-
- - var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => transport.OrTimeout());
- - Assert.Equal("Catastrophic failure.", ex.Message);
- + var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => transport.OrTimeout());
- + Assert.Equal("Catastrophic failure.", ex.Message);
- + }
- }
- }
-
- [Fact]
- public async Task TransportClosesOnCloseTimeoutIfClientDoesNotSendCloseFrame()
- {
- - var transportToApplication = Channel.CreateUnbounded<byte[]>();
- - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- -
- - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- - using (var feature = new TestWebSocketConnectionFeature())
- + using (StartLog(out var loggerFactory))
- {
- - var options = new WebSocketOptions()
- + var transportToApplication = Channel.CreateUnbounded<byte[]>();
- + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- +
- + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- + using (var feature = new TestWebSocketConnectionFeature())
- {
- - CloseTimeout = TimeSpan.FromSeconds(1)
- - };
- + var options = new WebSocketOptions()
- + {
- + CloseTimeout = TimeSpan.FromSeconds(1)
- + };
-
- - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- - var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
- + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- + var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
-
- - var serverSocket = await feature.AcceptAsync();
- - // Give the server socket to the transport and run it
- - var transport = ws.ProcessSocketAsync(serverSocket);
- + var serverSocket = await feature.AcceptAsync();
- + // Give the server socket to the transport and run it
- + var transport = ws.ProcessSocketAsync(serverSocket);
-
- - // End the app
- - applicationSide.Dispose();
- + // End the app
- + applicationSide.Dispose();
-
- - await transport.OrTimeout(TimeSpan.FromSeconds(10));
- + await transport.OrTimeout(TimeSpan.FromSeconds(10));
-
- - // Now we're closed
- - Assert.Equal(WebSocketState.Aborted, serverSocket.State);
- + // Now we're closed
- + Assert.Equal(WebSocketState.Aborted, serverSocket.State);
-
- - serverSocket.Dispose();
- + serverSocket.Dispose();
- + }
- }
- }
-
- [Fact]
- public async Task TransportFailsOnTimeoutWithErrorWhenApplicationFailsAndClientDoesNotSendCloseFrame()
- {
- - var transportToApplication = Channel.CreateUnbounded<byte[]>();
- - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- -
- - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- - using (var feature = new TestWebSocketConnectionFeature())
- + using (StartLog(out var loggerFactory))
- {
- - var options = new WebSocketOptions
- + var transportToApplication = Channel.CreateUnbounded<byte[]>();
- + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- +
- + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- + using (var feature = new TestWebSocketConnectionFeature())
- {
- - CloseTimeout = TimeSpan.FromSeconds(1)
- - };
- + var options = new WebSocketOptions
- + {
- + CloseTimeout = TimeSpan.FromSeconds(1)
- + };
-
- - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- - var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
- + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- + var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
-
- - var serverSocket = await feature.AcceptAsync();
- - // Give the server socket to the transport and run it
- - var transport = ws.ProcessSocketAsync(serverSocket);
- + var serverSocket = await feature.AcceptAsync();
- + // Give the server socket to the transport and run it
- + var transport = ws.ProcessSocketAsync(serverSocket);
-
- - // Run the client socket
- - var client = feature.Client.ExecuteAndCaptureFramesAsync();
- + // Run the client socket
- + var client = feature.Client.ExecuteAndCaptureFramesAsync();
-
- - // fail the client to server channel
- - applicationToTransport.Out.TryComplete(new Exception());
- + // fail the client to server channel
- + applicationToTransport.Writer.TryComplete(new Exception());
-
- - await Assert.ThrowsAsync<Exception>(() => transport).OrTimeout();
- + await Assert.ThrowsAsync<Exception>(() => transport).OrTimeout();
-
- - Assert.Equal(WebSocketState.Aborted, serverSocket.State);
- + Assert.Equal(WebSocketState.Aborted, serverSocket.State);
- + }
- }
- }
-
- [Fact]
- public async Task ServerGracefullyClosesWhenApplicationEndsThenClientSendsCloseFrame()
- {
- - var transportToApplication = Channel.CreateUnbounded<byte[]>();
- - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- -
- - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- - using (var feature = new TestWebSocketConnectionFeature())
- + using (StartLog(out var loggerFactory))
- {
- - var options = new WebSocketOptions
- + var transportToApplication = Channel.CreateUnbounded<byte[]>();
- + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- +
- + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- + using (var feature = new TestWebSocketConnectionFeature())
- {
- - // We want to verify behavior without timeout affecting it
- - CloseTimeout = TimeSpan.FromSeconds(20)
- - };
- + var options = new WebSocketOptions
- + {
- + // We want to verify behavior without timeout affecting it
- + CloseTimeout = TimeSpan.FromSeconds(20)
- + };
-
- - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- - var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
- + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- + var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
-
- - var serverSocket = await feature.AcceptAsync();
- - // Give the server socket to the transport and run it
- - var transport = ws.ProcessSocketAsync(serverSocket);
- + var serverSocket = await feature.AcceptAsync();
- + // Give the server socket to the transport and run it
- + var transport = ws.ProcessSocketAsync(serverSocket);
-
- - // Run the client socket
- - var client = feature.Client.ExecuteAndCaptureFramesAsync();
- + // Run the client socket
- + var client = feature.Client.ExecuteAndCaptureFramesAsync();
-
- - // close the client to server channel
- - applicationToTransport.Out.TryComplete();
- + // close the client to server channel
- + applicationToTransport.Writer.TryComplete();
-
- - _ = await client.OrTimeout();
- + _ = await client.OrTimeout();
-
- - await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
- + await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
-
- - await transport.OrTimeout();
- + await transport.OrTimeout();
-
- - Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
- + Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
- + }
- }
- }
-
- [Fact]
- public async Task ServerGracefullyClosesWhenClientSendsCloseFrameThenApplicationEnds()
- {
- - var transportToApplication = Channel.CreateUnbounded<byte[]>();
- - var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- -
- - using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- - using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- - using (var feature = new TestWebSocketConnectionFeature())
- + using (StartLog(out var loggerFactory))
- {
- - var options = new WebSocketOptions
- + var transportToApplication = Channel.CreateUnbounded<byte[]>();
- + var applicationToTransport = Channel.CreateUnbounded<byte[]>();
- +
- + using (var transportSide = ChannelConnection.Create<byte[]>(applicationToTransport, transportToApplication))
- + using (var applicationSide = ChannelConnection.Create<byte[]>(transportToApplication, applicationToTransport))
- + using (var feature = new TestWebSocketConnectionFeature())
- {
- - // We want to verify behavior without timeout affecting it
- - CloseTimeout = TimeSpan.FromSeconds(20)
- - };
- - var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- - var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory: new LoggerFactory());
- + var options = new WebSocketOptions
- + {
- + // We want to verify behavior without timeout affecting it
- + CloseTimeout = TimeSpan.FromSeconds(20)
- + };
- + var connectionContext = new DefaultConnectionContext(string.Empty, null, null);
- + var ws = new WebSocketsTransport(options, transportSide, connectionContext, loggerFactory);
-
- - var serverSocket = await feature.AcceptAsync();
- - // Give the server socket to the transport and run it
- - var transport = ws.ProcessSocketAsync(serverSocket);
- + var serverSocket = await feature.AcceptAsync();
- + // Give the server socket to the transport and run it
- + var transport = ws.ProcessSocketAsync(serverSocket);
-
- - // Run the client socket
- - var client = feature.Client.ExecuteAndCaptureFramesAsync();
- + // Run the client socket
- + var client = feature.Client.ExecuteAndCaptureFramesAsync();
-
- - await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
- + await feature.Client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None).OrTimeout();
-
- - // close the client to server channel
- - applicationToTransport.Out.TryComplete();
- + // close the client to server channel
- + applicationToTransport.Writer.TryComplete();
-
- - _ = await client.OrTimeout();
- + _ = await client.OrTimeout();
-
- - await transport.OrTimeout();
- + await transport.OrTimeout();
-
- - Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
- + Assert.Equal(WebSocketCloseStatus.NormalClosure, serverSocket.CloseStatus);
- + }
- }
- }
- }
|