Buffer.cs 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Reactive.Linq
  10. {
  11. partial class AsyncObservable
  12. {
  /// <summary>
  /// Groups the elements of <paramref name="source"/> into lists holding up to
  /// <paramref name="count"/> items each.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="count">Number of elements per buffer; must be positive.</param>
  /// <returns>A sequence of buffers fed by <c>AsyncObserver.Buffer(observer, count)</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> is null, or <paramref name="count"/> is not positive.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, int count)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // NOTE(review): a range violation is reported as ArgumentNullException; kept as-is
      // so callers catching that type are unaffected. ArgumentOutOfRangeException would fit better.
      if (count < 1)
      {
          throw new ArgumentNullException(nameof(count));
      }

      return Create<IList<TSource>>(downstream =>
      {
          var bufferSink = AsyncObserver.Buffer(downstream, count);
          return source.SubscribeSafeAsync(bufferSink);
      });
  }
  /// <summary>
  /// Groups the elements of <paramref name="source"/> into lists of up to <paramref name="count"/>
  /// items, starting a new list every <paramref name="skip"/> elements.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="count">Number of elements per buffer; must be positive.</param>
  /// <param name="skip">Number of elements between the starts of consecutive buffers; must be positive.</param>
  /// <returns>A sequence of buffers fed by <c>AsyncObserver.Buffer(observer, count, skip)</c>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> is null, or <paramref name="count"/> or <paramref name="skip"/> is not positive.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, int count, int skip)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // NOTE(review): range violations are reported as ArgumentNullException; kept as-is
      // so callers catching that type are unaffected.
      if (count < 1)
      {
          throw new ArgumentNullException(nameof(count));
      }

      if (skip < 1)
      {
          throw new ArgumentNullException(nameof(skip));
      }

      return Create<IList<TSource>>(downstream =>
      {
          var bufferSink = AsyncObserver.Buffer(downstream, count, skip);
          return source.SubscribeSafeAsync(bufferSink);
      });
  }
  /// <summary>
  /// Groups the elements of <paramref name="source"/> into lists that are flushed by a timer
  /// running with period <paramref name="timeSpan"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="timeSpan">Length of each buffer; must not be negative.</param>
  /// <returns>A sequence of buffers; its subscription combines the source subscription and the timer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> is null, or <paramref name="timeSpan"/> is negative.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // NOTE(review): a negative duration is reported as ArgumentNullException; kept for compatibility.
      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeSpan));
      }

      return Create<IList<TSource>>(async downstream =>
      {
          // The sink and its timer are created before the source is subscribed.
          var (bufferSink, flushTimer) = await AsyncObserver.Buffer(downstream, timeSpan).ConfigureAwait(false);
          var upstream = await source.SubscribeSafeAsync(bufferSink).ConfigureAwait(false);

          return StableCompositeAsyncDisposable.Create(upstream, flushTimer);
      });
  }
  /// <summary>
  /// Groups the elements of <paramref name="source"/> into lists that are flushed by a timer
  /// running with period <paramref name="timeSpan"/> on <paramref name="scheduler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="timeSpan">Length of each buffer; must not be negative.</param>
  /// <param name="scheduler">Scheduler used to run the flush timer.</param>
  /// <returns>A sequence of buffers; its subscription combines the source subscription and the timer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="scheduler"/> is null, or <paramref name="timeSpan"/> is negative.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, IAsyncScheduler scheduler)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // NOTE(review): a negative duration is reported as ArgumentNullException; kept for compatibility.
      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeSpan));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return Create<IList<TSource>>(async downstream =>
      {
          // The sink and its timer are created before the source is subscribed.
          var (bufferSink, flushTimer) = await AsyncObserver.Buffer(downstream, timeSpan, scheduler).ConfigureAwait(false);
          var upstream = await source.SubscribeSafeAsync(bufferSink).ConfigureAwait(false);

          return StableCompositeAsyncDisposable.Create(upstream, flushTimer);
      });
  }
  /// <summary>
  /// Groups the elements of <paramref name="source"/> into time-based lists of length
  /// <paramref name="timeSpan"/>, with a new list opened every <paramref name="timeShift"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="timeSpan">Length of each buffer; must not be negative.</param>
  /// <param name="timeShift">Interval between the creation of consecutive buffers; must not be negative.</param>
  /// <returns>A sequence of buffers; its subscription combines the source subscription and the timer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> is null, or <paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // NOTE(review): negative durations are reported as ArgumentNullException; kept for compatibility.
      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeSpan));
      }

      if (timeShift < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeShift));
      }

      return Create<IList<TSource>>(async downstream =>
      {
          // The sink and its timer are created before the source is subscribed.
          var (bufferSink, flushTimer) = await AsyncObserver.Buffer(downstream, timeSpan, timeShift).ConfigureAwait(false);
          var upstream = await source.SubscribeSafeAsync(bufferSink).ConfigureAwait(false);

          return StableCompositeAsyncDisposable.Create(upstream, flushTimer);
      });
  }
  /// <summary>
  /// Groups the elements of <paramref name="source"/> into time-based lists of length
  /// <paramref name="timeSpan"/>, with a new list opened every <paramref name="timeShift"/>,
  /// using <paramref name="scheduler"/> to run the timer.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="timeSpan">Length of each buffer; must not be negative.</param>
  /// <param name="timeShift">Interval between the creation of consecutive buffers; must not be negative.</param>
  /// <param name="scheduler">Scheduler used to run the timer.</param>
  /// <returns>A sequence of buffers; its subscription combines the source subscription and the timer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="scheduler"/> is null, or
  /// <paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // NOTE(review): negative durations are reported as ArgumentNullException; kept for compatibility.
      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeSpan));
      }

      if (timeShift < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeShift));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return Create<IList<TSource>>(async downstream =>
      {
          // The sink and its timer are created before the source is subscribed.
          var (bufferSink, flushTimer) = await AsyncObserver.Buffer(downstream, timeSpan, timeShift, scheduler).ConfigureAwait(false);
          var upstream = await source.SubscribeSafeAsync(bufferSink).ConfigureAwait(false);

          return StableCompositeAsyncDisposable.Create(upstream, flushTimer);
      });
  }
  /// <summary>
  /// Groups the elements of <paramref name="source"/> into lists that are flushed when they
  /// reach <paramref name="count"/> elements or when a timer of <paramref name="timeSpan"/> fires.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="timeSpan">Maximum duration of a buffer; must not be negative.</param>
  /// <param name="count">Maximum number of elements per buffer; must be positive.</param>
  /// <returns>A sequence of buffers; its subscription combines the source subscription and the timer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> is null, <paramref name="timeSpan"/> is negative,
  /// or <paramref name="count"/> is not positive.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // NOTE(review): range violations are reported as ArgumentNullException; kept for compatibility.
      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeSpan));
      }

      if (count < 1)
      {
          throw new ArgumentNullException(nameof(count));
      }

      return Create<IList<TSource>>(async downstream =>
      {
          // The sink and its timer are created before the source is subscribed.
          var (bufferSink, flushTimer) = await AsyncObserver.Buffer(downstream, timeSpan, count).ConfigureAwait(false);
          var upstream = await source.SubscribeSafeAsync(bufferSink).ConfigureAwait(false);

          return StableCompositeAsyncDisposable.Create(upstream, flushTimer);
      });
  }
  /// <summary>
  /// Groups the elements of <paramref name="source"/> into lists that are flushed when they
  /// reach <paramref name="count"/> elements or when a timer of <paramref name="timeSpan"/>,
  /// run on <paramref name="scheduler"/>, fires.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="timeSpan">Maximum duration of a buffer; must not be negative.</param>
  /// <param name="count">Maximum number of elements per buffer; must be positive.</param>
  /// <param name="scheduler">Scheduler used to run the timer.</param>
  /// <returns>A sequence of buffers; its subscription combines the source subscription and the timer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="scheduler"/> is null, <paramref name="timeSpan"/>
  /// is negative, or <paramref name="count"/> is not positive.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // NOTE(review): range violations are reported as ArgumentNullException; kept for compatibility.
      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeSpan));
      }

      if (count < 1)
      {
          throw new ArgumentNullException(nameof(count));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return Create<IList<TSource>>(async downstream =>
      {
          // The sink and its timer are created before the source is subscribed.
          var (bufferSink, flushTimer) = await AsyncObserver.Buffer(downstream, timeSpan, count, scheduler).ConfigureAwait(false);
          var upstream = await source.SubscribeSafeAsync(bufferSink).ConfigureAwait(false);

          return StableCompositeAsyncDisposable.Create(upstream, flushTimer);
      });
  }
  /// <summary>
  /// Groups the elements of <paramref name="source"/> into lists whose boundaries are signalled
  /// by the elements of <paramref name="bufferBoundaries"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TBufferBoundary">Element type of the boundary sequence; the values themselves are not used.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="bufferBoundaries">Sequence whose elements close the current buffer and open a new one.</param>
  /// <returns>A sequence of buffers; its subscription combines the source and boundary subscriptions.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="bufferBoundaries"/> is null.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource, TBufferBoundary>(this IAsyncObservable<TSource> source, IAsyncObservable<TBufferBoundary> bufferBoundaries)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (bufferBoundaries is null)
      {
          throw new ArgumentNullException(nameof(bufferBoundaries));
      }

      return Create<IList<TSource>>(async downstream =>
      {
          var (elementSink, boundarySink) = AsyncObserver.Buffer<TSource, TBufferBoundary>(downstream);

          // The source is subscribed first, then the boundary sequence.
          var upstream = await source.SubscribeSafeAsync(elementSink).ConfigureAwait(false);
          var boundaries = await bufferBoundaries.SubscribeSafeAsync(boundarySink).ConfigureAwait(false);

          return StableCompositeAsyncDisposable.Create(upstream, boundaries);
      });
  }
  // REVIEW: This overload is inherited from Rx but is arguably a bit esoteric, as the closing selector receives no context.

  /// <summary>
  /// Groups the elements of <paramref name="source"/> into lists; each list is closed when the
  /// sequence obtained from <paramref name="bufferClosingSelector"/> produces a notification.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TBufferClosing">Element type of the closing sequences; the values themselves are not used.</typeparam>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="bufferClosingSelector">Factory invoked to obtain the sequence that closes the current buffer.</param>
  /// <returns>A sequence of buffers; its subscription combines the source subscription and the closing subscription.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="bufferClosingSelector"/> is null.
  /// </exception>
  public static IAsyncObservable<IList<TSource>> Buffer<TSource, TBufferClosing>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TBufferClosing>> bufferClosingSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (bufferClosingSelector is null)
      {
          throw new ArgumentNullException(nameof(bufferClosingSelector));
      }

      return Create<IList<TSource>>(async downstream =>
      {
          // The sink and the closing subscription are set up before the source is subscribed.
          var (elementSink, closingHandle) = await AsyncObserver.Buffer<TSource, TBufferClosing>(downstream, bufferClosingSelector).ConfigureAwait(false);
          var upstream = await source.SubscribeSafeAsync(elementSink).ConfigureAwait(false);

          return StableCompositeAsyncDisposable.Create(upstream, closingHandle);
      });
  }
  151. }
  152. partial class AsyncObserver
  153. {
  /// <summary>
  /// Creates an observer that buffers elements into lists of <paramref name="count"/> items by
  /// delegating to the count/skip overload with <c>skip</c> equal to <paramref name="count"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <param name="count">Number of elements per buffer.</param>
  /// <returns>An observer accepting the individual elements.</returns>
  public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, int count)
  {
      return Buffer(observer, count, count);
  }
  /// <summary>
  /// Creates an observer that collects incoming elements into lists of <paramref name="count"/>
  /// items, opening a new list every <paramref name="skip"/> elements, and forwards each
  /// non-empty list to <paramref name="observer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <param name="count">Number of elements per buffer; must be positive.</param>
  /// <param name="skip">Number of elements between the starts of consecutive buffers; must be positive.</param>
  /// <returns>An observer accepting the individual elements.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> is null, or <paramref name="count"/> or <paramref name="skip"/> is not positive.
  /// </exception>
  public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, int count, int skip)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      // NOTE(review): range violations are reported as ArgumentNullException; kept for compatibility.
      if (count < 1)
      {
          throw new ArgumentNullException(nameof(count));
      }

      if (skip < 1)
      {
          throw new ArgumentNullException(nameof(skip));
      }

      // Buffers that are currently open, oldest first.
      var pending = new Queue<IList<TSource>>();

      // Number of elements received so far.
      var seen = 0;

      // The first buffer is open from the start.
      pending.Enqueue(new List<TSource>());

      return Create<TSource>(
          async item =>
          {
              // Every open buffer receives the element.
              foreach (var open in pending)
              {
                  open.Add(item);
              }

              // The oldest buffer is complete once it has seen `count` elements since it was opened.
              var offset = seen - count + 1;
              var closesNow = offset >= 0 && offset % skip == 0;

              if (closesNow)
              {
                  var full = pending.Dequeue();

                  if (full.Count > 0)
                  {
                      await observer.OnNextAsync(full).ConfigureAwait(false);
                  }
              }

              seen++;

              // A new buffer is opened every `skip` elements.
              if (seen % skip == 0)
              {
                  pending.Enqueue(new List<TSource>());
              }
          },
          error =>
          {
              // Drop all partially filled buffers before forwarding the error.
              while (pending.Count > 0)
              {
                  pending.Dequeue().Clear();
              }

              return observer.OnErrorAsync(error);
          },
          async () =>
          {
              // Emit whatever is left, oldest buffer first, skipping empty ones.
              while (pending.Count > 0)
              {
                  var rest = pending.Dequeue();

                  if (rest.Count > 0)
                  {
                      await observer.OnNextAsync(rest).ConfigureAwait(false);
                  }
              }

              await observer.OnCompletedAsync().ConfigureAwait(false);
          }
      );
  }
  /// <summary>
  /// Creates a time-based buffering observer by delegating to the scheduler overload
  /// with <c>TaskPoolAsyncScheduler.Default</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <param name="timeSpan">Length of each buffer.</param>
  /// <returns>A task yielding the element observer and the timer's disposable.</returns>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan)
  {
      return Buffer(observer, timeSpan, TaskPoolAsyncScheduler.Default);
  }
  /// <summary>
  /// Creates an observer that accumulates elements in a list and a timer, scheduled on
  /// <paramref name="scheduler"/>, that periodically forwards the non-empty list to
  /// <paramref name="observer"/> and starts a new one.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <param name="timeSpan">Length of each buffer; must not be negative.</param>
  /// <param name="scheduler">Scheduler used to run the timer loop.</param>
  /// <returns>A task yielding the element observer and the timer's disposable.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="scheduler"/> is null, or <paramref name="timeSpan"/> is negative.
  /// </exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, IAsyncScheduler scheduler)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      // NOTE(review): a negative duration is reported as ArgumentNullException; kept for compatibility.
      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeSpan));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      // Arguments are validated eagerly; the asynchronous setup runs in the local function.
      return BuildAsync();

      async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> BuildAsync()
      {
          // Serializes access to `current` between the element observer and the timer loop.
          var gate = new AsyncLock();
          var current = new List<TSource>();

          var elementSink = Create<TSource>(
              async item =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      current.Add(item);
                  }
              },
              async error =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      await observer.OnErrorAsync(error).ConfigureAwait(false);
                  }
              },
              async () =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      // Flush the trailing elements, if any, before completing.
                      if (current.Count > 0)
                      {
                          await observer.OnNextAsync(current).ConfigureAwait(false);
                      }

                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              }
          );

          var flushTimer = await scheduler.ScheduleAsync(async token =>
          {
              while (!token.IsCancellationRequested)
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      // Empty periods produce no notification.
                      if (current.Count > 0)
                      {
                          await observer.OnNextAsync(current).ConfigureAwait(false);
                          current = new List<TSource>();
                      }
                  }

                  await scheduler.Delay(timeSpan, token).RendezVous(scheduler, token);
              }
          }, timeSpan).ConfigureAwait(false);

          return (elementSink, flushTimer);
      }
  }
  /// <summary>
  /// Creates a sliding time-based buffering observer by delegating to the scheduler overload
  /// with <c>TaskPoolAsyncScheduler.Default</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <param name="timeSpan">Length of each buffer.</param>
  /// <param name="timeShift">Interval between the creation of consecutive buffers.</param>
  /// <returns>A task yielding the element observer and the timer's disposable.</returns>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, TimeSpan timeShift)
  {
      return Buffer(observer, timeSpan, timeShift, TaskPoolAsyncScheduler.Default);
  }
  /// <summary>
  /// Creates an observer that adds each element to every currently open buffer, together with
  /// a timer loop on <paramref name="scheduler"/> that opens a buffer every
  /// <paramref name="timeShift"/> and closes (emits) each buffer <paramref name="timeSpan"/> after it opened.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <param name="timeSpan">Length of each buffer; must not be negative.</param>
  /// <param name="timeShift">Interval between the creation of consecutive buffers; must not be negative.</param>
  /// <param name="scheduler">Scheduler used to run the timer loop.</param>
  /// <returns>A task yielding the element observer and the timer's disposable.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="scheduler"/> is null, or
  /// <paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.
  /// </exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      // NOTE(review): negative durations are reported as ArgumentNullException; kept for compatibility.
      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeSpan));
      }

      if (timeShift < TimeSpan.Zero)
      {
          throw new ArgumentNullException(nameof(timeShift));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      // Arguments are validated eagerly; the asynchronous setup runs in the local function.
      return BuildAsync();

      async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> BuildAsync()
      {
          // Serializes access to the queue between the element observer and the timer loop.
          var gate = new AsyncLock();

          // Open buffers, oldest first; the first one is open from the start.
          var open = new Queue<List<TSource>>();
          open.Enqueue(new List<TSource>());

          var elementSink = Create<TSource>(
              async item =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      foreach (var target in open)
                      {
                          target.Add(item);
                      }
                  }
              },
              async error =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      await observer.OnErrorAsync(error).ConfigureAwait(false);
                  }
              },
              async () =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      // Emit the remaining non-empty buffers, oldest first, then complete.
                      while (open.Count > 0)
                      {
                          var rest = open.Dequeue();

                          if (rest.Count > 0)
                          {
                              await observer.OnNextAsync(rest).ConfigureAwait(false);
                          }
                      }

                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              }
          );

          // Absolute times (measured from the start) of the next open and close events.
          var openAt = timeShift;
          var closeAt = timeSpan;

          // Absolute time of the most recently scheduled event.
          var elapsed = TimeSpan.Zero;

          // What the next timer tick has to do; set by AdvanceSchedule.
          var shouldOpen = false;
          var shouldClose = false;

          // Picks the earlier of the next open/close events (both when they coincide), records
          // which actions the upcoming tick must perform, and returns the delay until that tick.
          TimeSpan AdvanceSchedule()
          {
              shouldClose = closeAt <= openAt;
              shouldOpen = openAt <= closeAt;

              var target = shouldClose ? closeAt : openAt;
              var due = target - elapsed;
              elapsed = target;

              if (shouldOpen)
              {
                  openAt += timeShift;
              }

              if (shouldClose)
              {
                  closeAt += timeShift;
              }

              return due;
          }

          // AdvanceSchedule() is evaluated here, before the loop first runs, to obtain the initial due time.
          var tickTimer = await scheduler.ScheduleAsync(async token =>
          {
              while (!token.IsCancellationRequested)
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      if (shouldClose)
                      {
                          var closed = open.Dequeue();

                          if (closed.Count > 0)
                          {
                              await observer.OnNextAsync(closed).RendezVous(scheduler, token);
                          }
                      }

                      if (shouldOpen)
                      {
                          open.Enqueue(new List<TSource>());
                      }
                  }

                  await scheduler.Delay(AdvanceSchedule(), token).RendezVous(scheduler, token);
              }
          }, AdvanceSchedule()).ConfigureAwait(false);

          return (elementSink, tickTimer);
      }
  }
  /// <summary>
  /// Creates a time-or-count buffering observer by delegating to the scheduler overload
  /// with <c>TaskPoolAsyncScheduler.Default</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <param name="timeSpan">Maximum duration of a buffer.</param>
  /// <param name="count">Maximum number of elements per buffer.</param>
  /// <returns>A task yielding the element observer and the timer's disposable.</returns>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, int count)
  {
      return Buffer(observer, timeSpan, count, TaskPoolAsyncScheduler.Default);
  }
  /// <summary>
  /// Creates an observer that collects elements into a buffer which is forwarded to
  /// <paramref name="observer"/> either when it holds <paramref name="count"/> elements or when
  /// a timer of <paramref name="timeSpan"/> scheduled on <paramref name="scheduler"/> fires.
  /// Every flush starts a fresh buffer and a fresh timer.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <param name="timeSpan">Maximum duration of a buffer; must not be negative.</param>
  /// <param name="count">Maximum number of elements per buffer; must be positive.</param>
  /// <param name="scheduler">Scheduler used to run the timers.</param>
  /// <returns>A task yielding the element observer and the disposable holding the current timer.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="scheduler"/> is null, <paramref name="timeSpan"/>
  /// is negative, or <paramref name="count"/> is not positive.
  /// </exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      // NOTE(review): range violations are reported as ArgumentNullException; kept for compatibility.
      if (timeSpan < TimeSpan.Zero)
          throw new ArgumentNullException(nameof(timeSpan));
      if (count <= 0)
          throw new ArgumentNullException(nameof(count));
      if (scheduler == null)
          throw new ArgumentNullException(nameof(scheduler));

      // Arguments are validated eagerly; the asynchronous setup runs in the local function.
      return CoreAsync();

      async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
      {
          // Serializes access to the buffer state between the element observer and the timers.
          var gate = new AsyncLock();

          // Holds the timer for the current buffer; assigning a new timer replaces the previous one.
          var timer = new SerialAsyncDisposable();

          var buffer = new List<TSource>();

          // Number of elements in the current buffer.
          var n = 0;

          // Generation of the current buffer; lets a timer detect that its buffer was already flushed.
          var id = 0;

          async Task CreateTimerAsync(int timerId)
          {
              var d = await scheduler.ScheduleAsync(async ct =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      // Ignore the tick if a count-triggered flush already moved on to a newer buffer.
                      if (timerId == id)
                      {
                          await FlushAsync().ConfigureAwait(false);
                      }
                  }
              }, timeSpan).ConfigureAwait(false);

              await timer.AssignAsync(d).ConfigureAwait(false);
          }

          // Must be called while holding the gate.
          async Task FlushAsync()
          {
              n = 0;
              ++id;

              var res = buffer;
              buffer = new List<TSource>();

              // Emit the buffer that was just filled, not the freshly allocated empty one.
              await observer.OnNextAsync(res).ConfigureAwait(false);

              await CreateTimerAsync(id).ConfigureAwait(false);
          }

          var sink = Create<TSource>(
              async x =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      buffer.Add(x);

                      if (++n == count)
                      {
                          await FlushAsync().ConfigureAwait(false);
                      }
                  }
              },
              async ex =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              async () =>
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      await observer.OnNextAsync(buffer).ConfigureAwait(false); // NB: We don't check for non-empty in sync Rx either.
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              }
          );

          await CreateTimerAsync(0).ConfigureAwait(false);

          return (sink, timer);
      }
  }
  /// <summary>
  /// Creates a pair of observers sharing one buffer: the first appends source elements to it,
  /// the second forwards the buffer to <paramref name="observer"/> and starts a new one on
  /// every boundary element. An error on either side clears the buffer and is forwarded;
  /// completion of either side emits the current buffer and completes <paramref name="observer"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <typeparam name="TBufferBoundary">Element type of the boundary signals; the values themselves are not used.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <returns>The element observer and the boundary observer.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
  public static (IAsyncObserver<TSource>, IAsyncObserver<TBufferBoundary>) Buffer<TSource, TBufferBoundary>(IAsyncObserver<IList<TSource>> observer)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      // Serializes access to `current` between the two observers.
      var gate = new AsyncLock();
      var current = new List<TSource>();

      // Shared error path: discard buffered elements and forward the error.
      async Task FailAsync(Exception error)
      {
          using (await gate.LockAsync().ConfigureAwait(false))
          {
              current.Clear();
              await observer.OnErrorAsync(error).ConfigureAwait(false);
          }
      }

      // Shared completion path: emit the current buffer (even if empty) and complete.
      async Task FinishAsync()
      {
          using (await gate.LockAsync().ConfigureAwait(false))
          {
              await observer.OnNextAsync(current).ConfigureAwait(false);
              await observer.OnCompletedAsync().ConfigureAwait(false);
          }
      }

      var elementSink = Create<TSource>(
          async item =>
          {
              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  current.Add(item);
              }
          },
          error => FailAsync(error),
          () => FinishAsync()
      );

      var boundarySink = Create<TBufferBoundary>(
          async _ =>
          {
              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  // Swap in a fresh buffer before handing the filled one downstream.
                  var filled = current;
                  current = new List<TSource>();
                  await observer.OnNextAsync(filled).ConfigureAwait(false);
              }
          },
          error => FailAsync(error),
          () => FinishAsync()
      );

      return (elementSink, boundarySink);
  }
  /// <summary>
  /// Creates an observer that appends elements to a buffer, plus a subscription to a closing
  /// sequence obtained from <paramref name="bufferClosingSelector"/>. When the closing sequence
  /// produces an element or completes, the buffer is forwarded to <paramref name="observer"/>,
  /// a new buffer is started, and the selector is invoked again for the next closing sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type being buffered.</typeparam>
  /// <typeparam name="TBufferClosing">Element type of the closing sequences; the values themselves are not used.</typeparam>
  /// <param name="observer">Observer that receives the buffers.</param>
  /// <param name="bufferClosingSelector">Factory invoked to obtain the sequence that closes the current buffer.</param>
  /// <returns>A task yielding the element observer and the disposable holding the current closing subscription.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="observer"/> or <paramref name="bufferClosingSelector"/> is null.
  /// </exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource, TBufferClosing>(IAsyncObserver<IList<TSource>> observer, Func<IAsyncObservable<TBufferClosing>> bufferClosingSelector)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (bufferClosingSelector is null)
      {
          throw new ArgumentNullException(nameof(bufferClosingSelector));
      }

      // Arguments are validated eagerly; the asynchronous setup runs in the local function.
      return BuildAsync();

      async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> BuildAsync()
      {
          // Holds the subscription to the closing sequence of the current buffer.
          var closeSubscription = new SerialAsyncDisposable();

          // Serializes access to `current` and calls into `observer`.
          var gate = new AsyncLock();

          // Used to run the "open next closing sequence" step through a queue.
          var queueLock = new AsyncQueueLock();

          var current = new List<TSource>();

          // Shared error path: discard buffered elements and forward the error.
          async Task FailAsync(Exception error)
          {
              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  current.Clear();
                  await observer.OnErrorAsync(error).ConfigureAwait(false);
              }
          }

          // Obtains the next closing sequence and subscribes to it.
          async Task OpenClosingAsync()
          {
              IAsyncObservable<TBufferClosing> closing;

              try
              {
                  closing = bufferClosingSelector(); // REVIEW: Do we need an async variant?
              }
              catch (Exception ex)
              {
                  // A throwing selector terminates the buffered sequence with that error.
                  await FailAsync(ex).ConfigureAwait(false);
                  return;
              }

              // Registered before subscribing so that the closing subscription can be disposed
              // from within its own notifications.
              var closingHandle = new SingleAssignmentAsyncDisposable();
              await closeSubscription.AssignAsync(closingHandle).ConfigureAwait(false);

              // Ends the current closing subscription, emits the buffer, and queues the next round.
              async Task CloseBufferAsync()
              {
                  await closingHandle.DisposeAsync().ConfigureAwait(false);

                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      var filled = current;
                      current = new List<TSource>();
                      await observer.OnNextAsync(filled).ConfigureAwait(false);
                  }

                  await queueLock.WaitAsync(OpenClosingAsync).ConfigureAwait(false);
              }

              // Both an element and completion of the closing sequence close the buffer.
              var closingObserver =
                  Create<TBufferClosing>(
                      _ => CloseBufferAsync(),
                      error => FailAsync(error),
                      CloseBufferAsync
                  );

              var innerSubscription = await closing.SubscribeSafeAsync(closingObserver).ConfigureAwait(false);
              await closingHandle.AssignAsync(innerSubscription).ConfigureAwait(false);
          }

          var elementSink =
              Create<TSource>(
                  async item =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          current.Add(item);
                      }
                  },
                  error => FailAsync(error),
                  async () =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          // The final buffer is emitted even when it is empty.
                          await observer.OnNextAsync(current).ConfigureAwait(false);
                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              );

          // Start the first closing sequence before handing out the sink.
          await queueLock.WaitAsync(OpenClosingAsync).ConfigureAwait(false);

          return (elementSink, closeSubscription);
      }
  }
  604. }
  605. }