Window.cs 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace System.Reactive.Linq
  11. {
  12. public partial class AsyncObservable
  13. {
  14. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, int count)
  15. {
  16. if (source == null)
  17. throw new ArgumentNullException(nameof(source));
  18. if (count <= 0)
  19. throw new ArgumentOutOfRangeException(nameof(count));
  20. return Create(
  21. source,
  22. count,
  23. default(IAsyncObservable<TSource>),
  24. (source, count, observer) => WindowCore(source, observer, (o, d) => AsyncObserver.Window(o, d, count)));
  25. }
  26. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, int count, int skip)
  27. {
  28. if (source == null)
  29. throw new ArgumentNullException(nameof(source));
  30. if (count <= 0)
  31. throw new ArgumentOutOfRangeException(nameof(count));
  32. if (skip <= 0)
  33. throw new ArgumentOutOfRangeException(nameof(skip));
  34. return Create(
  35. source,
  36. (count, skip),
  37. default(IAsyncObservable<TSource>),
  38. (source, state, observer) => WindowCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.count, state.skip)));
  39. }
  40. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan)
  41. {
  42. if (source == null)
  43. throw new ArgumentNullException(nameof(source));
  44. if (timeSpan < TimeSpan.Zero)
  45. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  46. return Create(
  47. source,
  48. timeSpan,
  49. default(IAsyncObservable<TSource>),
  50. (source, timeSpan, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, timeSpan)));
  51. }
  52. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, IAsyncScheduler scheduler)
  53. {
  54. if (source == null)
  55. throw new ArgumentNullException(nameof(source));
  56. if (timeSpan < TimeSpan.Zero)
  57. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  58. if (scheduler == null)
  59. throw new ArgumentNullException(nameof(scheduler));
  60. return Create(
  61. source,
  62. (timeSpan, scheduler),
  63. default(IAsyncObservable<TSource>),
  64. (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.scheduler)));
  65. }
  66. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  67. {
  68. if (source == null)
  69. throw new ArgumentNullException(nameof(source));
  70. if (timeSpan < TimeSpan.Zero)
  71. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  72. if (timeShift < TimeSpan.Zero)
  73. throw new ArgumentOutOfRangeException(nameof(timeShift));
  74. return Create(
  75. source,
  76. (timeSpan, timeShift),
  77. default(IAsyncObservable<TSource>),
  78. (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.timeShift)));
  79. }
  80. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  81. {
  82. if (source == null)
  83. throw new ArgumentNullException(nameof(source));
  84. if (timeSpan < TimeSpan.Zero)
  85. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  86. if (timeShift < TimeSpan.Zero)
  87. throw new ArgumentOutOfRangeException(nameof(timeShift));
  88. if (scheduler == null)
  89. throw new ArgumentNullException(nameof(scheduler));
  90. return Create(
  91. source,
  92. (timeSpan, timeShift, scheduler),
  93. default(IAsyncObservable<TSource>),
  94. (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.timeShift, state.scheduler)));
  95. }
  96. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count)
  97. {
  98. if (source == null)
  99. throw new ArgumentNullException(nameof(source));
  100. if (timeSpan < TimeSpan.Zero)
  101. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  102. if (count <= 0)
  103. throw new ArgumentOutOfRangeException(nameof(count));
  104. return Create(
  105. source,
  106. (timeSpan, count),
  107. default(IAsyncObservable<TSource>),
  108. (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.count)));
  109. }
  110. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  111. {
  112. if (source == null)
  113. throw new ArgumentNullException(nameof(source));
  114. if (timeSpan < TimeSpan.Zero)
  115. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  116. if (count <= 0)
  117. throw new ArgumentOutOfRangeException(nameof(count));
  118. if (scheduler == null)
  119. throw new ArgumentNullException(nameof(scheduler));
  120. return Create(
  121. source,
  122. (timeSpan, count, scheduler),
  123. default(IAsyncObservable<TSource>),
  124. (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.count, state.scheduler)));
  125. }
  126. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource, TWindowBoundary>(this IAsyncObservable<TSource> source, IAsyncObservable<TWindowBoundary> windowBoundaries)
  127. {
  128. if (source == null)
  129. throw new ArgumentNullException(nameof(source));
  130. if (windowBoundaries == null)
  131. throw new ArgumentNullException(nameof(windowBoundaries));
  132. return Create(
  133. source,
  134. windowBoundaries,
  135. default(IAsyncObservable<TSource>),
  136. async (source, windowBoundaries, observer) =>
  137. {
  138. var d = new CompositeAsyncDisposable();
  139. var (sourceObserver, boundariesObserver, subscription) = await AsyncObserver.Window<TSource, TWindowBoundary>(observer, d).ConfigureAwait(false);
  140. var sourceSubscription = await source.SubscribeSafeAsync(sourceObserver).ConfigureAwait(false);
  141. await d.AddAsync(sourceSubscription).ConfigureAwait(false);
  142. var boundariesSubscription = await windowBoundaries.SubscribeSafeAsync(boundariesObserver).ConfigureAwait(false);
  143. await d.AddAsync(boundariesSubscription).ConfigureAwait(false);
  144. return subscription;
  145. });
  146. }
  147. // REVIEW: This overload is inherited from Rx but arguably a bit esoteric as it doesn't provide context to the closing selector.
  148. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource, TWindowClosing>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TWindowClosing>> windowClosingSelector)
  149. {
  150. if (source == null)
  151. throw new ArgumentNullException(nameof(source));
  152. if (windowClosingSelector == null)
  153. throw new ArgumentNullException(nameof(windowClosingSelector));
  154. return Create(
  155. source,
  156. windowClosingSelector,
  157. default(IAsyncObservable<TSource>),
  158. async (source, windowClosingSelector, observer) =>
  159. {
  160. var d = new CompositeAsyncDisposable();
  161. var (sourceObserver, closingSubscription, subscription) = await AsyncObserver.Window<TSource, TWindowClosing>(observer, windowClosingSelector, d).ConfigureAwait(false);
  162. await d.AddAsync(closingSubscription).ConfigureAwait(false);
  163. var sourceSubscription = await source.SubscribeSafeAsync(sourceObserver).ConfigureAwait(false);
  164. await d.AddAsync(sourceSubscription).ConfigureAwait(false);
  165. return subscription;
  166. });
  167. }
  168. private static async ValueTask<IAsyncDisposable> WindowCore<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable, (IAsyncObserver<TSource>, IAsyncDisposable)> createObserver)
  169. {
  170. var d = new SingleAssignmentAsyncDisposable();
  171. var (sink, subscription) = createObserver(observer, d);
  172. var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  173. await d.AssignAsync(inner).ConfigureAwait(false);
  174. return subscription;
  175. }
  176. private static async ValueTask<IAsyncDisposable> WindowAsyncCore<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable, ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserverAsync)
  177. {
  178. var d = new SingleAssignmentAsyncDisposable();
  179. var (sink, subscription) = await createObserverAsync(observer, d).ConfigureAwait(false);
  180. var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  181. await d.AssignAsync(inner).ConfigureAwait(false);
  182. return subscription;
  183. }
  184. }
  185. public partial class AsyncObserver
  186. {
  187. public static (IAsyncObserver<TSource>, IAsyncDisposable) Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, int count) => Window(observer, subscription, count, count);
  188. public static (IAsyncObserver<TSource>, IAsyncDisposable) Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, int count, int skip)
  189. {
  190. if (observer == null)
  191. throw new ArgumentNullException(nameof(observer));
  192. if (subscription == null)
  193. throw new ArgumentNullException(nameof(subscription));
  194. if (count <= 0)
  195. throw new ArgumentOutOfRangeException(nameof(count));
  196. if (skip <= 0)
  197. throw new ArgumentOutOfRangeException(nameof(skip));
  198. var refCount = new RefCountAsyncDisposable(subscription);
  199. var queue = new Queue<IAsyncSubject<TSource>>();
  200. var n = 0;
  201. return
  202. (
  203. Create<TSource>
  204. (
  205. async x =>
  206. {
  207. foreach (var window in queue)
  208. {
  209. await window.OnNextAsync(x).ConfigureAwait(false);
  210. }
  211. var i = n - count + 1;
  212. if (i >= 0 && i % skip == 0)
  213. {
  214. await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
  215. }
  216. n++;
  217. if (n % skip == 0)
  218. {
  219. var window = new SequentialSimpleAsyncSubject<TSource>();
  220. queue.Enqueue(window);
  221. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  222. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  223. }
  224. },
  225. async ex =>
  226. {
  227. while (queue.Count > 0)
  228. {
  229. await queue.Dequeue().OnErrorAsync(ex).ConfigureAwait(false);
  230. }
  231. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  232. },
  233. async () =>
  234. {
  235. while (queue.Count > 0)
  236. {
  237. await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
  238. }
  239. await observer.OnCompletedAsync().ConfigureAwait(false);
  240. }
  241. ),
  242. refCount
  243. );
  244. }
  245. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan) => Window(observer, subscription, timeSpan, TaskPoolAsyncScheduler.Default);
  246. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, IAsyncScheduler scheduler)
  247. {
  248. if (observer == null)
  249. throw new ArgumentNullException(nameof(observer));
  250. if (subscription == null)
  251. throw new ArgumentNullException(nameof(subscription));
  252. if (timeSpan < TimeSpan.Zero)
  253. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  254. if (scheduler == null)
  255. throw new ArgumentNullException(nameof(scheduler));
  256. var gate = new AsyncLock();
  257. var window = default(IAsyncSubject<TSource>);
  258. var d = new CompositeAsyncDisposable();
  259. var refCount = new RefCountAsyncDisposable(d);
  260. async Task CreateWindowAsync()
  261. {
  262. window = new SequentialSimpleAsyncSubject<TSource>();
  263. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  264. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  265. }
  266. async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
  267. {
  268. await d.AddAsync(subscription).ConfigureAwait(false);
  269. await CreateWindowAsync().ConfigureAwait(false);
  270. var timer = await scheduler.ScheduleAsync(async ct =>
  271. {
  272. while (!ct.IsCancellationRequested)
  273. {
  274. using (await gate.LockAsync().ConfigureAwait(false))
  275. {
  276. await window.OnCompletedAsync().ConfigureAwait(false);
  277. await CreateWindowAsync().ConfigureAwait(false);
  278. }
  279. await scheduler.Delay(timeSpan, ct).RendezVous(scheduler, ct);
  280. }
  281. }, timeSpan).ConfigureAwait(false);
  282. await d.AddAsync(timer).ConfigureAwait(false);
  283. return
  284. (
  285. Create<TSource>(
  286. async x =>
  287. {
  288. using (await gate.LockAsync().ConfigureAwait(false))
  289. {
  290. await window.OnNextAsync(x).ConfigureAwait(false);
  291. }
  292. },
  293. async ex =>
  294. {
  295. using (await gate.LockAsync().ConfigureAwait(false))
  296. {
  297. await window.OnErrorAsync(ex).ConfigureAwait(false);
  298. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  299. }
  300. },
  301. async () =>
  302. {
  303. using (await gate.LockAsync().ConfigureAwait(false))
  304. {
  305. await window.OnCompletedAsync().ConfigureAwait(false);
  306. await observer.OnCompletedAsync().ConfigureAwait(false);
  307. }
  308. }
  309. ),
  310. refCount
  311. );
  312. }
  313. return CoreAsync();
  314. }
  315. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, TimeSpan timeShift) => Window(observer, subscription, timeSpan, timeShift, TaskPoolAsyncScheduler.Default);
  316. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  317. {
  318. if (observer == null)
  319. throw new ArgumentNullException(nameof(observer));
  320. if (subscription == null)
  321. throw new ArgumentNullException(nameof(subscription));
  322. if (timeSpan < TimeSpan.Zero)
  323. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  324. if (timeShift < TimeSpan.Zero)
  325. throw new ArgumentOutOfRangeException(nameof(timeShift));
  326. if (scheduler == null)
  327. throw new ArgumentNullException(nameof(scheduler));
  328. var gate = new AsyncLock();
  329. var d = new CompositeAsyncDisposable();
  330. var timer = new SerialAsyncDisposable();
  331. var refCount = new RefCountAsyncDisposable(d);
  332. var queue = new Queue<IAsyncSubject<TSource>>();
  333. var nextOpen = timeShift;
  334. var nextClose = timeSpan;
  335. var totalTime = TimeSpan.Zero;
  336. var isOpen = false;
  337. var isClose = false;
  338. TimeSpan GetNextDue()
  339. {
  340. if (nextOpen == nextClose)
  341. {
  342. isOpen = isClose = true;
  343. }
  344. else if (nextClose < nextOpen)
  345. {
  346. isClose = true;
  347. isOpen = false;
  348. }
  349. else
  350. {
  351. isOpen = true;
  352. isClose = false;
  353. }
  354. var newTotalTime = isClose ? nextClose : nextOpen;
  355. var due = newTotalTime - totalTime;
  356. totalTime = newTotalTime;
  357. if (isOpen)
  358. {
  359. nextOpen += timeShift;
  360. }
  361. if (isClose)
  362. {
  363. nextClose += timeShift;
  364. }
  365. return due;
  366. }
  367. async Task CreateWindowAsync()
  368. {
  369. var window = new SequentialSimpleAsyncSubject<TSource>();
  370. queue.Enqueue(window);
  371. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  372. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  373. }
  374. async Task CreateTimer()
  375. {
  376. var inner = new SingleAssignmentAsyncDisposable();
  377. await timer.AssignAsync(inner).ConfigureAwait(false);
  378. var task = await scheduler.ScheduleAsync(async ct =>
  379. {
  380. while (!ct.IsCancellationRequested)
  381. {
  382. using (await gate.LockAsync().ConfigureAwait(false))
  383. {
  384. if (isClose)
  385. {
  386. await queue.Dequeue().OnCompletedAsync().RendezVous(scheduler, ct);
  387. }
  388. if (isOpen)
  389. {
  390. await CreateWindowAsync().RendezVous(scheduler, ct);
  391. }
  392. }
  393. await scheduler.Delay(GetNextDue(), ct).RendezVous(scheduler, ct);
  394. }
  395. }, GetNextDue()).ConfigureAwait(false);
  396. await inner.AssignAsync(task).ConfigureAwait(false);
  397. }
  398. async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
  399. {
  400. await d.AddAsync(subscription).ConfigureAwait(false);
  401. await d.AddAsync(timer).ConfigureAwait(false);
  402. await CreateWindowAsync().ConfigureAwait(false);
  403. await CreateTimer().ConfigureAwait(false);
  404. return
  405. (
  406. Create<TSource>(
  407. async x =>
  408. {
  409. using (await gate.LockAsync().ConfigureAwait(false))
  410. {
  411. foreach (var window in queue)
  412. {
  413. await window.OnNextAsync(x).ConfigureAwait(false);
  414. }
  415. }
  416. },
  417. async ex =>
  418. {
  419. using (await gate.LockAsync().ConfigureAwait(false))
  420. {
  421. while (queue.Count > 0)
  422. {
  423. await queue.Dequeue().OnErrorAsync(ex).ConfigureAwait(false);
  424. }
  425. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  426. }
  427. },
  428. async () =>
  429. {
  430. using (await gate.LockAsync().ConfigureAwait(false))
  431. {
  432. while (queue.Count > 0)
  433. {
  434. await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
  435. }
  436. await observer.OnCompletedAsync().ConfigureAwait(false);
  437. }
  438. }
  439. ),
  440. refCount
  441. );
  442. }
  443. return CoreAsync();
  444. }
  445. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, int count) => Window(observer, subscription, timeSpan, count, TaskPoolAsyncScheduler.Default);
  446. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  447. {
  448. if (observer == null)
  449. throw new ArgumentNullException(nameof(observer));
  450. if (subscription == null)
  451. throw new ArgumentNullException(nameof(subscription));
  452. if (timeSpan < TimeSpan.Zero)
  453. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  454. if (count <= 0)
  455. throw new ArgumentOutOfRangeException(nameof(count));
  456. if (scheduler == null)
  457. throw new ArgumentNullException(nameof(scheduler));
  458. var gate = new AsyncLock();
  459. var n = 0;
  460. var window = default(IAsyncSubject<TSource>);
  461. var d = new CompositeAsyncDisposable();
  462. var timer = new SerialAsyncDisposable();
  463. var refCount = new RefCountAsyncDisposable(d);
  464. async Task CreateTimer(IAsyncSubject<TSource> currentWindow)
  465. {
  466. var inner = new SingleAssignmentAsyncDisposable();
  467. await timer.AssignAsync(inner).ConfigureAwait(false);
  468. var task = await scheduler.ScheduleAsync(async ct =>
  469. {
  470. var newWindow = default(IAsyncSubject<TSource>);
  471. using (await gate.LockAsync().ConfigureAwait(false))
  472. {
  473. if (window != currentWindow)
  474. {
  475. return;
  476. }
  477. n = 0;
  478. newWindow = await CreateWindowAsync().RendezVous(scheduler, ct);
  479. }
  480. await CreateTimer(newWindow).RendezVous(scheduler, ct);
  481. }, timeSpan).ConfigureAwait(false);
  482. await inner.AssignAsync(task).ConfigureAwait(false);
  483. }
  484. async Task<IAsyncSubject<TSource>> CreateWindowAsync()
  485. {
  486. window = new SequentialSimpleAsyncSubject<TSource>();
  487. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  488. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  489. return window;
  490. }
  491. async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
  492. {
  493. await d.AddAsync(subscription).ConfigureAwait(false);
  494. await d.AddAsync(timer).ConfigureAwait(false);
  495. var w = await CreateWindowAsync().ConfigureAwait(false);
  496. await CreateTimer(w).ConfigureAwait(false);
  497. return
  498. (
  499. Create<TSource>(
  500. async x =>
  501. {
  502. var newWindow = default(IAsyncSubject<TSource>);
  503. using (await gate.LockAsync().ConfigureAwait(false))
  504. {
  505. await window.OnNextAsync(x).ConfigureAwait(false);
  506. n++;
  507. if (n == count)
  508. {
  509. await window.OnCompletedAsync().ConfigureAwait(false);
  510. n = 0;
  511. newWindow = await CreateWindowAsync().ConfigureAwait(false);
  512. }
  513. }
  514. if (newWindow != null)
  515. {
  516. await CreateTimer(newWindow).ConfigureAwait(false);
  517. }
  518. },
  519. async ex =>
  520. {
  521. using (await gate.LockAsync().ConfigureAwait(false))
  522. {
  523. await window.OnErrorAsync(ex).ConfigureAwait(false);
  524. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  525. }
  526. },
  527. async () =>
  528. {
  529. using (await gate.LockAsync().ConfigureAwait(false))
  530. {
  531. await window.OnCompletedAsync().ConfigureAwait(false);
  532. await observer.OnCompletedAsync().ConfigureAwait(false);
  533. }
  534. }
  535. ),
  536. refCount
  537. );
  538. }
  539. return CoreAsync();
  540. }
  541. public static ValueTask<(IAsyncObserver<TSource>, IAsyncObserver<TWindowBoundary>, IAsyncDisposable)> Window<TSource, TWindowBoundary>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription)
  542. {
  543. if (observer == null)
  544. throw new ArgumentNullException(nameof(observer));
  545. if (subscription == null)
  546. throw new ArgumentNullException(nameof(subscription));
  547. var gate = new AsyncLock();
  548. var refCount = new RefCountAsyncDisposable(subscription);
  549. var window = default(IAsyncSubject<TSource>);
  550. async Task CreateWindowAsync()
  551. {
  552. window = new SequentialSimpleAsyncSubject<TSource>();
  553. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  554. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  555. }
  556. async ValueTask<(IAsyncObserver<TSource>, IAsyncObserver<TWindowBoundary>, IAsyncDisposable)> CoreAsync()
  557. {
  558. await CreateWindowAsync().ConfigureAwait(false);
  559. return
  560. (
  561. Create<TSource>(
  562. async x =>
  563. {
  564. using (await gate.LockAsync().ConfigureAwait(false))
  565. {
  566. await window.OnNextAsync(x).ConfigureAwait(false);
  567. }
  568. },
  569. async ex =>
  570. {
  571. using (await gate.LockAsync().ConfigureAwait(false))
  572. {
  573. await window.OnErrorAsync(ex).ConfigureAwait(false);
  574. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  575. }
  576. },
  577. async () =>
  578. {
  579. using (await gate.LockAsync().ConfigureAwait(false))
  580. {
  581. await window.OnCompletedAsync().ConfigureAwait(false);
  582. await observer.OnCompletedAsync().ConfigureAwait(false);
  583. }
  584. }
  585. ),
  586. Create<TWindowBoundary>(
  587. async x =>
  588. {
  589. using (await gate.LockAsync().ConfigureAwait(false))
  590. {
  591. await window.OnCompletedAsync().ConfigureAwait(false);
  592. await CreateWindowAsync().ConfigureAwait(false);
  593. }
  594. },
  595. async ex =>
  596. {
  597. using (await gate.LockAsync().ConfigureAwait(false))
  598. {
  599. await window.OnErrorAsync(ex).ConfigureAwait(false);
  600. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  601. }
  602. },
  603. async () =>
  604. {
  605. using (await gate.LockAsync().ConfigureAwait(false))
  606. {
  607. await window.OnCompletedAsync().ConfigureAwait(false);
  608. await observer.OnCompletedAsync().ConfigureAwait(false);
  609. }
  610. }
  611. ),
  612. refCount
  613. );
  614. }
  615. return CoreAsync();
  616. }
  617. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable, IAsyncDisposable)> Window<TSource, TWindowClosing>(IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObservable<TWindowClosing>> windowClosingSelector, IAsyncDisposable subscription)
  618. {
  619. if (observer == null)
  620. throw new ArgumentNullException(nameof(observer));
  621. if (windowClosingSelector == null)
  622. throw new ArgumentNullException(nameof(windowClosingSelector));
  623. if (subscription == null)
  624. throw new ArgumentNullException(nameof(subscription));
  625. var closeSubscription = new SerialAsyncDisposable();
  626. var gate = new AsyncLock();
  627. var queueLock = new AsyncQueueLock();
  628. var refCount = new RefCountAsyncDisposable(subscription);
  629. var window = default(IAsyncSubject<TSource>);
  630. async ValueTask CreateWindowAsync()
  631. {
  632. window = new SequentialSimpleAsyncSubject<TSource>();
  633. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  634. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  635. }
  636. async ValueTask CreateWindowCloseAsync()
  637. {
  638. var closing = default(IAsyncObservable<TWindowClosing>);
  639. try
  640. {
  641. closing = windowClosingSelector(); // REVIEW: Do we need an async variant?
  642. }
  643. catch (Exception ex)
  644. {
  645. using (await gate.LockAsync().ConfigureAwait(false))
  646. {
  647. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  648. }
  649. return;
  650. }
  651. var closingSubscription = new SingleAssignmentAsyncDisposable();
  652. await closeSubscription.AssignAsync(closingSubscription).ConfigureAwait(false);
  653. async ValueTask CloseWindowAsync()
  654. {
  655. await closingSubscription.DisposeAsync().ConfigureAwait(false);
  656. using (await gate.LockAsync().ConfigureAwait(false))
  657. {
  658. await window.OnCompletedAsync().ConfigureAwait(false);
  659. await CreateWindowAsync().ConfigureAwait(false);
  660. }
  661. await queueLock.WaitAsync(CreateWindowCloseAsync).ConfigureAwait(false);
  662. }
  663. var closingObserver =
  664. Create<TWindowClosing>(
  665. x => CloseWindowAsync(),
  666. async ex =>
  667. {
  668. using (await gate.LockAsync().ConfigureAwait(false))
  669. {
  670. await window.OnErrorAsync(ex).ConfigureAwait(false);
  671. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  672. }
  673. },
  674. CloseWindowAsync
  675. );
  676. var closingSubscriptionInner = await closing.SubscribeSafeAsync(closingObserver).ConfigureAwait(false);
  677. await closingSubscription.AssignAsync(closingSubscriptionInner).ConfigureAwait(false);
  678. }
  679. async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable, IAsyncDisposable)> CoreAsync()
  680. {
  681. await CreateWindowAsync().ConfigureAwait(false);
  682. var sink =
  683. Create<TSource>(
  684. async x =>
  685. {
  686. using (await gate.LockAsync().ConfigureAwait(false))
  687. {
  688. await window.OnNextAsync(x).ConfigureAwait(false);
  689. }
  690. },
  691. async ex =>
  692. {
  693. using (await gate.LockAsync().ConfigureAwait(false))
  694. {
  695. await window.OnErrorAsync(ex).ConfigureAwait(false);
  696. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  697. }
  698. },
  699. async () =>
  700. {
  701. using (await gate.LockAsync().ConfigureAwait(false))
  702. {
  703. await window.OnCompletedAsync().ConfigureAwait(false);
  704. await observer.OnCompletedAsync().ConfigureAwait(false);
  705. }
  706. }
  707. );
  708. await queueLock.WaitAsync(CreateWindowCloseAsync).ConfigureAwait(false);
  709. return (sink, closeSubscription, refCount);
  710. }
  711. return CoreAsync();
  712. }
  713. }
  714. }