Window.cs 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace System.Reactive.Linq
  11. {
  12. public partial class AsyncObservable
  13. {
  14. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, int count)
  15. {
  16. if (source == null)
  17. throw new ArgumentNullException(nameof(source));
  18. if (count <= 0)
  19. throw new ArgumentOutOfRangeException(nameof(count));
  20. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  21. source,
  22. count,
  23. static (source, count, observer) => WindowCore(source, observer, (o, d) => AsyncObserver.Window(o, d, count)));
  24. }
  25. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, int count, int skip)
  26. {
  27. if (source == null)
  28. throw new ArgumentNullException(nameof(source));
  29. if (count <= 0)
  30. throw new ArgumentOutOfRangeException(nameof(count));
  31. if (skip <= 0)
  32. throw new ArgumentOutOfRangeException(nameof(skip));
  33. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  34. source,
  35. (count, skip),
  36. static (source, state, observer) => WindowCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.count, state.skip)));
  37. }
  38. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan)
  39. {
  40. if (source == null)
  41. throw new ArgumentNullException(nameof(source));
  42. if (timeSpan < TimeSpan.Zero)
  43. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  44. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  45. source,
  46. timeSpan,
  47. static (source, timeSpan, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, timeSpan)));
  48. }
  49. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, IAsyncScheduler scheduler)
  50. {
  51. if (source == null)
  52. throw new ArgumentNullException(nameof(source));
  53. if (timeSpan < TimeSpan.Zero)
  54. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  55. if (scheduler == null)
  56. throw new ArgumentNullException(nameof(scheduler));
  57. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  58. source,
  59. (timeSpan, scheduler),
  60. static (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.scheduler)));
  61. }
  62. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  63. {
  64. if (source == null)
  65. throw new ArgumentNullException(nameof(source));
  66. if (timeSpan < TimeSpan.Zero)
  67. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  68. if (timeShift < TimeSpan.Zero)
  69. throw new ArgumentOutOfRangeException(nameof(timeShift));
  70. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  71. source,
  72. (timeSpan, timeShift),
  73. static (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.timeShift)));
  74. }
  75. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  76. {
  77. if (source == null)
  78. throw new ArgumentNullException(nameof(source));
  79. if (timeSpan < TimeSpan.Zero)
  80. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  81. if (timeShift < TimeSpan.Zero)
  82. throw new ArgumentOutOfRangeException(nameof(timeShift));
  83. if (scheduler == null)
  84. throw new ArgumentNullException(nameof(scheduler));
  85. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  86. source,
  87. (timeSpan, timeShift, scheduler),
  88. static (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.timeShift, state.scheduler)));
  89. }
  90. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count)
  91. {
  92. if (source == null)
  93. throw new ArgumentNullException(nameof(source));
  94. if (timeSpan < TimeSpan.Zero)
  95. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  96. if (count <= 0)
  97. throw new ArgumentOutOfRangeException(nameof(count));
  98. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  99. source,
  100. (timeSpan, count),
  101. static (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.count)));
  102. }
  103. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  104. {
  105. if (source == null)
  106. throw new ArgumentNullException(nameof(source));
  107. if (timeSpan < TimeSpan.Zero)
  108. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  109. if (count <= 0)
  110. throw new ArgumentOutOfRangeException(nameof(count));
  111. if (scheduler == null)
  112. throw new ArgumentNullException(nameof(scheduler));
  113. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  114. source,
  115. (timeSpan, count, scheduler),
  116. static (source, state, observer) => WindowAsyncCore(source, observer, (o, d) => AsyncObserver.Window(o, d, state.timeSpan, state.count, state.scheduler)));
  117. }
  118. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource, TWindowBoundary>(this IAsyncObservable<TSource> source, IAsyncObservable<TWindowBoundary> windowBoundaries)
  119. {
  120. if (source == null)
  121. throw new ArgumentNullException(nameof(source));
  122. if (windowBoundaries == null)
  123. throw new ArgumentNullException(nameof(windowBoundaries));
  124. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  125. source,
  126. windowBoundaries,
  127. static async (source, windowBoundaries, observer) =>
  128. {
  129. var d = new CompositeAsyncDisposable();
  130. var (sourceObserver, boundariesObserver, subscription) = await AsyncObserver.Window<TSource, TWindowBoundary>(observer, d).ConfigureAwait(false);
  131. var sourceSubscription = await source.SubscribeSafeAsync(sourceObserver).ConfigureAwait(false);
  132. await d.AddAsync(sourceSubscription).ConfigureAwait(false);
  133. var boundariesSubscription = await windowBoundaries.SubscribeSafeAsync(boundariesObserver).ConfigureAwait(false);
  134. await d.AddAsync(boundariesSubscription).ConfigureAwait(false);
  135. return subscription;
  136. });
  137. }
  138. // REVIEW: This overload is inherited from Rx but arguably a bit esoteric as it doesn't provide context to the closing selector.
  139. public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource, TWindowClosing>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TWindowClosing>> windowClosingSelector)
  140. {
  141. if (source == null)
  142. throw new ArgumentNullException(nameof(source));
  143. if (windowClosingSelector == null)
  144. throw new ArgumentNullException(nameof(windowClosingSelector));
  145. return CreateAsyncObservable<IAsyncObservable<TSource>>.From(
  146. source,
  147. windowClosingSelector,
  148. static async (source, windowClosingSelector, observer) =>
  149. {
  150. var d = new CompositeAsyncDisposable();
  151. var (sourceObserver, closingSubscription, subscription) = await AsyncObserver.Window<TSource, TWindowClosing>(observer, windowClosingSelector, d).ConfigureAwait(false);
  152. await d.AddAsync(closingSubscription).ConfigureAwait(false);
  153. var sourceSubscription = await source.SubscribeSafeAsync(sourceObserver).ConfigureAwait(false);
  154. await d.AddAsync(sourceSubscription).ConfigureAwait(false);
  155. return subscription;
  156. });
  157. }
  158. private static async ValueTask<IAsyncDisposable> WindowCore<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable, (IAsyncObserver<TSource>, IAsyncDisposable)> createObserver)
  159. {
  160. var d = new SingleAssignmentAsyncDisposable();
  161. var (sink, subscription) = createObserver(observer, d);
  162. var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  163. await d.AssignAsync(inner).ConfigureAwait(false);
  164. return subscription;
  165. }
  166. private static async ValueTask<IAsyncDisposable> WindowAsyncCore<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable, ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserverAsync)
  167. {
  168. var d = new SingleAssignmentAsyncDisposable();
  169. var (sink, subscription) = await createObserverAsync(observer, d).ConfigureAwait(false);
  170. var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
  171. await d.AssignAsync(inner).ConfigureAwait(false);
  172. return subscription;
  173. }
  174. }
  175. public partial class AsyncObserver
  176. {
  177. public static (IAsyncObserver<TSource>, IAsyncDisposable) Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, int count) => Window(observer, subscription, count, count);
  178. public static (IAsyncObserver<TSource>, IAsyncDisposable) Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, int count, int skip)
  179. {
  180. if (observer == null)
  181. throw new ArgumentNullException(nameof(observer));
  182. if (subscription == null)
  183. throw new ArgumentNullException(nameof(subscription));
  184. if (count <= 0)
  185. throw new ArgumentOutOfRangeException(nameof(count));
  186. if (skip <= 0)
  187. throw new ArgumentOutOfRangeException(nameof(skip));
  188. var refCount = new RefCountAsyncDisposable(subscription);
  189. var queue = new Queue<IAsyncSubject<TSource>>();
  190. var n = 0;
  191. return
  192. (
  193. Create<TSource>
  194. (
  195. async x =>
  196. {
  197. foreach (var window in queue)
  198. {
  199. await window.OnNextAsync(x).ConfigureAwait(false);
  200. }
  201. var i = n - count + 1;
  202. if (i >= 0 && i % skip == 0)
  203. {
  204. await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
  205. }
  206. n++;
  207. if (n % skip == 0)
  208. {
  209. var window = new SequentialSimpleAsyncSubject<TSource>();
  210. queue.Enqueue(window);
  211. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  212. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  213. }
  214. },
  215. async ex =>
  216. {
  217. while (queue.Count > 0)
  218. {
  219. await queue.Dequeue().OnErrorAsync(ex).ConfigureAwait(false);
  220. }
  221. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  222. },
  223. async () =>
  224. {
  225. while (queue.Count > 0)
  226. {
  227. await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
  228. }
  229. await observer.OnCompletedAsync().ConfigureAwait(false);
  230. }
  231. ),
  232. refCount
  233. );
  234. }
  235. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan) => Window(observer, subscription, timeSpan, TaskPoolAsyncScheduler.Default);
  236. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, IAsyncScheduler scheduler)
  237. {
  238. if (observer == null)
  239. throw new ArgumentNullException(nameof(observer));
  240. if (subscription == null)
  241. throw new ArgumentNullException(nameof(subscription));
  242. if (timeSpan < TimeSpan.Zero)
  243. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  244. if (scheduler == null)
  245. throw new ArgumentNullException(nameof(scheduler));
  246. var gate = new AsyncGate();
  247. var window = default(IAsyncSubject<TSource>);
  248. var d = new CompositeAsyncDisposable();
  249. var refCount = new RefCountAsyncDisposable(d);
  250. async Task CreateWindowAsync()
  251. {
  252. window = new SequentialSimpleAsyncSubject<TSource>();
  253. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  254. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  255. }
  256. async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
  257. {
  258. await d.AddAsync(subscription).ConfigureAwait(false);
  259. await CreateWindowAsync().ConfigureAwait(false);
  260. var timer = await scheduler.ScheduleAsync(async ct =>
  261. {
  262. while (!ct.IsCancellationRequested)
  263. {
  264. using (await gate.LockAsync().ConfigureAwait(false))
  265. {
  266. await window.OnCompletedAsync().ConfigureAwait(false);
  267. await CreateWindowAsync().ConfigureAwait(false);
  268. }
  269. await scheduler.Delay(timeSpan, ct).RendezVous(scheduler, ct);
  270. }
  271. }, timeSpan).ConfigureAwait(false);
  272. await d.AddAsync(timer).ConfigureAwait(false);
  273. return
  274. (
  275. Create<TSource>(
  276. async x =>
  277. {
  278. using (await gate.LockAsync().ConfigureAwait(false))
  279. {
  280. await window.OnNextAsync(x).ConfigureAwait(false);
  281. }
  282. },
  283. async ex =>
  284. {
  285. using (await gate.LockAsync().ConfigureAwait(false))
  286. {
  287. await window.OnErrorAsync(ex).ConfigureAwait(false);
  288. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  289. }
  290. },
  291. async () =>
  292. {
  293. using (await gate.LockAsync().ConfigureAwait(false))
  294. {
  295. await window.OnCompletedAsync().ConfigureAwait(false);
  296. await observer.OnCompletedAsync().ConfigureAwait(false);
  297. }
  298. }
  299. ),
  300. refCount
  301. );
  302. }
  303. return CoreAsync();
  304. }
  305. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, TimeSpan timeShift) => Window(observer, subscription, timeSpan, timeShift, TaskPoolAsyncScheduler.Default);
  306. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  307. {
  308. if (observer == null)
  309. throw new ArgumentNullException(nameof(observer));
  310. if (subscription == null)
  311. throw new ArgumentNullException(nameof(subscription));
  312. if (timeSpan < TimeSpan.Zero)
  313. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  314. if (timeShift < TimeSpan.Zero)
  315. throw new ArgumentOutOfRangeException(nameof(timeShift));
  316. if (scheduler == null)
  317. throw new ArgumentNullException(nameof(scheduler));
  318. var gate = new AsyncGate();
  319. var d = new CompositeAsyncDisposable();
  320. var timer = new SerialAsyncDisposable();
  321. var refCount = new RefCountAsyncDisposable(d);
  322. var queue = new Queue<IAsyncSubject<TSource>>();
  323. var nextOpen = timeShift;
  324. var nextClose = timeSpan;
  325. var totalTime = TimeSpan.Zero;
  326. var isOpen = false;
  327. var isClose = false;
  328. TimeSpan GetNextDue()
  329. {
  330. if (nextOpen == nextClose)
  331. {
  332. isOpen = isClose = true;
  333. }
  334. else if (nextClose < nextOpen)
  335. {
  336. isClose = true;
  337. isOpen = false;
  338. }
  339. else
  340. {
  341. isOpen = true;
  342. isClose = false;
  343. }
  344. var newTotalTime = isClose ? nextClose : nextOpen;
  345. var due = newTotalTime - totalTime;
  346. totalTime = newTotalTime;
  347. if (isOpen)
  348. {
  349. nextOpen += timeShift;
  350. }
  351. if (isClose)
  352. {
  353. nextClose += timeShift;
  354. }
  355. return due;
  356. }
  357. async Task CreateWindowAsync()
  358. {
  359. var window = new SequentialSimpleAsyncSubject<TSource>();
  360. queue.Enqueue(window);
  361. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  362. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  363. }
  364. async Task CreateTimer()
  365. {
  366. var inner = new SingleAssignmentAsyncDisposable();
  367. await timer.AssignAsync(inner).ConfigureAwait(false);
  368. var task = await scheduler.ScheduleAsync(async ct =>
  369. {
  370. while (!ct.IsCancellationRequested)
  371. {
  372. using (await gate.LockAsync().ConfigureAwait(false))
  373. {
  374. if (isClose)
  375. {
  376. await queue.Dequeue().OnCompletedAsync().RendezVous(scheduler, ct);
  377. }
  378. if (isOpen)
  379. {
  380. await CreateWindowAsync().RendezVous(scheduler, ct);
  381. }
  382. }
  383. await scheduler.Delay(GetNextDue(), ct).RendezVous(scheduler, ct);
  384. }
  385. }, GetNextDue()).ConfigureAwait(false);
  386. await inner.AssignAsync(task).ConfigureAwait(false);
  387. }
  388. async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
  389. {
  390. await d.AddAsync(subscription).ConfigureAwait(false);
  391. await d.AddAsync(timer).ConfigureAwait(false);
  392. await CreateWindowAsync().ConfigureAwait(false);
  393. await CreateTimer().ConfigureAwait(false);
  394. return
  395. (
  396. Create<TSource>(
  397. async x =>
  398. {
  399. using (await gate.LockAsync().ConfigureAwait(false))
  400. {
  401. foreach (var window in queue)
  402. {
  403. await window.OnNextAsync(x).ConfigureAwait(false);
  404. }
  405. }
  406. },
  407. async ex =>
  408. {
  409. using (await gate.LockAsync().ConfigureAwait(false))
  410. {
  411. while (queue.Count > 0)
  412. {
  413. await queue.Dequeue().OnErrorAsync(ex).ConfigureAwait(false);
  414. }
  415. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  416. }
  417. },
  418. async () =>
  419. {
  420. using (await gate.LockAsync().ConfigureAwait(false))
  421. {
  422. while (queue.Count > 0)
  423. {
  424. await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
  425. }
  426. await observer.OnCompletedAsync().ConfigureAwait(false);
  427. }
  428. }
  429. ),
  430. refCount
  431. );
  432. }
  433. return CoreAsync();
  434. }
  435. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, int count) => Window(observer, subscription, timeSpan, count, TaskPoolAsyncScheduler.Default);
  436. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  437. {
  438. if (observer == null)
  439. throw new ArgumentNullException(nameof(observer));
  440. if (subscription == null)
  441. throw new ArgumentNullException(nameof(subscription));
  442. if (timeSpan < TimeSpan.Zero)
  443. throw new ArgumentOutOfRangeException(nameof(timeSpan));
  444. if (count <= 0)
  445. throw new ArgumentOutOfRangeException(nameof(count));
  446. if (scheduler == null)
  447. throw new ArgumentNullException(nameof(scheduler));
  448. var gate = new AsyncGate();
  449. var n = 0;
  450. var window = default(IAsyncSubject<TSource>);
  451. var d = new CompositeAsyncDisposable();
  452. var timer = new SerialAsyncDisposable();
  453. var refCount = new RefCountAsyncDisposable(d);
  454. async Task CreateTimer(IAsyncSubject<TSource> currentWindow)
  455. {
  456. var inner = new SingleAssignmentAsyncDisposable();
  457. await timer.AssignAsync(inner).ConfigureAwait(false);
  458. var task = await scheduler.ScheduleAsync(async ct =>
  459. {
  460. var newWindow = default(IAsyncSubject<TSource>);
  461. using (await gate.LockAsync().ConfigureAwait(false))
  462. {
  463. if (window != currentWindow)
  464. {
  465. return;
  466. }
  467. n = 0;
  468. newWindow = await CreateWindowAsync().RendezVous(scheduler, ct);
  469. }
  470. await CreateTimer(newWindow).RendezVous(scheduler, ct);
  471. }, timeSpan).ConfigureAwait(false);
  472. await inner.AssignAsync(task).ConfigureAwait(false);
  473. }
  474. async Task<IAsyncSubject<TSource>> CreateWindowAsync()
  475. {
  476. window = new SequentialSimpleAsyncSubject<TSource>();
  477. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  478. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  479. return window;
  480. }
  481. async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
  482. {
  483. await d.AddAsync(subscription).ConfigureAwait(false);
  484. await d.AddAsync(timer).ConfigureAwait(false);
  485. var w = await CreateWindowAsync().ConfigureAwait(false);
  486. await CreateTimer(w).ConfigureAwait(false);
  487. return
  488. (
  489. Create<TSource>(
  490. async x =>
  491. {
  492. var newWindow = default(IAsyncSubject<TSource>);
  493. using (await gate.LockAsync().ConfigureAwait(false))
  494. {
  495. await window.OnNextAsync(x).ConfigureAwait(false);
  496. n++;
  497. if (n == count)
  498. {
  499. await window.OnCompletedAsync().ConfigureAwait(false);
  500. n = 0;
  501. newWindow = await CreateWindowAsync().ConfigureAwait(false);
  502. }
  503. }
  504. if (newWindow != null)
  505. {
  506. await CreateTimer(newWindow).ConfigureAwait(false);
  507. }
  508. },
  509. async ex =>
  510. {
  511. using (await gate.LockAsync().ConfigureAwait(false))
  512. {
  513. await window.OnErrorAsync(ex).ConfigureAwait(false);
  514. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  515. }
  516. },
  517. async () =>
  518. {
  519. using (await gate.LockAsync().ConfigureAwait(false))
  520. {
  521. await window.OnCompletedAsync().ConfigureAwait(false);
  522. await observer.OnCompletedAsync().ConfigureAwait(false);
  523. }
  524. }
  525. ),
  526. refCount
  527. );
  528. }
  529. return CoreAsync();
  530. }
  531. public static ValueTask<(IAsyncObserver<TSource>, IAsyncObserver<TWindowBoundary>, IAsyncDisposable)> Window<TSource, TWindowBoundary>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription)
  532. {
  533. if (observer == null)
  534. throw new ArgumentNullException(nameof(observer));
  535. if (subscription == null)
  536. throw new ArgumentNullException(nameof(subscription));
  537. var gate = new AsyncGate();
  538. var refCount = new RefCountAsyncDisposable(subscription);
  539. var window = default(IAsyncSubject<TSource>);
  540. async Task CreateWindowAsync()
  541. {
  542. window = new SequentialSimpleAsyncSubject<TSource>();
  543. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  544. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  545. }
  546. async ValueTask<(IAsyncObserver<TSource>, IAsyncObserver<TWindowBoundary>, IAsyncDisposable)> CoreAsync()
  547. {
  548. await CreateWindowAsync().ConfigureAwait(false);
  549. return
  550. (
  551. Create<TSource>(
  552. async x =>
  553. {
  554. using (await gate.LockAsync().ConfigureAwait(false))
  555. {
  556. await window.OnNextAsync(x).ConfigureAwait(false);
  557. }
  558. },
  559. async ex =>
  560. {
  561. using (await gate.LockAsync().ConfigureAwait(false))
  562. {
  563. await window.OnErrorAsync(ex).ConfigureAwait(false);
  564. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  565. }
  566. },
  567. async () =>
  568. {
  569. using (await gate.LockAsync().ConfigureAwait(false))
  570. {
  571. await window.OnCompletedAsync().ConfigureAwait(false);
  572. await observer.OnCompletedAsync().ConfigureAwait(false);
  573. }
  574. }
  575. ),
  576. Create<TWindowBoundary>(
  577. async x =>
  578. {
  579. using (await gate.LockAsync().ConfigureAwait(false))
  580. {
  581. await window.OnCompletedAsync().ConfigureAwait(false);
  582. await CreateWindowAsync().ConfigureAwait(false);
  583. }
  584. },
  585. async ex =>
  586. {
  587. using (await gate.LockAsync().ConfigureAwait(false))
  588. {
  589. await window.OnErrorAsync(ex).ConfigureAwait(false);
  590. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  591. }
  592. },
  593. async () =>
  594. {
  595. using (await gate.LockAsync().ConfigureAwait(false))
  596. {
  597. await window.OnCompletedAsync().ConfigureAwait(false);
  598. await observer.OnCompletedAsync().ConfigureAwait(false);
  599. }
  600. }
  601. ),
  602. refCount
  603. );
  604. }
  605. return CoreAsync();
  606. }
  607. public static ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable, IAsyncDisposable)> Window<TSource, TWindowClosing>(IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObservable<TWindowClosing>> windowClosingSelector, IAsyncDisposable subscription)
  608. {
  609. if (observer == null)
  610. throw new ArgumentNullException(nameof(observer));
  611. if (windowClosingSelector == null)
  612. throw new ArgumentNullException(nameof(windowClosingSelector));
  613. if (subscription == null)
  614. throw new ArgumentNullException(nameof(subscription));
  615. var closeSubscription = new SerialAsyncDisposable();
  616. var gate = new AsyncGate();
  617. var queueLock = new AsyncQueueLock();
  618. var refCount = new RefCountAsyncDisposable(subscription);
  619. var window = default(IAsyncSubject<TSource>);
  620. async ValueTask CreateWindowAsync()
  621. {
  622. window = new SequentialSimpleAsyncSubject<TSource>();
  623. var wrapper = new WindowAsyncObservable<TSource>(window, refCount);
  624. await observer.OnNextAsync(wrapper).ConfigureAwait(false);
  625. }
  626. async ValueTask CreateWindowCloseAsync()
  627. {
  628. var closing = default(IAsyncObservable<TWindowClosing>);
  629. try
  630. {
  631. closing = windowClosingSelector(); // REVIEW: Do we need an async variant?
  632. }
  633. catch (Exception ex)
  634. {
  635. using (await gate.LockAsync().ConfigureAwait(false))
  636. {
  637. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  638. }
  639. return;
  640. }
  641. var closingSubscription = new SingleAssignmentAsyncDisposable();
  642. await closeSubscription.AssignAsync(closingSubscription).ConfigureAwait(false);
  643. async ValueTask CloseWindowAsync()
  644. {
  645. await closingSubscription.DisposeAsync().ConfigureAwait(false);
  646. using (await gate.LockAsync().ConfigureAwait(false))
  647. {
  648. await window.OnCompletedAsync().ConfigureAwait(false);
  649. await CreateWindowAsync().ConfigureAwait(false);
  650. }
  651. await queueLock.WaitAsync(CreateWindowCloseAsync).ConfigureAwait(false);
  652. }
  653. var closingObserver =
  654. Create<TWindowClosing>(
  655. x => CloseWindowAsync(),
  656. async ex =>
  657. {
  658. using (await gate.LockAsync().ConfigureAwait(false))
  659. {
  660. await window.OnErrorAsync(ex).ConfigureAwait(false);
  661. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  662. }
  663. },
  664. CloseWindowAsync
  665. );
  666. var closingSubscriptionInner = await closing.SubscribeSafeAsync(closingObserver).ConfigureAwait(false);
  667. await closingSubscription.AssignAsync(closingSubscriptionInner).ConfigureAwait(false);
  668. }
  669. async ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable, IAsyncDisposable)> CoreAsync()
  670. {
  671. await CreateWindowAsync().ConfigureAwait(false);
  672. var sink =
  673. Create<TSource>(
  674. async x =>
  675. {
  676. using (await gate.LockAsync().ConfigureAwait(false))
  677. {
  678. await window.OnNextAsync(x).ConfigureAwait(false);
  679. }
  680. },
  681. async ex =>
  682. {
  683. using (await gate.LockAsync().ConfigureAwait(false))
  684. {
  685. await window.OnErrorAsync(ex).ConfigureAwait(false);
  686. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  687. }
  688. },
  689. async () =>
  690. {
  691. using (await gate.LockAsync().ConfigureAwait(false))
  692. {
  693. await window.OnCompletedAsync().ConfigureAwait(false);
  694. await observer.OnCompletedAsync().ConfigureAwait(false);
  695. }
  696. }
  697. );
  698. await queueLock.WaitAsync(CreateWindowCloseAsync).ConfigureAwait(false);
  699. return (sink, closeSubscription, refCount);
  700. }
  701. return CoreAsync();
  702. }
  703. }
  704. }