HttpAbstractions 719 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366
  1. commit 962ec07bdbb090ac2e9cd2a0e68e24c6c276a972
  2. Author: Justin Kotalik <[email protected]>
  3. Date: Fri Nov 16 19:18:47 2018 -0800
  4. Adds PipeWriterAdapter (#1065)
  5. diff --git a/.gitignore b/.gitignore
  6. index d5717b3f3fd..3d7e16e84a0 100644
  7. --- a/.gitignore
  8. +++ b/.gitignore
  9. @@ -30,3 +30,4 @@ project.lock.json
  10. /.vs/
  11. .vscode/
  12. global.json
  13. +BenchmarkDotNet.Artifacts/
  14. diff --git a/benchmarks/Microsoft.AspNetCore.Http.Performance/StreamPipeWriterBenchmark.cs b/benchmarks/Microsoft.AspNetCore.Http.Performance/StreamPipeWriterBenchmark.cs
  15. new file mode 100644
  16. index 00000000000..705cb0d8af7
  17. --- /dev/null
  18. +++ b/benchmarks/Microsoft.AspNetCore.Http.Performance/StreamPipeWriterBenchmark.cs
  19. @@ -0,0 +1,89 @@
  20. +// Copyright (c) .NET Foundation. All rights reserved.
  21. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  22. +
  23. +using System;
  24. +using System.IO;
  25. +using System.Text;
  26. +using System.Threading;
  27. +using System.Threading.Tasks;
  28. +using BenchmarkDotNet.Attributes;
  29. +
  30. +namespace Microsoft.AspNetCore.Http
  31. +{
  32. + public class StreamPipeWriterBenchmark
  33. + {
  34. + private Stream _memoryStream;
  35. + private StreamPipeWriter _pipeWriter;
  36. + private static byte[] _helloWorldBytes = Encoding.ASCII.GetBytes("Hello World");
  37. + private static byte[] _largeWrite = Encoding.ASCII.GetBytes(new string('a', 50000));
  38. +
  39. + [IterationSetup]
  40. + public void Setup()
  41. + {
  42. + _memoryStream = new NoopStream();
  43. + _pipeWriter = new StreamPipeWriter(_memoryStream);
  44. + }
  45. +
  46. + [Benchmark]
  47. + public async Task WriteHelloWorld()
  48. + {
  49. + await _pipeWriter.WriteAsync(_helloWorldBytes);
  50. + }
  51. +
  52. + [Benchmark]
  53. + public async Task WriteHelloWorldLargeWrite()
  54. + {
  55. + await _pipeWriter.WriteAsync(_largeWrite);
  56. + }
  57. +
  58. + public class NoopStream : Stream
  59. + {
  60. + public override bool CanRead => false;
  61. +
  62. + public override bool CanSeek => throw new System.NotImplementedException();
  63. +
  64. + public override bool CanWrite => true;
  65. +
  66. + public override long Length => throw new System.NotImplementedException();
  67. +
  68. + public override long Position { get => throw new System.NotImplementedException(); set => throw new System.NotImplementedException(); }
  69. +
  70. + public override void Flush()
  71. + {
  72. + }
  73. +
  74. + public override int Read(byte[] buffer, int offset, int count)
  75. + {
  76. + throw new System.NotImplementedException();
  77. + }
  78. +
  79. + public override long Seek(long offset, SeekOrigin origin)
  80. + {
  81. + throw new System.NotImplementedException();
  82. + }
  83. +
  84. + public override void SetLength(long value)
  85. + {
  86. + }
  87. +
  88. + public override void Write(byte[] buffer, int offset, int count)
  89. + {
  90. + }
  91. +
  92. + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  93. + {
  94. + return Task.CompletedTask;
  95. + }
  96. +
  97. + public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
  98. + {
  99. + return default(ValueTask);
  100. + }
  101. +
  102. + public override Task FlushAsync(CancellationToken cancellationToken)
  103. + {
  104. + return Task.CompletedTask;
  105. + }
  106. + }
  107. + }
  108. +}
  109. diff --git a/build/dependencies.props b/build/dependencies.props
  110. index 2aa50e3e131..c3991cb407a 100644
  111. --- a/build/dependencies.props
  112. +++ b/build/dependencies.props
  113. @@ -24,6 +24,7 @@
  114. <MoqPackageVersion>4.9.0</MoqPackageVersion>
  115. <NETStandardLibrary20PackageVersion>2.0.3</NETStandardLibrary20PackageVersion>
  116. <SystemBuffersPackageVersion>4.6.0-preview1-26907-04</SystemBuffersPackageVersion>
  117. + <SystemIOPipelinesPackageVersion>4.6.0-preview1-26907-04</SystemIOPipelinesPackageVersion>
  118. <SystemTextEncodingsWebPackageVersion>4.6.0-preview1-26907-04</SystemTextEncodingsWebPackageVersion>
  119. <XunitAnalyzersPackageVersion>0.10.0</XunitAnalyzersPackageVersion>
  120. <XunitPackageVersion>2.3.1</XunitPackageVersion>
  121. diff --git a/src/Microsoft.AspNetCore.Http/Microsoft.AspNetCore.Http.csproj b/src/Microsoft.AspNetCore.Http/Microsoft.AspNetCore.Http.csproj
  122. index 162315a7a60..94080281b39 100644
  123. --- a/src/Microsoft.AspNetCore.Http/Microsoft.AspNetCore.Http.csproj
  124. +++ b/src/Microsoft.AspNetCore.Http/Microsoft.AspNetCore.Http.csproj
  125. @@ -2,7 +2,7 @@
  126. <PropertyGroup>
  127. <Description>ASP.NET Core default HTTP feature implementations.</Description>
  128. - <TargetFramework>netstandard2.0</TargetFramework>
  129. + <TargetFrameworks>netstandard2.0;netcoreapp2.2</TargetFrameworks>
  130. <NoWarn>$(NoWarn);CS1591</NoWarn>
  131. <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
  132. <GenerateDocumentationFile>true</GenerateDocumentationFile>
  133. @@ -19,6 +19,7 @@
  134. <PackageReference Include="Microsoft.Extensions.CopyOnWriteDictionary.Sources" PrivateAssets="All" Version="$(MicrosoftExtensionsCopyOnWriteDictionarySourcesPackageVersion)" />
  135. <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="$(MicrosoftExtensionsObjectPoolPackageVersion)" />
  136. <PackageReference Include="Microsoft.Extensions.Options" Version="$(MicrosoftExtensionsOptionsPackageVersion)" />
  137. + <PackageReference Include="System.IO.Pipelines" Version="$(SystemIOPipelinesPackageVersion)" />
  138. </ItemGroup>
  139. </Project>
  140. diff --git a/src/Microsoft.AspNetCore.Http/StreamPipeWriter.cs b/src/Microsoft.AspNetCore.Http/StreamPipeWriter.cs
  141. new file mode 100644
  142. index 00000000000..f232aa97cfa
  143. --- /dev/null
  144. +++ b/src/Microsoft.AspNetCore.Http/StreamPipeWriter.cs
  145. @@ -0,0 +1,320 @@
  146. +// Copyright (c) .NET Foundation. All rights reserved.
  147. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  148. +
  149. +using System;
  150. +using System.Buffers;
  151. +using System.Collections.Generic;
  152. +using System.IO;
  153. +using System.IO.Pipelines;
  154. +using System.Runtime.CompilerServices;
  155. +using System.Runtime.ExceptionServices;
  156. +using System.Runtime.InteropServices;
  157. +using System.Threading;
  158. +using System.Threading.Tasks;
  159. +
  160. +namespace Microsoft.AspNetCore.Http
  161. +{
  162. + /// <summary>
  163. + /// Implements PipeWriter using a underlying stream.
  164. + /// </summary>
  165. + public class StreamPipeWriter : PipeWriter, IDisposable
  166. + {
  167. + private readonly int _minimumSegmentSize;
  168. + private readonly Stream _writingStream;
  169. + private int _bytesWritten;
  170. +
  171. + private List<CompletedBuffer> _completedSegments;
  172. + private Memory<byte> _currentSegment;
  173. + private IMemoryOwner<byte> _currentSegmentOwner;
  174. + private MemoryPool<byte> _pool;
  175. + private int _position;
  176. +
  177. + private CancellationTokenSource _internalTokenSource;
  178. + private bool _isCompleted;
  179. + private ExceptionDispatchInfo _exceptionInfo;
  180. + private object _lockObject = new object();
  181. +
  182. + private CancellationTokenSource InternalTokenSource
  183. + {
  184. + get
  185. + {
  186. + lock (_lockObject)
  187. + {
  188. + if (_internalTokenSource == null)
  189. + {
  190. + _internalTokenSource = new CancellationTokenSource();
  191. + }
  192. + return _internalTokenSource;
  193. + }
  194. + }
  195. + }
  196. +
  197. + /// <summary>
  198. + /// Creates a new StreamPipeWrapper
  199. + /// </summary>
  200. + /// <param name="writingStream">The stream to write to</param>
  201. + public StreamPipeWriter(Stream writingStream) : this(writingStream, 4096)
  202. + {
  203. + }
  204. +
  205. + public StreamPipeWriter(Stream writingStream, int minimumSegmentSize, MemoryPool<byte> pool = null)
  206. + {
  207. + _minimumSegmentSize = minimumSegmentSize;
  208. + _writingStream = writingStream;
  209. + _pool = pool ?? MemoryPool<byte>.Shared;
  210. + }
  211. +
  212. + /// <inheritdoc />
  213. + public override void Advance(int count)
  214. + {
  215. + if (_currentSegment.IsEmpty) // TODO confirm this
  216. + {
  217. + throw new InvalidOperationException("No writing operation. Make sure GetMemory() was called.");
  218. + }
  219. +
  220. + if (count >= 0)
  221. + {
  222. + if (_currentSegment.Length < _position + count)
  223. + {
  224. + throw new InvalidOperationException("Can't advance past buffer size.");
  225. + }
  226. + _bytesWritten += count;
  227. + _position += count;
  228. + }
  229. + }
  230. +
  231. + /// <inheritdoc />
  232. + public override Memory<byte> GetMemory(int sizeHint = 0)
  233. + {
  234. + EnsureCapacity(sizeHint);
  235. +
  236. + return _currentSegment;
  237. + }
  238. +
  239. + /// <inheritdoc />
  240. + public override Span<byte> GetSpan(int sizeHint = 0)
  241. + {
  242. + EnsureCapacity(sizeHint);
  243. +
  244. + return _currentSegment.Span.Slice(_position);
  245. + }
  246. +
  247. + /// <inheritdoc />
  248. + public override void CancelPendingFlush()
  249. + {
  250. + Cancel();
  251. + }
  252. +
  253. + /// <inheritdoc />
  254. + public override void Complete(Exception exception = null)
  255. + {
  256. + if (_isCompleted)
  257. + {
  258. + return;
  259. + }
  260. +
  261. + _isCompleted = true;
  262. + if (exception != null)
  263. + {
  264. + _exceptionInfo = ExceptionDispatchInfo.Capture(exception);
  265. + }
  266. +
  267. + _internalTokenSource?.Dispose();
  268. +
  269. + if (_completedSegments != null)
  270. + {
  271. + foreach (var segment in _completedSegments)
  272. + {
  273. + segment.Return();
  274. + }
  275. + }
  276. +
  277. + _currentSegmentOwner?.Dispose();
  278. + }
  279. +
  280. + /// <inheritdoc />
  281. + public override void OnReaderCompleted(Action<Exception, object> callback, object state)
  282. + {
  283. + throw new NotSupportedException("OnReaderCompleted isn't supported in StreamPipeWrapper.");
  284. + }
  285. +
  286. + /// <inheritdoc />
  287. + public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
  288. + {
  289. + if (_bytesWritten == 0)
  290. + {
  291. + return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, IsCompletedOrThrow()));
  292. + }
  293. +
  294. + return FlushAsyncInternal(cancellationToken);
  295. + }
  296. +
  297. + private void Cancel()
  298. + {
  299. + InternalTokenSource.Cancel();
  300. + }
  301. +
  302. + private async ValueTask<FlushResult> FlushAsyncInternal(CancellationToken cancellationToken = default)
  303. + {
  304. + // Write all completed segments and whatever remains in the current segment
  305. + // and flush the result.
  306. + CancellationTokenRegistration reg = new CancellationTokenRegistration();
  307. + if (cancellationToken.CanBeCanceled)
  308. + {
  309. + reg = cancellationToken.Register(state => ((StreamPipeWriter)state).Cancel(), this);
  310. + }
  311. + using (reg)
  312. + {
  313. + var localToken = InternalTokenSource.Token;
  314. + try
  315. + {
  316. + if (_completedSegments != null && _completedSegments.Count > 0)
  317. + {
  318. + var count = _completedSegments.Count;
  319. + for (var i = 0; i < count; i++)
  320. + {
  321. + var segment = _completedSegments[0];
  322. +#if NETCOREAPP2_2
  323. + await _writingStream.WriteAsync(segment.Buffer.Slice(0, segment.Length), localToken);
  324. +#elif NETSTANDARD2_0
  325. + MemoryMarshal.TryGetArray<byte>(segment.Buffer, out var arraySegment);
  326. + await _writingStream.WriteAsync(arraySegment.Array, 0, segment.Length, localToken);
  327. +#else
  328. +#error Target frameworks need to be updated.
  329. +#endif
  330. + _bytesWritten -= segment.Length;
  331. + segment.Return();
  332. + _completedSegments.RemoveAt(0);
  333. + }
  334. + }
  335. +
  336. + if (!_currentSegment.IsEmpty)
  337. + {
  338. +#if NETCOREAPP2_2
  339. + await _writingStream.WriteAsync(_currentSegment.Slice(0, _position), localToken);
  340. +#elif NETSTANDARD2_0
  341. + MemoryMarshal.TryGetArray<byte>(_currentSegment, out var arraySegment);
  342. + await _writingStream.WriteAsync(arraySegment.Array, 0, _position, localToken);
  343. +#else
  344. +#error Target frameworks need to be updated.
  345. +#endif
  346. + _bytesWritten -= _position;
  347. + _position = 0;
  348. + }
  349. +
  350. + await _writingStream.FlushAsync(localToken);
  351. +
  352. + return new FlushResult(isCanceled: false, IsCompletedOrThrow());
  353. + }
  354. + catch (OperationCanceledException)
  355. + {
  356. + // Remove the cancellation token such that the next time Flush is called
  357. + // A new CTS is created.
  358. + lock (_lockObject)
  359. + {
  360. + _internalTokenSource = null;
  361. + }
  362. +
  363. + if (cancellationToken.IsCancellationRequested)
  364. + {
  365. + throw;
  366. + }
  367. +
  368. + // Catch any cancellation and translate it into setting isCanceled = true
  369. + return new FlushResult(isCanceled: true, IsCompletedOrThrow());
  370. + }
  371. + }
  372. + }
  373. +
  374. + private void EnsureCapacity(int sizeHint)
  375. + {
  376. + // This does the Right Thing. It only subtracts _position from the current segment length if it's non-null.
  377. + // If _currentSegment is null, it returns 0.
  378. + var remainingSize = _currentSegment.Length - _position;
  379. +
  380. + // If the sizeHint is 0, any capacity will do
  381. + // Otherwise, the buffer must have enough space for the entire size hint, or we need to add a segment.
  382. + if ((sizeHint == 0 && remainingSize > 0) || (sizeHint > 0 && remainingSize >= sizeHint))
  383. + {
  384. + // We have capacity in the current segment
  385. + return;
  386. + }
  387. +
  388. + AddSegment(sizeHint);
  389. + }
  390. +
  391. + private void AddSegment(int sizeHint = 0)
  392. + {
  393. + if (_currentSegment.Length != 0)
  394. + {
  395. + // We're adding a segment to the list
  396. + if (_completedSegments == null)
  397. + {
  398. + _completedSegments = new List<CompletedBuffer>();
  399. + }
  400. +
  401. + // Position might be less than the segment length if there wasn't enough space to satisfy the sizeHint when
  402. + // GetMemory was called. In that case we'll take the current segment and call it "completed", but need to
  403. + // ignore any empty space in it.
  404. + _completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _position));
  405. + }
  406. +
  407. + // Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment.
  408. + _currentSegmentOwner = _pool.Rent(Math.Max(_minimumSegmentSize, sizeHint));
  409. + _currentSegment = _currentSegmentOwner.Memory;
  410. + _position = 0;
  411. + }
  412. +
  413. + [MethodImpl(MethodImplOptions.AggressiveInlining)]
  414. + private bool IsCompletedOrThrow()
  415. + {
  416. + if (!_isCompleted)
  417. + {
  418. + return false;
  419. + }
  420. +
  421. + if (_exceptionInfo != null)
  422. + {
  423. + ThrowLatchedException();
  424. + }
  425. +
  426. + return true;
  427. + }
  428. +
  429. + [MethodImpl(MethodImplOptions.NoInlining)]
  430. + private void ThrowLatchedException()
  431. + {
  432. + _exceptionInfo.Throw();
  433. + }
  434. +
  435. + public void Dispose()
  436. + {
  437. + Complete();
  438. + }
  439. +
  440. + /// <summary>
  441. + /// Holds a byte[] from the pool and a size value. Basically a Memory but guaranteed to be backed by an ArrayPool byte[], so that we know we can return it.
  442. + /// </summary>
  443. + private readonly struct CompletedBuffer
  444. + {
  445. + public Memory<byte> Buffer { get; }
  446. + public int Length { get; }
  447. +
  448. + public ReadOnlySpan<byte> Span => Buffer.Span;
  449. +
  450. + private readonly IMemoryOwner<byte> _memoryOwner;
  451. +
  452. + public CompletedBuffer(IMemoryOwner<byte> buffer, int length)
  453. + {
  454. + Buffer = buffer.Memory;
  455. + Length = length;
  456. + _memoryOwner = buffer;
  457. + }
  458. +
  459. + public void Return()
  460. + {
  461. + _memoryOwner.Dispose();
  462. + }
  463. + }
  464. + }
  465. +}
  466. diff --git a/test/Microsoft.AspNetCore.Http.Tests/FlushResultCancellationTests.cs b/test/Microsoft.AspNetCore.Http.Tests/FlushResultCancellationTests.cs
  467. new file mode 100644
  468. index 00000000000..f4ab7cb96fe
  469. --- /dev/null
  470. +++ b/test/Microsoft.AspNetCore.Http.Tests/FlushResultCancellationTests.cs
  471. @@ -0,0 +1,68 @@
  472. +// Copyright (c) .NET Foundation. All rights reserved.
  473. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  474. +
  475. +using System;
  476. +using System.IO.Pipelines;
  477. +using System.Runtime.CompilerServices;
  478. +using System.Threading;
  479. +using System.Threading.Tasks;
  480. +using Xunit;
  481. +
  482. +namespace Microsoft.AspNetCore.Http.Tests
  483. +{
  484. + public class FlushResultCancellationTests : PipeTest
  485. + {
  486. + [Fact]
  487. + public void FlushAsyncCancellationDeadlock()
  488. + {
  489. + var cts = new CancellationTokenSource();
  490. + var cts2 = new CancellationTokenSource();
  491. +
  492. + PipeWriter buffer = Writer.WriteEmpty(MaximumSizeHigh);
  493. +
  494. + var e = new ManualResetEventSlim();
  495. +
  496. + ValueTaskAwaiter<FlushResult> awaiter = buffer.FlushAsync(cts.Token).GetAwaiter();
  497. + awaiter.OnCompleted(
  498. + () => {
  499. + // We are on cancellation thread and need to wait until another FlushAsync call
  500. + // takes pipe state lock
  501. + e.Wait();
  502. +
  503. + // Make sure we had enough time to reach _cancellationTokenRegistration.Dispose
  504. + Thread.Sleep(100);
  505. +
  506. + // Try to take pipe state lock
  507. + buffer.FlushAsync();
  508. + });
  509. +
  510. + // Start a thread that would run cancellation callbacks
  511. + Task cancellationTask = Task.Run(() => cts.Cancel());
  512. + // Start a thread that would call FlushAsync with different token
  513. + // and block on _cancellationTokenRegistration.Dispose
  514. + Task blockingTask = Task.Run(
  515. + () => {
  516. + e.Set();
  517. + buffer.FlushAsync(cts2.Token);
  518. + });
  519. +
  520. + bool completed = Task.WhenAll(cancellationTask, blockingTask).Wait(TimeSpan.FromSeconds(10));
  521. + Assert.True(completed);
  522. + }
  523. +
  524. + [Fact]
  525. + public async Task FlushAsyncWithNewCancellationTokenNotAffectedByPrevious()
  526. + {
  527. + var cancellationTokenSource1 = new CancellationTokenSource();
  528. + PipeWriter buffer = Writer.WriteEmpty(10);
  529. + await buffer.FlushAsync(cancellationTokenSource1.Token);
  530. +
  531. + cancellationTokenSource1.Cancel();
  532. +
  533. + var cancellationTokenSource2 = new CancellationTokenSource();
  534. + buffer = Writer.WriteEmpty(10);
  535. +
  536. + await buffer.FlushAsync(cancellationTokenSource2.Token);
  537. + }
  538. + }
  539. +}
  540. diff --git a/test/Microsoft.AspNetCore.Http.Tests/Microsoft.AspNetCore.Http.Tests.csproj b/test/Microsoft.AspNetCore.Http.Tests/Microsoft.AspNetCore.Http.Tests.csproj
  541. index aa428320cde..a8ee8f19fc1 100644
  542. --- a/test/Microsoft.AspNetCore.Http.Tests/Microsoft.AspNetCore.Http.Tests.csproj
  543. +++ b/test/Microsoft.AspNetCore.Http.Tests/Microsoft.AspNetCore.Http.Tests.csproj
  544. @@ -2,8 +2,9 @@
  545. <PropertyGroup>
  546. <TargetFrameworks>$(StandardTestTfms)</TargetFrameworks>
  547. + <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
  548. </PropertyGroup>
  549. -
  550. +
  551. <ItemGroup>
  552. <ProjectReference Include="..\..\src\Microsoft.AspNetCore.Http\Microsoft.AspNetCore.Http.csproj" />
  553. </ItemGroup>
  554. diff --git a/test/Microsoft.AspNetCore.Http.Tests/PipeTest.cs b/test/Microsoft.AspNetCore.Http.Tests/PipeTest.cs
  555. new file mode 100644
  556. index 00000000000..2e94e3a2673
  557. --- /dev/null
  558. +++ b/test/Microsoft.AspNetCore.Http.Tests/PipeTest.cs
  559. @@ -0,0 +1,43 @@
  560. +// Copyright (c) .NET Foundation. All rights reserved.
  561. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  562. +
  563. +using System;
  564. +using System.IO;
  565. +using System.IO.Pipelines;
  566. +
  567. +namespace Microsoft.AspNetCore.Http.Tests
  568. +{
  569. + public abstract class PipeTest : IDisposable
  570. + {
  571. + protected const int MaximumSizeHigh = 65;
  572. +
  573. + public MemoryStream MemoryStream { get; set; }
  574. +
  575. + public PipeWriter Writer { get; set; }
  576. +
  577. + protected PipeTest()
  578. + {
  579. + MemoryStream = new MemoryStream();
  580. + Writer = new StreamPipeWriter(MemoryStream, 4096, new TestMemoryPool());
  581. + }
  582. +
  583. + public void Dispose()
  584. + {
  585. + Writer.Complete();
  586. + }
  587. +
  588. + public byte[] Read()
  589. + {
  590. + Writer.FlushAsync().GetAwaiter().GetResult();
  591. + return ReadWithoutFlush();
  592. + }
  593. +
  594. + public byte[] ReadWithoutFlush()
  595. + {
  596. + MemoryStream.Position = 0;
  597. + var buffer = new byte[MemoryStream.Length];
  598. + var result = MemoryStream.Read(buffer, 0, (int)MemoryStream.Length);
  599. + return buffer;
  600. + }
  601. + }
  602. +}
  603. diff --git a/test/Microsoft.AspNetCore.Http.Tests/PipeWriterTests.cs b/test/Microsoft.AspNetCore.Http.Tests/PipeWriterTests.cs
  604. new file mode 100644
  605. index 00000000000..0cc6dc012f4
  606. --- /dev/null
  607. +++ b/test/Microsoft.AspNetCore.Http.Tests/PipeWriterTests.cs
  608. @@ -0,0 +1,221 @@
  609. +// Copyright (c) .NET Foundation. All rights reserved.
  610. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  611. +
  612. +using System;
  613. +using System.Buffers;
  614. +using System.Collections.Generic;
  615. +using System.IO;
  616. +using System.IO.Pipelines;
  617. +using System.Linq;
  618. +using System.Threading.Tasks;
  619. +using Xunit;
  620. +
  621. +namespace Microsoft.AspNetCore.Http.Tests
  622. +{
  623. + public class PipeWriterTests : PipeTest
  624. + {
  625. +
  626. + [Theory]
  627. + [InlineData(3, -1, 0)]
  628. + [InlineData(3, 0, -1)]
  629. + [InlineData(3, 0, 4)]
  630. + [InlineData(3, 4, 0)]
  631. + [InlineData(3, -1, -1)]
  632. + [InlineData(3, 4, 4)]
  633. + public void ThrowsForInvalidParameters(int arrayLength, int offset, int length)
  634. + {
  635. + var array = new byte[arrayLength];
  636. + for (var i = 0; i < array.Length; i++)
  637. + {
  638. + array[i] = (byte)(i + 1);
  639. + }
  640. +
  641. + Writer.Write(new Span<byte>(array, 0, 0));
  642. + Writer.Write(new Span<byte>(array, array.Length, 0));
  643. +
  644. + try
  645. + {
  646. + Writer.Write(new Span<byte>(array, offset, length));
  647. + Assert.True(false);
  648. + }
  649. + catch (Exception ex)
  650. + {
  651. + Assert.True(ex is ArgumentOutOfRangeException);
  652. + }
  653. +
  654. + Writer.Write(new Span<byte>(array, 0, array.Length));
  655. + Assert.Equal(array, Read());
  656. + }
  657. +
  658. + [Theory]
  659. + [InlineData(0, 3)]
  660. + [InlineData(1, 2)]
  661. + [InlineData(2, 1)]
  662. + [InlineData(1, 1)]
  663. + public void CanWriteWithOffsetAndLength(int offset, int length)
  664. + {
  665. + var array = new byte[] { 1, 2, 3 };
  666. +
  667. + Writer.Write(new Span<byte>(array, offset, length));
  668. +
  669. + Assert.Equal(array.Skip(offset).Take(length).ToArray(), Read());
  670. + }
  671. +
  672. + [Fact]
  673. + public void CanWriteIntoHeadlessBuffer()
  674. + {
  675. +
  676. + Writer.Write(new byte[] { 1, 2, 3 });
  677. + Assert.Equal(new byte[] { 1, 2, 3 }, Read());
  678. + }
  679. +
  680. + [Fact]
  681. + public void CanGetNewMemoryWhenSizeTooLarge()
  682. + {
  683. + var memory = Writer.GetMemory(0);
  684. +
  685. + var memoryLarge = Writer.GetMemory(10000);
  686. +
  687. + Assert.NotEqual(memory, memoryLarge);
  688. + }
  689. +
  690. + [Fact]
  691. + public void CanGetSameMemoryWhenNoAdvance()
  692. + {
  693. + var memory = Writer.GetMemory(0);
  694. +
  695. + var secondMemory = Writer.GetMemory(0);
  696. +
  697. + Assert.Equal(memory, secondMemory);
  698. + }
  699. +
  700. + [Fact]
  701. + public void CanGetNewSpanWhenNoAdvanceWhenSizeTooLarge()
  702. + {
  703. + var span = Writer.GetSpan(0);
  704. +
  705. + var secondSpan = Writer.GetSpan(8000);
  706. +
  707. + Assert.False(span.SequenceEqual(secondSpan));
  708. + }
  709. +
  710. + [Fact]
  711. + public void CanGetSameSpanWhenNoAdvance()
  712. + {
  713. + var span = Writer.GetSpan(0);
  714. +
  715. + var secondSpan = Writer.GetSpan(0);
  716. +
  717. + Assert.True(span.SequenceEqual(secondSpan));
  718. + }
  719. +
  720. + [Theory]
  721. + [InlineData(16, 32, 32)]
  722. + [InlineData(16, 16, 16)]
  723. + [InlineData(64, 32, 64)]
  724. + [InlineData(40, 32, 64)] // memory sizes are powers of 2.
  725. + public void CheckMinimumSegmentSizeWithGetMemory(int minimumSegmentSize, int getMemorySize, int expectedSize)
  726. + {
  727. + var writer = new StreamPipeWriter(new MemoryStream(), minimumSegmentSize);
  728. + var memory = writer.GetMemory(getMemorySize);
  729. +
  730. + Assert.Equal(expectedSize, memory.Length);
  731. + }
  732. +
  733. + [Fact]
  734. + public void CanWriteMultipleTimes()
  735. + {
  736. +
  737. + Writer.Write(new byte[] { 1 });
  738. + Writer.Write(new byte[] { 2 });
  739. + Writer.Write(new byte[] { 3 });
  740. +
  741. + Assert.Equal(new byte[] { 1, 2, 3 }, Read());
  742. + }
  743. +
  744. + [Fact]
  745. + public void CanWriteOverTheBlockLength()
  746. + {
  747. + Memory<byte> memory = Writer.GetMemory();
  748. +
  749. + IEnumerable<byte> source = Enumerable.Range(0, memory.Length).Select(i => (byte)i);
  750. + byte[] expectedBytes = source.Concat(source).Concat(source).ToArray();
  751. +
  752. + Writer.Write(expectedBytes);
  753. +
  754. + Assert.Equal(expectedBytes, Read());
  755. + }
  756. +
  757. + [Fact]
  758. + public void EnsureAllocatesSpan()
  759. + {
  760. + var span = Writer.GetSpan(10);
  761. +
  762. + Assert.True(span.Length >= 10);
  763. + // 0 byte Flush would not complete the reader so we complete.
  764. + Writer.Complete();
  765. + Assert.Equal(new byte[] { }, Read());
  766. + }
  767. +
  768. + [Fact]
  769. + public void SlicesSpanAndAdvancesAfterWrite()
  770. + {
  771. + int initialLength = Writer.GetSpan(3).Length;
  772. +
  773. +
  774. + Writer.Write(new byte[] { 1, 2, 3 });
  775. + Span<byte> span = Writer.GetSpan();
  776. +
  777. + Assert.Equal(initialLength - 3, span.Length);
  778. + Assert.Equal(new byte[] { 1, 2, 3 }, Read());
  779. + }
  780. +
  781. + [Theory]
  782. + [InlineData(5)]
  783. + [InlineData(50)]
  784. + [InlineData(500)]
  785. + [InlineData(5000)]
  786. + [InlineData(50000)]
  787. + public async Task WriteLargeDataBinary(int length)
  788. + {
  789. + var data = new byte[length];
  790. + new Random(length).NextBytes(data);
  791. + PipeWriter output = Writer;
  792. + output.Write(data);
  793. + await output.FlushAsync();
  794. +
  795. + var input = Read();
  796. + Assert.Equal(data, input.ToArray());
  797. + }
  798. +
  799. + [Fact]
  800. + public async Task CanWriteNothingToBuffer()
  801. + {
  802. + Writer.GetMemory(0);
  803. + Writer.Advance(0); // doing nothing, the hard way
  804. + await Writer.FlushAsync();
  805. + }
  806. +
  807. + [Fact]
  808. + public void EmptyWriteDoesNotThrow()
  809. + {
  810. + Writer.Write(new byte[0]);
  811. + }
  812. +
  813. + [Fact]
  814. + public void ThrowsOnAdvanceOverMemorySize()
  815. + {
  816. + Memory<byte> buffer = Writer.GetMemory(1);
  817. + var exception = Assert.Throws<InvalidOperationException>(() => Writer.Advance(buffer.Length + 1));
  818. + Assert.Equal("Can't advance past buffer size.", exception.Message);
  819. + }
  820. +
  821. + [Fact]
  822. + public void ThrowsOnAdvanceWithNoMemory()
  823. + {
  824. + PipeWriter buffer = Writer;
  825. + var exception = Assert.Throws<InvalidOperationException>(() => buffer.Advance(1));
  826. + Assert.Equal("No writing operation. Make sure GetMemory() was called.", exception.Message);
  827. + }
  828. + }
  829. +}
  830. diff --git a/test/Microsoft.AspNetCore.Http.Tests/StreamPipeWriterTests.cs b/test/Microsoft.AspNetCore.Http.Tests/StreamPipeWriterTests.cs
  831. new file mode 100644
  832. index 00000000000..76d3b34faea
  833. --- /dev/null
  834. +++ b/test/Microsoft.AspNetCore.Http.Tests/StreamPipeWriterTests.cs
  835. @@ -0,0 +1,380 @@
  836. +// Copyright (c) .NET Foundation. All rights reserved.
  837. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
  838. +
  839. +using System;
  840. +using System.Buffers;
  841. +using System.IO;
  842. +using System.IO.Pipelines;
  843. +using System.Text;
  844. +using System.Threading;
  845. +using System.Threading.Tasks;
  846. +using Xunit;
  847. +
  848. +namespace Microsoft.AspNetCore.Http.Tests
  849. +{
  850. + public class StreamPipeWriterTests : PipeTest
  851. + {
  852. + [Fact]
  853. + public async Task CanWriteAsyncMultipleTimesIntoSameBlock()
  854. + {
  855. +
  856. + await Writer.WriteAsync(new byte[] { 1 });
  857. + await Writer.WriteAsync(new byte[] { 2 });
  858. + await Writer.WriteAsync(new byte[] { 3 });
  859. +
  860. + Assert.Equal(new byte[] { 1, 2, 3 }, Read());
  861. + }
  862. +
  863. + [Theory]
  864. + [InlineData(100, 1000)]
  865. + [InlineData(100, 8000)]
  866. + [InlineData(100, 10000)]
  867. + [InlineData(8000, 100)]
  868. + [InlineData(8000, 8000)]
  869. + public async Task CanAdvanceWithPartialConsumptionOfFirstSegment(int firstWriteLength, int secondWriteLength)
  870. + {
  871. + await Writer.WriteAsync(Encoding.ASCII.GetBytes("a"));
  872. +
  873. + var expectedLength = firstWriteLength + secondWriteLength + 1;
  874. +
  875. + var memory = Writer.GetMemory(firstWriteLength);
  876. + Writer.Advance(firstWriteLength);
  877. +
  878. + memory = Writer.GetMemory(secondWriteLength);
  879. + Writer.Advance(secondWriteLength);
  880. +
  881. + await Writer.FlushAsync();
  882. +
  883. + Assert.Equal(expectedLength, Read().Length);
  884. + }
  885. +
  886. + [Fact]
  887. + public async Task ThrowsOnCompleteAndWrite()
  888. + {
  889. + Writer.Complete(new InvalidOperationException("Whoops"));
  890. + var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () => await Writer.FlushAsync());
  891. +
  892. + Assert.Equal("Whoops", exception.Message);
  893. + }
  894. +
  895. + [Fact]
  896. + public async Task WriteCanBeCancelledViaProvidedCancellationToken()
  897. + {
  898. + var pipeWriter = new StreamPipeWriter(new HangingStream());
  899. + var cts = new CancellationTokenSource(1);
  900. + await Assert.ThrowsAsync<TaskCanceledException>(async () => await pipeWriter.WriteAsync(Encoding.ASCII.GetBytes("data"), cts.Token));
  901. + }
  902. +
  903. + [Fact]
  904. + public async Task WriteCanBeCanceledViaCancelPendingFlushWhenFlushIsAsync()
  905. + {
  906. + var pipeWriter = new StreamPipeWriter(new HangingStream());
  907. + FlushResult flushResult = new FlushResult();
  908. +
  909. + var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
  910. +
  911. + var task = Task.Run(async () =>
  912. + {
  913. + try
  914. + {
  915. + var writingTask = pipeWriter.WriteAsync(Encoding.ASCII.GetBytes("data"));
  916. + tcs.SetResult(0);
  917. + flushResult = await writingTask;
  918. + }
  919. + catch (Exception ex)
  920. + {
  921. + Console.WriteLine(ex.Message);
  922. + throw ex;
  923. + }
  924. + });
  925. +
  926. + await tcs.Task;
  927. +
  928. + pipeWriter.CancelPendingFlush();
  929. +
  930. + await task;
  931. +
  932. + Assert.True(flushResult.IsCanceled);
  933. + }
  934. +
  935. + [Fact]
  936. + public void FlushAsyncCompletedAfterPreCancellation()
  937. + {
  938. + PipeWriter writableBuffer = Writer.WriteEmpty(1);
  939. +
  940. + Writer.CancelPendingFlush();
  941. +
  942. + ValueTask<FlushResult> flushAsync = writableBuffer.FlushAsync();
  943. +
  944. + Assert.True(flushAsync.IsCompleted);
  945. +
  946. + FlushResult flushResult = flushAsync.GetAwaiter().GetResult();
  947. +
  948. + Assert.True(flushResult.IsCanceled);
  949. +
  950. + flushAsync = writableBuffer.FlushAsync();
  951. +
  952. + Assert.True(flushAsync.IsCompleted);
  953. + }
  954. +
  955. + [Fact]
  956. + public void FlushAsyncReturnsCanceledIfCanceledBeforeFlush()
  957. + {
  958. + CheckCanceledFlush();
  959. + }
  960. +
  961. + [Fact]
  962. + public void FlushAsyncReturnsCanceledIfCanceledBeforeFlushMultipleTimes()
  963. + {
  964. + for (var i = 0; i < 10; i++)
  965. + {
  966. + CheckCanceledFlush();
  967. + }
  968. + }
  969. +
  970. + [Fact]
  971. + public async Task FlushAsyncReturnsCanceledInterleaved()
  972. + {
  973. + for (var i = 0; i < 5; i++)
  974. + {
  975. + CheckCanceledFlush();
  976. + await CheckWriteIsNotCanceled();
  977. + }
  978. + }
  979. +
  980. + [Fact]
  981. + public async Task CancelPendingFlushBetweenWritesAllDataIsPreserved()
  982. + {
  983. + MemoryStream = new SingleWriteStream();
  984. + Writer = new StreamPipeWriter(MemoryStream);
  985. + FlushResult flushResult = new FlushResult();
  986. +
  987. + var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
  988. +
  989. + var task = Task.Run(async () =>
  990. + {
  991. + try
  992. + {
  993. + await Writer.WriteAsync(Encoding.ASCII.GetBytes("data"));
  994. +
  995. + var writingTask = Writer.WriteAsync(Encoding.ASCII.GetBytes(" data"));
  996. + tcs.SetResult(0);
  997. + flushResult = await writingTask;
  998. + }
  999. + catch (Exception ex)
  1000. + {
  1001. + Console.WriteLine(ex.Message);
  1002. + throw ex;
  1003. + }
  1004. + });
  1005. +
  1006. + await tcs.Task;
  1007. +
  1008. + Writer.CancelPendingFlush();
  1009. +
  1010. + await task;
  1011. +
  1012. + Assert.True(flushResult.IsCanceled);
  1013. +
  1014. + await Writer.WriteAsync(Encoding.ASCII.GetBytes(" more data"));
  1015. + Assert.Equal(Encoding.ASCII.GetBytes("data data more data"), Read());
  1016. + }
  1017. +
  1018. + [Fact]
  1019. + public async Task CancelPendingFlushAfterAllWritesAllDataIsPreserved()
  1020. + {
  1021. + MemoryStream = new CannotFlushStream();
  1022. + Writer = new StreamPipeWriter(MemoryStream);
  1023. + FlushResult flushResult = new FlushResult();
  1024. +
  1025. + var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
  1026. +
  1027. + var task = Task.Run(async () =>
  1028. + {
  1029. + try
  1030. + {
  1031. + // Create two Segments
  1032. + // First one will succeed to write, other one will hang.
  1033. + var writingTask = Writer.WriteAsync(Encoding.ASCII.GetBytes("data"));
  1034. + tcs.SetResult(0);
  1035. + flushResult = await writingTask;
  1036. + }
  1037. + catch (Exception ex)
  1038. + {
  1039. + Console.WriteLine(ex.Message);
  1040. + throw ex;
  1041. + }
  1042. + });
  1043. +
  1044. + await tcs.Task;
  1045. +
  1046. + Writer.CancelPendingFlush();
  1047. +
  1048. + await task;
  1049. +
  1050. + Assert.True(flushResult.IsCanceled);
  1051. + }
  1052. +
  1053. + [Fact]
  1054. + public async Task CancelPendingFlushLostOfCancellationsNoDataLost()
  1055. + {
  1056. + var writeSize = 16;
  1057. + var singleWriteStream = new SingleWriteStream();
  1058. + MemoryStream = singleWriteStream;
  1059. + Writer = new StreamPipeWriter(MemoryStream, minimumSegmentSize: writeSize);
  1060. +
  1061. + for (var i = 0; i < 10; i++)
  1062. + {
  1063. + FlushResult flushResult = new FlushResult();
  1064. + var expectedData = Encoding.ASCII.GetBytes(new string('a', writeSize));
  1065. +
  1066. + var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
  1067. + // TaskCreationOptions.RunAsync
  1068. +
  1069. + var task = Task.Run(async () =>
  1070. + {
  1071. + try
  1072. + {
  1073. + // Create two Segments
  1074. + // First one will succeed to write, other one will hang.
  1075. + for (var j = 0; j < 2; j++)
  1076. + {
  1077. + Writer.Write(expectedData);
  1078. + }
  1079. +
  1080. + var flushTask = Writer.FlushAsync();
  1081. + tcs.SetResult(0);
  1082. + flushResult = await flushTask;
  1083. + }
  1084. + catch (Exception ex)
  1085. + {
  1086. + Console.WriteLine(ex.Message);
  1087. + throw ex;
  1088. + }
  1089. + });
  1090. +
  1091. + await tcs.Task;
  1092. +
  1093. + Writer.CancelPendingFlush();
  1094. +
  1095. + await task;
  1096. +
  1097. + Assert.True(flushResult.IsCanceled);
  1098. + }
  1099. +
  1100. + // Only half of the data was written because every other flush failed.
  1101. + Assert.Equal(16 * 10, ReadWithoutFlush().Length);
  1102. +
  1103. + // Start allowing all writes to make read succeed.
  1104. + singleWriteStream.AllowAllWrites = true;
  1105. +
  1106. + Assert.Equal(16 * 10 * 2, Read().Length);
  1107. + }
  1108. +
  1109. + private async Task CheckWriteIsNotCanceled()
  1110. + {
  1111. + var flushResult = await Writer.WriteAsync(Encoding.ASCII.GetBytes("data"));
  1112. + Assert.False(flushResult.IsCanceled);
  1113. + }
  1114. +
  1115. + private void CheckCanceledFlush()
  1116. + {
  1117. + PipeWriter writableBuffer = Writer.WriteEmpty(MaximumSizeHigh);
  1118. +
  1119. + Writer.CancelPendingFlush();
  1120. +
  1121. + ValueTask<FlushResult> flushAsync = writableBuffer.FlushAsync();
  1122. +
  1123. + Assert.True(flushAsync.IsCompleted);
  1124. + FlushResult flushResult = flushAsync.GetAwaiter().GetResult();
  1125. + Assert.True(flushResult.IsCanceled);
  1126. + }
  1127. + }
  1128. +
  1129. + internal class HangingStream : MemoryStream
  1130. + {
  1131. +
  1132. + public HangingStream()
  1133. + {
  1134. + }
  1135. +
  1136. + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  1137. + {
  1138. + await Task.Delay(30000, cancellationToken);
  1139. + }
  1140. +
  1141. + public override async Task FlushAsync(CancellationToken cancellationToken)
  1142. + {
  1143. + await Task.Delay(30000, cancellationToken);
  1144. + }
  1145. +
  1146. + public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  1147. + {
  1148. + await Task.Delay(30000, cancellationToken);
  1149. + return 0;
  1150. + }
  1151. + }
  1152. +
  1153. + internal class SingleWriteStream : MemoryStream
  1154. + {
  1155. + private bool _shouldNextWriteFail;
  1156. +
  1157. + public bool AllowAllWrites { get; set; }
  1158. +
  1159. +
  1160. +#if NETCOREAPP2_2
  1161. + public override async ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
  1162. + {
  1163. + try
  1164. + {
  1165. + if (_shouldNextWriteFail && !AllowAllWrites)
  1166. + {
  1167. + await Task.Delay(30000, cancellationToken);
  1168. + }
  1169. + else
  1170. + {
  1171. + await base.WriteAsync(source, cancellationToken);
  1172. + }
  1173. + }
  1174. + finally
  1175. + {
  1176. + _shouldNextWriteFail = !_shouldNextWriteFail;
  1177. + }
  1178. + }
  1179. +#endif
  1180. +
  1181. + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  1182. + {
  1183. + try
  1184. + {
  1185. + if (_shouldNextWriteFail && !AllowAllWrites)
  1186. + {
  1187. + await Task.Delay(30000, cancellationToken);
  1188. + }
  1189. + await base.WriteAsync(buffer, offset, count, cancellationToken);
  1190. + }
  1191. + finally
  1192. + {
  1193. + _shouldNextWriteFail = !_shouldNextWriteFail;
  1194. + }
  1195. + }
  1196. + }
  1197. +
  1198. + internal class CannotFlushStream : MemoryStream
  1199. + {
  1200. + public override async Task FlushAsync(CancellationToken cancellationToken)
  1201. + {
  1202. + await Task.Delay(30000, cancellationToken);
  1203. + }
  1204. + }
  1205. +
  1206. + internal static class TestWriterExtensions
  1207. + {
  1208. + public static PipeWriter WriteEmpty(this PipeWriter Writer, int count)
  1209. + {
  1210. + Writer.GetSpan(count).Slice(0, count).Fill(0);
  1211. + Writer.Advance(count);
  1212. + return Writer;
  1213. + }
  1214. + }
  1215. +}
  1216. diff --git a/test/Microsoft.AspNetCore.Http.Tests/TestMemoryPool.cs b/test/Microsoft.AspNetCore.Http.Tests/TestMemoryPool.cs
  1217. new file mode 100644
  1218. index 00000000000..c5dd647dd19
  1219. --- /dev/null
  1220. +++ b/test/Microsoft.AspNetCore.Http.Tests/TestMemoryPool.cs
  1221. @@ -0,0 +1,139 @@
  1222. +// Licensed to the .NET Foundation under one or more agreements.
  1223. +// The .NET Foundation licenses this file to you under the MIT license.
  1224. +// See the LICENSE file in the project root for more information.
  1225. +
  1226. +using System;
  1227. +using System.Buffers;
  1228. +using System.Diagnostics;
  1229. +using System.Runtime.CompilerServices;
  1230. +using System.Runtime.InteropServices;
  1231. +using System.Threading;
  1232. +
  1233. +namespace Microsoft.AspNetCore.Http.Tests
  1234. +{
  1235. + public class TestMemoryPool : MemoryPool<byte>
  1236. + {
  1237. + private MemoryPool<byte> _pool = Shared;
  1238. +
  1239. + private bool _disposed;
  1240. +
  1241. + public override IMemoryOwner<byte> Rent(int minBufferSize = -1)
  1242. + {
  1243. + CheckDisposed();
  1244. + return new PooledMemory(_pool.Rent(minBufferSize), this);
  1245. + }
  1246. +
  1247. + protected override void Dispose(bool disposing)
  1248. + {
  1249. + _disposed = true;
  1250. + }
  1251. +
  1252. + public override int MaxBufferSize => 4096;
  1253. +
  1254. + internal void CheckDisposed()
  1255. + {
  1256. + if (_disposed)
  1257. + {
  1258. + throw new ObjectDisposedException(nameof(TestMemoryPool));
  1259. + }
  1260. + }
  1261. +
  1262. + private class PooledMemory : MemoryManager<byte>
  1263. + {
  1264. + private IMemoryOwner<byte> _owner;
  1265. +
  1266. + private readonly TestMemoryPool _pool;
  1267. +
  1268. + private int _referenceCount;
  1269. +
  1270. + private bool _returned;
  1271. +
  1272. + private string _leaser;
  1273. +
  1274. + public PooledMemory(IMemoryOwner<byte> owner, TestMemoryPool pool)
  1275. + {
  1276. + _owner = owner;
  1277. + _pool = pool;
  1278. + _leaser = Environment.StackTrace;
  1279. + _referenceCount = 1;
  1280. + }
  1281. +
  1282. + ~PooledMemory()
  1283. + {
  1284. + Debug.Assert(_returned, "Block being garbage collected instead of returned to pool" + Environment.NewLine + _leaser);
  1285. + }
  1286. +
  1287. + protected override void Dispose(bool disposing)
  1288. + {
  1289. + _pool.CheckDisposed();
  1290. + }
  1291. +
  1292. + public override MemoryHandle Pin(int elementIndex = 0)
  1293. + {
  1294. + _pool.CheckDisposed();
  1295. + Interlocked.Increment(ref _referenceCount);
  1296. +
  1297. + if (!MemoryMarshal.TryGetArray(_owner.Memory, out ArraySegment<byte> segment))
  1298. + {
  1299. + throw new InvalidOperationException();
  1300. + }
  1301. +
  1302. + unsafe
  1303. + {
  1304. + try
  1305. + {
  1306. + if ((uint)elementIndex > (uint)segment.Count)
  1307. + {
  1308. + throw new ArgumentOutOfRangeException(nameof(elementIndex));
  1309. + }
  1310. +
  1311. + GCHandle handle = GCHandle.Alloc(segment.Array, GCHandleType.Pinned);
  1312. +
  1313. + return new MemoryHandle(Unsafe.Add<byte>(((void*)handle.AddrOfPinnedObject()), elementIndex + segment.Offset), handle, this);
  1314. + }
  1315. + catch
  1316. + {
  1317. + Unpin();
  1318. + throw;
  1319. + }
  1320. + }
  1321. + }
  1322. +
  1323. + public override void Unpin()
  1324. + {
  1325. + _pool.CheckDisposed();
  1326. +
  1327. + int newRefCount = Interlocked.Decrement(ref _referenceCount);
  1328. +
  1329. + if (newRefCount < 0)
  1330. + throw new InvalidOperationException();
  1331. +
  1332. + if (newRefCount == 0)
  1333. + {
  1334. + _returned = true;
  1335. + }
  1336. + }
  1337. +
  1338. + protected override bool TryGetArray(out ArraySegment<byte> segment)
  1339. + {
  1340. + _pool.CheckDisposed();
  1341. + return MemoryMarshal.TryGetArray(_owner.Memory, out segment);
  1342. + }
  1343. +
  1344. + public override Memory<byte> Memory
  1345. + {
  1346. + get
  1347. + {
  1348. + _pool.CheckDisposed();
  1349. + return _owner.Memory;
  1350. + }
  1351. + }
  1352. +
  1353. + public override Span<byte> GetSpan()
  1354. + {
  1355. + _pool.CheckDisposed();
  1356. + return _owner.Memory.Span;
  1357. + }
  1358. + }
  1359. + }
  1360. +}
  1361. \ No newline at end of file