Window.cs 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. using System.Reactive.Threading;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. namespace System.Reactive.Linq
  12. {
  13. public partial class AsyncObservable
  14. {
  /// <summary>
  /// Splits <paramref name="source"/> into windows by element count. The windowing itself is done by
  /// the observer built by <c>AsyncObserver.Window(observer, subscription, count)</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="count">Number of elements per window; must be positive.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, int count)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (count < 1)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          count,
          static (upstream, windowSize, downstream) =>
              WindowCore(upstream, downstream, (sink, handle) => AsyncObserver.Window(sink, handle, windowSize)));
  }
  /// <summary>
  /// Splits <paramref name="source"/> into windows by element count, with a new window started every
  /// <paramref name="skip"/> elements. The windowing itself is done by the observer built by
  /// <c>AsyncObserver.Window(observer, subscription, count, skip)</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="count">Number of elements per window; must be positive.</param>
  /// <param name="skip">Number of elements between the starts of consecutive windows; must be positive.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> or <paramref name="skip"/> is zero or negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, int count, int skip)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (count < 1)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      if (skip < 1)
      {
          throw new ArgumentOutOfRangeException(nameof(skip));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          (count, skip),
          static (upstream, args, downstream) =>
              WindowCore(upstream, downstream, (sink, handle) => AsyncObserver.Window(sink, handle, args.count, args.skip)));
  }
  /// <summary>
  /// Splits <paramref name="source"/> into windows by time. The windowing itself is done by the observer
  /// built by <c>AsyncObserver.Window(observer, subscription, timeSpan)</c>, which picks the default scheduler.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="timeSpan">Length of each window; must not be negative.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          timeSpan,
          static (upstream, duration, downstream) =>
              WindowAsyncCore(upstream, downstream, (sink, handle) => AsyncObserver.Window(sink, handle, duration)));
  }
  /// <summary>
  /// Splits <paramref name="source"/> into windows by time, with timers run on <paramref name="scheduler"/>.
  /// The windowing itself is done by the observer built by
  /// <c>AsyncObserver.Window(observer, subscription, timeSpan, scheduler)</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="timeSpan">Length of each window; must not be negative.</param>
  /// <param name="scheduler">Scheduler used for the window timer.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, IAsyncScheduler scheduler)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          (timeSpan, scheduler),
          static (upstream, args, downstream) =>
              WindowAsyncCore(upstream, downstream, (sink, handle) => AsyncObserver.Window(sink, handle, args.timeSpan, args.scheduler)));
  }
  /// <summary>
  /// Splits <paramref name="source"/> into time-based windows of length <paramref name="timeSpan"/>, with
  /// window openings spaced by <paramref name="timeShift"/>. The windowing itself is done by the observer built by
  /// <c>AsyncObserver.Window(observer, subscription, timeSpan, timeShift)</c>, which picks the default scheduler.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="timeSpan">Length of each window; must not be negative.</param>
  /// <param name="timeShift">Interval between window openings; must not be negative.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (timeShift < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeShift));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          (timeSpan, timeShift),
          static (upstream, args, downstream) =>
              WindowAsyncCore(upstream, downstream, (sink, handle) => AsyncObserver.Window(sink, handle, args.timeSpan, args.timeShift)));
  }
  /// <summary>
  /// Splits <paramref name="source"/> into time-based windows of length <paramref name="timeSpan"/>, with
  /// window openings spaced by <paramref name="timeShift"/> and timers run on <paramref name="scheduler"/>.
  /// The windowing itself is done by the observer built by
  /// <c>AsyncObserver.Window(observer, subscription, timeSpan, timeShift, scheduler)</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="timeSpan">Length of each window; must not be negative.</param>
  /// <param name="timeShift">Interval between window openings; must not be negative.</param>
  /// <param name="scheduler">Scheduler used for the window timer.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (timeShift < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeShift));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          (timeSpan, timeShift, scheduler),
          static (upstream, args, downstream) =>
              WindowAsyncCore(upstream, downstream, (sink, handle) => AsyncObserver.Window(sink, handle, args.timeSpan, args.timeShift, args.scheduler)));
  }
  /// <summary>
  /// Splits <paramref name="source"/> into windows bounded by both a time span and an element count.
  /// The windowing itself is done by the observer built by
  /// <c>AsyncObserver.Window(observer, subscription, timeSpan, count)</c>, which picks the default scheduler.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="timeSpan">Maximum duration of a window; must not be negative.</param>
  /// <param name="count">Maximum number of elements in a window; must be positive.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative, or <paramref name="count"/> is zero or negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (count < 1)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          (timeSpan, count),
          static (upstream, args, downstream) =>
              WindowAsyncCore(upstream, downstream, (sink, handle) => AsyncObserver.Window(sink, handle, args.timeSpan, args.count)));
  }
  /// <summary>
  /// Splits <paramref name="source"/> into windows bounded by both a time span and an element count,
  /// with timers run on <paramref name="scheduler"/>. The windowing itself is done by the observer built by
  /// <c>AsyncObserver.Window(observer, subscription, timeSpan, count, scheduler)</c>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="timeSpan">Maximum duration of a window; must not be negative.</param>
  /// <param name="count">Maximum number of elements in a window; must be positive.</param>
  /// <param name="scheduler">Scheduler used for the window timer.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative, or <paramref name="count"/> is zero or negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (count < 1)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          (timeSpan, count, scheduler),
          static (upstream, args, downstream) =>
              WindowAsyncCore(upstream, downstream, (sink, handle) => AsyncObserver.Window(sink, handle, args.timeSpan, args.count, args.scheduler)));
  }
  /// <summary>
  /// Splits <paramref name="source"/> into windows whose boundaries are signalled by
  /// <paramref name="windowBoundaries"/>. On subscription the source is subscribed first, then the
  /// boundary sequence; both subscriptions are collected in one composite disposable.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TWindowBoundary">Element type of the boundary sequence.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="windowBoundaries">Sequence whose notifications delimit windows.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="windowBoundaries"/> is null.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource, TWindowBoundary>(this IAsyncObservable<TSource> source, IAsyncObservable<TWindowBoundary> windowBoundaries)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (windowBoundaries is null)
      {
          throw new ArgumentNullException(nameof(windowBoundaries));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          windowBoundaries,
          static async (upstream, boundaries, downstream) =>
          {
              // Holds both upstream subscriptions; handed to the observer factory before anything is subscribed.
              var resources = new CompositeAsyncDisposable();

              var (sourceSink, boundarySink, handle) = await AsyncObserver.Window<TSource, TWindowBoundary>(downstream, resources).ConfigureAwait(false);

              var sourceHandle = await upstream.SubscribeSafeAsync(sourceSink).ConfigureAwait(false);
              await resources.AddAsync(sourceHandle).ConfigureAwait(false);

              var boundaryHandle = await boundaries.SubscribeSafeAsync(boundarySink).ConfigureAwait(false);
              await resources.AddAsync(boundaryHandle).ConfigureAwait(false);

              return handle;
          });
  }
  // REVIEW: This overload is inherited from Rx but arguably a bit esoteric as it doesn't provide context to the closing selector.

  /// <summary>
  /// Splits <paramref name="source"/> into windows that are closed by sequences obtained from
  /// <paramref name="windowClosingSelector"/>. On subscription the closing subscription is registered
  /// first, then the source is subscribed; both end up in one composite disposable.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TWindowClosing">Element type of the closing sequences.</typeparam>
  /// <param name="source">Sequence to window.</param>
  /// <param name="windowClosingSelector">Factory producing the sequence that closes the current window.</param>
  /// <returns>A sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="windowClosingSelector"/> is null.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource, TWindowClosing>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TWindowClosing>> windowClosingSelector)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (windowClosingSelector is null)
      {
          throw new ArgumentNullException(nameof(windowClosingSelector));
      }

      return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
          source,
          windowClosingSelector,
          static async (upstream, closingFactory, downstream) =>
          {
              var resources = new CompositeAsyncDisposable();

              var (sourceSink, closingHandle, handle) = await AsyncObserver.Window<TSource, TWindowClosing>(downstream, closingFactory, resources).ConfigureAwait(false);

              await resources.AddAsync(closingHandle).ConfigureAwait(false);

              var sourceHandle = await upstream.SubscribeSafeAsync(sourceSink).ConfigureAwait(false);
              await resources.AddAsync(sourceHandle).ConfigureAwait(false);

              return handle;
          });
  }
  /// <summary>
  /// Shared subscription logic for windowing observers that can be built synchronously: builds the
  /// observer pair, subscribes the resulting sink to <paramref name="source"/>, stores that subscription
  /// in the single-assignment disposable handed to the factory, and returns the factory's disposable.
  /// </summary>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="createObserver">Factory turning the downstream observer and the (not yet assigned) source subscription into a sink and the disposable to return.</param>
  /// <returns>The disposable produced by <paramref name="createObserver"/>.</returns>
  private static async ValueTask<IAsyncDisposable> WindowCore<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable, (IAsyncObserver<TSource>, IAsyncDisposable)> createObserver)
  {
      var upstreamHandle = new SingleAssignmentAsyncDisposable();

      var pair = createObserver(observer, upstreamHandle);

      // The source subscription only exists after the sink is built, hence the late assignment.
      var sourceSubscription = await source.SubscribeSafeAsync(pair.Item1).ConfigureAwait(false);
      await upstreamHandle.AssignAsync(sourceSubscription).ConfigureAwait(false);

      return pair.Item2;
  }
  /// <summary>
  /// Shared subscription logic for windowing observers that are built asynchronously: awaits the
  /// observer pair, subscribes the resulting sink to <paramref name="source"/>, stores that subscription
  /// in the single-assignment disposable handed to the factory, and returns the factory's disposable.
  /// </summary>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="createObserverAsync">Asynchronous factory turning the downstream observer and the (not yet assigned) source subscription into a sink and the disposable to return.</param>
  /// <returns>The disposable produced by <paramref name="createObserverAsync"/>.</returns>
  private static async ValueTask<IAsyncDisposable> WindowAsyncCore<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable, ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserverAsync)
  {
      var upstreamHandle = new SingleAssignmentAsyncDisposable();

      var pair = await createObserverAsync(observer, upstreamHandle).ConfigureAwait(false);

      // The source subscription only exists after the sink is built, hence the late assignment.
      var sourceSubscription = await source.SubscribeSafeAsync(pair.Item1).ConfigureAwait(false);
      await upstreamHandle.AssignAsync(sourceSubscription).ConfigureAwait(false);

      return pair.Item2;
  }
  175. }
  176. public partial class AsyncObserver
  177. {
  /// <summary>
  /// Builds a count-based windowing observer in which the skip equals <paramref name="count"/>;
  /// forwards to the (count, skip) overload, which performs all argument validation.
  /// </summary>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="subscription">Subscription to the source, wrapped by the returned disposable.</param>
  /// <param name="count">Number of elements per window.</param>
  /// <returns>The sink to subscribe to the source, and the disposable to hand to the subscriber.</returns>
  public static (IAsyncObserver<TSource>, IAsyncDisposable) Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, int count)
  {
      return Window(observer, subscription, count, count);
  }
  /// <summary>
  /// Builds an observer that distributes source elements over count-based windows: every open window
  /// receives each element, a window is completed once it has received <paramref name="count"/> elements,
  /// and a new window is opened every <paramref name="skip"/> elements.
  /// </summary>
  /// <remarks>
  /// This method is synchronous and therefore cannot emit the first window while the observer is being
  /// built. The first window is instead opened lazily, immediately before the first notification
  /// (element, error or completion) is processed. Without that initial window the first elements would
  /// be dropped and, whenever <paramref name="count"/> is less than or equal to <paramref name="skip"/>,
  /// the close step would dequeue from an empty queue and throw <see cref="InvalidOperationException"/>.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="subscription">Subscription to the source; wrapped in the ref-counted disposable that is returned and shared with every window.</param>
  /// <param name="count">Number of elements per window; must be positive.</param>
  /// <param name="skip">Number of elements between the starts of consecutive windows; must be positive.</param>
  /// <returns>The sink to subscribe to the source, and the ref-counted disposable to hand to the subscriber.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="subscription"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> or <paramref name="skip"/> is zero or negative.</exception>
  public static (IAsyncObserver<TSource>, IAsyncDisposable) Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, int count, int skip)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (subscription == null)
          throw new ArgumentNullException(nameof(subscription));
      if (count <= 0)
          throw new ArgumentOutOfRangeException(nameof(count));
      if (skip <= 0)
          throw new ArgumentOutOfRangeException(nameof(skip));

      var refCount = new RefCountAsyncDisposable(subscription);

      // Open windows, oldest first; the oldest is always the next one to close.
      var queue = new Queue<IAsyncSubject<TSource>>();

      // Number of source elements processed so far.
      var n = 0;

      // Whether the initial window has been opened yet.
      var started = false;

      // Opens a new window, tracks it and announces it downstream.
      async ValueTask OpenWindowAsync()
      {
          var window = new SequentialSimpleAsyncSubject<TSource>();
          queue.Enqueue(window);

          var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
          await observer.OnNextAsync(wrapper).ConfigureAwait(false);
      }

      // Opens the initial window exactly once, before the first notification is handled.
      async ValueTask EnsureFirstWindowAsync()
      {
          if (!started)
          {
              started = true;
              await OpenWindowAsync().ConfigureAwait(false);
          }
      }

      return
      (
          Create<TSource>
          (
              async x =>
              {
                  await EnsureFirstWindowAsync().ConfigureAwait(false);

                  foreach (var window in queue)
                  {
                      await window.OnNextAsync(x).ConfigureAwait(false);
                  }

                  // Index (relative to the first full window) of the element just delivered;
                  // the oldest window is full whenever this is a non-negative multiple of skip.
                  var i = n - count + 1;

                  if (i >= 0 && i % skip == 0)
                  {
                      await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
                  }

                  n++;

                  if (n % skip == 0)
                  {
                      await OpenWindowAsync().ConfigureAwait(false);
                  }
              },
              async ex =>
              {
                  await EnsureFirstWindowAsync().ConfigureAwait(false);

                  while (queue.Count > 0)
                  {
                      await queue.Dequeue().OnErrorAsync(ex).ConfigureAwait(false);
                  }

                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
              },
              async () =>
              {
                  await EnsureFirstWindowAsync().ConfigureAwait(false);

                  while (queue.Count > 0)
                  {
                      await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
                  }

                  await observer.OnCompletedAsync().ConfigureAwait(false);
              }
          ),
          refCount
      );
  }
  /// <summary>
  /// Builds a time-based windowing observer using <c>TaskPoolAsyncScheduler.Default</c>; forwards to the
  /// overload taking a scheduler, which performs all argument validation.
  /// </summary>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="subscription">Subscription to the source.</param>
  /// <param name="timeSpan">Length of each window.</param>
  /// <returns>The sink to subscribe to the source, and the disposable to hand to the subscriber.</returns>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan)
  {
      return Window(observer, subscription, timeSpan, TaskPoolAsyncScheduler.Default);
  }
  /// <summary>
  /// Builds an observer that forwards source elements into the current window, together with a timer
  /// scheduled on <paramref name="scheduler"/> that, every <paramref name="timeSpan"/>, completes the
  /// current window and opens a new one. The first window is opened before this method's task completes.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="subscription">Subscription to the source; added to the composite disposable behind the returned ref-counted disposable.</param>
  /// <param name="timeSpan">Length of each window; must not be negative.</param>
  /// <param name="scheduler">Scheduler used for the window timer.</param>
  /// <returns>The sink to subscribe to the source, and the ref-counted disposable to hand to the subscriber.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="subscription"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, IAsyncScheduler scheduler)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      // Serializes the timer against source notifications.
      var mutex = AsyncGate.Create();

      var current = default(IAsyncSubject<TSource>);

      var resources = new CompositeAsyncDisposable();
      var handle = new RefCountAsyncDisposable(resources);

      // Replaces the current window with a fresh subject and announces it downstream.
      async Task OpenWindowAsync()
      {
          current = new SequentialSimpleAsyncSubject<TSource>();
          await observer.OnNextAsync(new WindowAsyncObservable<TSource>(current, handle)).ConfigureAwait(false);
      }

      async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> BuildAsync()
      {
          await resources.AddAsync(subscription).ConfigureAwait(false);

          await OpenWindowAsync().ConfigureAwait(false);

          // First tick fires after timeSpan; subsequent ticks are paced by the delay at the end of the loop.
          var ticker = await scheduler.ScheduleAsync(async token =>
          {
              while (!token.IsCancellationRequested)
              {
                  using (await mutex.LockAsync().ConfigureAwait(false))
                  {
                      await current.OnCompletedAsync().ConfigureAwait(false);
                      await OpenWindowAsync().ConfigureAwait(false);
                  }

                  await scheduler.Delay(timeSpan, token).RendezVous(scheduler, token);
              }
          }, timeSpan).ConfigureAwait(false);

          await resources.AddAsync(ticker).ConfigureAwait(false);

          var sink =
              Create<TSource>(
                  async item =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnNextAsync(item).ConfigureAwait(false);
                      }
                  },
                  async error =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnErrorAsync(error).ConfigureAwait(false);
                          await observer.OnErrorAsync(error).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnCompletedAsync().ConfigureAwait(false);
                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              );

          return (sink, handle);
      }

      return BuildAsync();
  }
  /// <summary>
  /// Builds a time-based windowing observer with a separate window length and shift, using
  /// <c>TaskPoolAsyncScheduler.Default</c>; forwards to the overload taking a scheduler, which performs
  /// all argument validation.
  /// </summary>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="subscription">Subscription to the source.</param>
  /// <param name="timeSpan">Length of each window.</param>
  /// <param name="timeShift">Interval between window openings.</param>
  /// <returns>The sink to subscribe to the source, and the disposable to hand to the subscriber.</returns>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, TimeSpan timeShift)
  {
      return Window(observer, subscription, timeSpan, timeShift, TaskPoolAsyncScheduler.Default);
  }
  /// <summary>
  /// Builds an observer that forwards each source element to every open window, together with a timer
  /// scheduled on <paramref name="scheduler"/> that closes the oldest window and/or opens a new one at
  /// the times derived from <paramref name="timeSpan"/> (first close) and <paramref name="timeShift"/>
  /// (first open and the step between subsequent opens and closes). The first window is opened before
  /// this method's task completes.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="subscription">Subscription to the source; added to the composite disposable behind the returned ref-counted disposable.</param>
  /// <param name="timeSpan">Length of each window; must not be negative.</param>
  /// <param name="timeShift">Interval between window openings; must not be negative.</param>
  /// <param name="scheduler">Scheduler used for the window timer.</param>
  /// <returns>The sink to subscribe to the source, and the ref-counted disposable to hand to the subscriber.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="subscription"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (timeShift < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeShift));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      // Serializes the timer against source notifications.
      var mutex = AsyncGate.Create();

      var resources = new CompositeAsyncDisposable();
      var timerSlot = new SerialAsyncDisposable();
      var handle = new RefCountAsyncDisposable(resources);

      // Open windows, oldest first.
      var open = new Queue<IAsyncSubject<TSource>>();

      // Absolute (since start) times of the next open and close events, and of the last scheduled event.
      var nextOpenAt = timeShift;
      var nextCloseAt = timeSpan;
      var elapsed = TimeSpan.Zero;

      // What the next timer tick has to do; recomputed by NextDelay.
      var shouldOpen = false;
      var shouldClose = false;

      // Decides whether the next tick opens, closes, or both, advances the bookkeeping, and
      // returns the delay from the previously scheduled event to that tick.
      TimeSpan NextDelay()
      {
          // Whichever event comes first is due; on a tie both are.
          shouldClose = nextCloseAt <= nextOpenAt;
          shouldOpen = nextOpenAt <= nextCloseAt;

          var target = shouldClose ? nextCloseAt : nextOpenAt;
          var delay = target - elapsed;
          elapsed = target;

          if (shouldOpen)
          {
              nextOpenAt += timeShift;
          }

          if (shouldClose)
          {
              nextCloseAt += timeShift;
          }

          return delay;
      }

      // Opens a new window, tracks it and announces it downstream.
      async Task OpenWindowAsync()
      {
          var fresh = new SequentialSimpleAsyncSubject<TSource>();
          open.Enqueue(fresh);

          await observer.OnNextAsync(new WindowAsyncObservable<TSource>(fresh, handle)).ConfigureAwait(false);
      }

      // Schedules the timer loop and stores its disposable in the serial timer slot.
      async Task StartTimerAsync()
      {
          var pending = new SingleAssignmentAsyncDisposable();
          await timerSlot.AssignAsync(pending).ConfigureAwait(false);

          var scheduled = await scheduler.ScheduleAsync(async token =>
          {
              while (!token.IsCancellationRequested)
              {
                  using (await mutex.LockAsync().ConfigureAwait(false))
                  {
                      if (shouldClose)
                      {
                          await open.Dequeue().OnCompletedAsync().RendezVous(scheduler, token);
                      }

                      if (shouldOpen)
                      {
                          await OpenWindowAsync().RendezVous(scheduler, token);
                      }
                  }

                  await scheduler.Delay(NextDelay(), token).RendezVous(scheduler, token);
              }
          }, NextDelay()).ConfigureAwait(false);

          await pending.AssignAsync(scheduled).ConfigureAwait(false);
      }

      async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> BuildAsync()
      {
          await resources.AddAsync(subscription).ConfigureAwait(false);
          await resources.AddAsync(timerSlot).ConfigureAwait(false);

          await OpenWindowAsync().ConfigureAwait(false);
          await StartTimerAsync().ConfigureAwait(false);

          var sink =
              Create<TSource>(
                  async item =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          foreach (var target in open)
                          {
                              await target.OnNextAsync(item).ConfigureAwait(false);
                          }
                      }
                  },
                  async error =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          while (open.Count > 0)
                          {
                              await open.Dequeue().OnErrorAsync(error).ConfigureAwait(false);
                          }

                          await observer.OnErrorAsync(error).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          while (open.Count > 0)
                          {
                              await open.Dequeue().OnCompletedAsync().ConfigureAwait(false);
                          }

                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              );

          return (sink, handle);
      }

      return BuildAsync();
  }
  /// <summary>
  /// Builds a windowing observer bounded by both time and element count, using
  /// <c>TaskPoolAsyncScheduler.Default</c>; forwards to the overload taking a scheduler, which performs
  /// all argument validation.
  /// </summary>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="subscription">Subscription to the source.</param>
  /// <param name="timeSpan">Maximum duration of a window.</param>
  /// <param name="count">Maximum number of elements in a window.</param>
  /// <returns>The sink to subscribe to the source, and the disposable to hand to the subscriber.</returns>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, int count)
  {
      return Window(observer, subscription, timeSpan, count, TaskPoolAsyncScheduler.Default);
  }
  /// <summary>
  /// Builds an observer that forwards source elements into the current window and replaces that window
  /// either when it has received <paramref name="count"/> elements or when <paramref name="timeSpan"/>
  /// has elapsed since it was opened, whichever happens first. The first window is opened before this
  /// method's task completes.
  /// </summary>
  /// <remarks>
  /// Each window gets its own timer. A timer that fires after its window has already been replaced
  /// (because the count was reached first) does nothing. When a timer does replace its window, the
  /// expired window is completed before the new one is opened, mirroring the count-triggered path.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="subscription">Subscription to the source; added to the composite disposable behind the returned ref-counted disposable.</param>
  /// <param name="timeSpan">Maximum duration of a window; must not be negative.</param>
  /// <param name="count">Maximum number of elements in a window; must be positive.</param>
  /// <param name="scheduler">Scheduler used for the window timers.</param>
  /// <returns>The sink to subscribe to the source, and the ref-counted disposable to hand to the subscriber.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="subscription"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative, or <paramref name="count"/> is zero or negative.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (subscription == null)
          throw new ArgumentNullException(nameof(subscription));
      if (timeSpan < TimeSpan.Zero)
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      if (count <= 0)
          throw new ArgumentOutOfRangeException(nameof(count));
      if (scheduler == null)
          throw new ArgumentNullException(nameof(scheduler));

      // Serializes the timers against source notifications.
      var gate = AsyncGate.Create();

      // Number of elements delivered to the current window.
      var n = 0;
      var window = default(IAsyncSubject<TSource>);

      var d = new CompositeAsyncDisposable();
      var timer = new SerialAsyncDisposable();
      var refCount = new RefCountAsyncDisposable(d);

      // Arms a timer for the given window; assigning to the serial disposable replaces the previous timer.
      async Task CreateTimer(IAsyncSubject<TSource> currentWindow)
      {
          var inner = new SingleAssignmentAsyncDisposable();
          await timer.AssignAsync(inner).ConfigureAwait(false);

          var task = await scheduler.ScheduleAsync(async ct =>
          {
              var newWindow = default(IAsyncSubject<TSource>);

              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  // The window this timer was armed for has already been replaced by the count path.
                  if (window != currentWindow)
                  {
                      return;
                  }

                  // Close the expired window so its subscribers observe completion.
                  await window.OnCompletedAsync().RendezVous(scheduler, ct);

                  n = 0;
                  newWindow = await CreateWindowAsync().RendezVous(scheduler, ct);
              }

              await CreateTimer(newWindow).RendezVous(scheduler, ct);
          }, timeSpan).ConfigureAwait(false);

          await inner.AssignAsync(task).ConfigureAwait(false);
      }

      // Replaces the current window with a fresh subject, announces it downstream and returns it.
      async Task<IAsyncSubject<TSource>> CreateWindowAsync()
      {
          window = new SequentialSimpleAsyncSubject<TSource>();
          var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
          await observer.OnNextAsync(wrapper).ConfigureAwait(false);
          return window;
      }

      async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
      {
          await d.AddAsync(subscription).ConfigureAwait(false);
          await d.AddAsync(timer).ConfigureAwait(false);

          var w = await CreateWindowAsync().ConfigureAwait(false);
          await CreateTimer(w).ConfigureAwait(false);

          return
          (
              Create<TSource>(
                  async x =>
                  {
                      var newWindow = default(IAsyncSubject<TSource>);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnNextAsync(x).ConfigureAwait(false);

                          n++;

                          if (n == count)
                          {
                              await window.OnCompletedAsync().ConfigureAwait(false);

                              n = 0;
                              newWindow = await CreateWindowAsync().ConfigureAwait(false);
                          }
                      }

                      // The timer for the replacement window is armed outside the gate.
                      if (newWindow != null)
                      {
                          await CreateTimer(newWindow).ConfigureAwait(false);
                      }
                  },
                  async ex =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnErrorAsync(ex).ConfigureAwait(false);
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnCompletedAsync().ConfigureAwait(false);
                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              ),
              refCount
          );
      }

      return CoreAsync();
  }
  /// <summary>
  /// Builds a pair of observers for boundary-driven windowing: the source observer forwards elements
  /// into the current window, and the boundary observer completes the current window and opens a new
  /// one on every boundary element. An error or completion on either observer terminates the current
  /// window and then the downstream observer. The first window is opened before this method's task completes.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TWindowBoundary">Element type of the boundary sequence.</typeparam>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="subscription">Disposable wrapped in the ref-counted disposable that is returned and shared with every window.</param>
  /// <returns>The source sink, the boundary sink, and the ref-counted disposable to hand to the subscriber.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="subscription"/> is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncObserver<TWindowBoundary>, IAsyncDisposable)> Window<TSource, TWindowBoundary>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription)
  {
      if (observer is null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription is null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      // Serializes the two observers against each other.
      var mutex = AsyncGate.Create();

      var handle = new RefCountAsyncDisposable(subscription);

      var current = default(IAsyncSubject<TSource>);

      // Replaces the current window with a fresh subject and announces it downstream.
      async Task OpenWindowAsync()
      {
          current = new SequentialSimpleAsyncSubject<TSource>();
          await observer.OnNextAsync(new WindowAsyncObservable<TSource>(current, handle)).ConfigureAwait(false);
      }

      // Error handling shared by both observers: fail the current window, then downstream.
      async ValueTask FailAsync(Exception error)
      {
          using (await mutex.LockAsync().ConfigureAwait(false))
          {
              await current.OnErrorAsync(error).ConfigureAwait(false);
              await observer.OnErrorAsync(error).ConfigureAwait(false);
          }
      }

      // Completion handling shared by both observers: complete the current window, then downstream.
      async ValueTask FinishAsync()
      {
          using (await mutex.LockAsync().ConfigureAwait(false))
          {
              await current.OnCompletedAsync().ConfigureAwait(false);
              await observer.OnCompletedAsync().ConfigureAwait(false);
          }
      }

      async ValueTask<(IAsyncObserver<TSource>, IAsyncObserver<TWindowBoundary>, IAsyncDisposable)> BuildAsync()
      {
          await OpenWindowAsync().ConfigureAwait(false);

          var sourceSink =
              Create<TSource>(
                  async item =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnNextAsync(item).ConfigureAwait(false);
                      }
                  },
                  FailAsync,
                  FinishAsync
              );

          var boundarySink =
              Create<TWindowBoundary>(
                  async boundary =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnCompletedAsync().ConfigureAwait(false);
                          await OpenWindowAsync().ConfigureAwait(false);
                      }
                  },
                  FailAsync,
                  FinishAsync
              );

          return (sourceSink, boundarySink, handle);
      }

      return BuildAsync();
  }
  /// <summary>
  /// Builds an observer for windowing driven by closing sequences: source elements are forwarded into
  /// the current window, and each time the sequence obtained from <paramref name="windowClosingSelector"/>
  /// produces an element or completes, the current window is completed, a new window is opened, and the
  /// selector is invoked again for the next closing sequence. The first window is opened and the first
  /// closing sequence subscribed before this method's task completes.
  /// </summary>
  /// <remarks>
  /// Every failure path terminates the current window before the downstream observer, including the
  /// case where <paramref name="windowClosingSelector"/> itself throws; otherwise the open window
  /// would never receive a terminal notification.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TWindowClosing">Element type of the closing sequences.</typeparam>
  /// <param name="observer">Downstream observer receiving the windows.</param>
  /// <param name="windowClosingSelector">Factory producing the sequence that closes the current window.</param>
  /// <param name="subscription">Disposable wrapped in the ref-counted disposable that is returned and shared with every window.</param>
  /// <returns>The source sink, the serial disposable holding the current closing subscription, and the ref-counted disposable to hand to the subscriber.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="windowClosingSelector"/> or <paramref name="subscription"/> is null.</exception>
  public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable, IAsyncDisposable)> Window<TSource, TWindowClosing>(IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObservable<TWindowClosing>> windowClosingSelector, IAsyncDisposable subscription)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (windowClosingSelector == null)
          throw new ArgumentNullException(nameof(windowClosingSelector));
      if (subscription == null)
          throw new ArgumentNullException(nameof(subscription));

      // Holds the subscription to the closing sequence of the current window.
      var closeSubscription = new SerialAsyncDisposable();

      // Serializes source notifications against window closings.
      var gate = AsyncGate.Create();

      // Queues the (re)subscriptions to closing sequences.
      var queueLock = new AsyncQueueLock();

      var refCount = new RefCountAsyncDisposable(subscription);

      var window = default(IAsyncSubject<TSource>);

      // Replaces the current window with a fresh subject and announces it downstream.
      async ValueTask CreateWindowAsync()
      {
          window = new SequentialSimpleAsyncSubject<TSource>();
          var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
          await observer.OnNextAsync(wrapper).ConfigureAwait(false);
      }

      // Obtains the next closing sequence and subscribes to it.
      async ValueTask CreateWindowCloseAsync()
      {
          var closing = default(IAsyncObservable<TWindowClosing>);

          try
          {
              closing = windowClosingSelector(); // REVIEW: Do we need an async variant?
          }
          catch (Exception ex)
          {
              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  // Terminate the open window as well, like every other error path here.
                  await window.OnErrorAsync(ex).ConfigureAwait(false);
                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
              }

              return;
          }

          var closingSubscription = new SingleAssignmentAsyncDisposable();
          await closeSubscription.AssignAsync(closingSubscription).ConfigureAwait(false);

          // Invoked when the closing sequence produces an element or completes.
          async ValueTask CloseWindowAsync()
          {
              // Only the first signal of a closing sequence counts; stop listening to it.
              await closingSubscription.DisposeAsync().ConfigureAwait(false);

              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  await window.OnCompletedAsync().ConfigureAwait(false);
                  await CreateWindowAsync().ConfigureAwait(false);
              }

              await queueLock.WaitAsync(CreateWindowCloseAsync).ConfigureAwait(false);
          }

          var closingObserver =
              Create<TWindowClosing>(
                  x => CloseWindowAsync(),
                  async ex =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnErrorAsync(ex).ConfigureAwait(false);
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      }
                  },
                  CloseWindowAsync
              );

          var closingSubscriptionInner = await closing.SubscribeSafeAsync(closingObserver).ConfigureAwait(false);
          await closingSubscription.AssignAsync(closingSubscriptionInner).ConfigureAwait(false);
      }

      async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable, IAsyncDisposable)> CoreAsync()
      {
          await CreateWindowAsync().ConfigureAwait(false);

          var sink =
              Create<TSource>(
                  async x =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnNextAsync(x).ConfigureAwait(false);
                      }
                  },
                  async ex =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnErrorAsync(ex).ConfigureAwait(false);
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnCompletedAsync().ConfigureAwait(false);
                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              );

          await queueLock.WaitAsync(CreateWindowCloseAsync).ConfigureAwait(false);

          return (sink, closeSubscription, refCount);
      }

      return CoreAsync();
  }
  704. }
  705. }