Window.cs 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Subjects;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace System.Reactive.Linq
  11. {
  12. partial class AsyncObservable
  13. {
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows, using the count-based
  /// <c>AsyncObserver.Window</c> overload to build the observer that does the splitting.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="count">Window size handed to <c>AsyncObserver.Window</c>; must be positive.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, int count)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (count <= 0)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      // The subscription plumbing lives in WindowCore; only the observer factory differs per overload.
      return Create<IAsyncObservable<TSource>>(
          observer => WindowCore(
              source,
              observer,
              (windowObserver, upstream) => AsyncObserver.Window(windowObserver, upstream, count)));
  }
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows, using the count/skip-based
  /// <c>AsyncObserver.Window</c> overload to build the observer that does the splitting.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="count">Window size handed to <c>AsyncObserver.Window</c>; must be positive.</param>
  /// <param name="skip">Skip value handed to <c>AsyncObserver.Window</c>; must be positive.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> or <paramref name="skip"/> is zero or negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, int count, int skip)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (count <= 0)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      if (skip <= 0)
      {
          throw new ArgumentOutOfRangeException(nameof(skip));
      }

      return Create<IAsyncObservable<TSource>>(
          observer => WindowCore(
              source,
              observer,
              (windowObserver, upstream) => AsyncObserver.Window(windowObserver, upstream, count, skip)));
  }
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows, using the time-based
  /// <c>AsyncObserver.Window</c> overload (without an explicit scheduler) to build the observer.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="timeSpan">Time span handed to <c>AsyncObserver.Window</c>; must not be negative.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      // The observer factory is asynchronous here, hence WindowAsyncCore rather than WindowCore.
      return Create<IAsyncObservable<TSource>>(
          observer => WindowAsyncCore(
              source,
              observer,
              (windowObserver, upstream) => AsyncObserver.Window(windowObserver, upstream, timeSpan)));
  }
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows, using the time-based
  /// <c>AsyncObserver.Window</c> overload with the supplied <paramref name="scheduler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="timeSpan">Time span handed to <c>AsyncObserver.Window</c>; must not be negative.</param>
  /// <param name="scheduler">Scheduler handed to <c>AsyncObserver.Window</c>.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, IAsyncScheduler scheduler)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return Create<IAsyncObservable<TSource>>(
          observer => WindowAsyncCore(
              source,
              observer,
              (windowObserver, upstream) => AsyncObserver.Window(windowObserver, upstream, timeSpan, scheduler)));
  }
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows, using the
  /// time-span/time-shift <c>AsyncObserver.Window</c> overload (without an explicit scheduler).
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="timeSpan">Time span handed to <c>AsyncObserver.Window</c>; must not be negative.</param>
  /// <param name="timeShift">Time shift handed to <c>AsyncObserver.Window</c>; must not be negative.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (timeShift < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeShift));
      }

      return Create<IAsyncObservable<TSource>>(
          observer => WindowAsyncCore(
              source,
              observer,
              (windowObserver, upstream) => AsyncObserver.Window(windowObserver, upstream, timeSpan, timeShift)));
  }
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows, using the
  /// time-span/time-shift <c>AsyncObserver.Window</c> overload with the supplied
  /// <paramref name="scheduler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="timeSpan">Time span handed to <c>AsyncObserver.Window</c>; must not be negative.</param>
  /// <param name="timeShift">Time shift handed to <c>AsyncObserver.Window</c>; must not be negative.</param>
  /// <param name="scheduler">Scheduler handed to <c>AsyncObserver.Window</c>.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (timeShift < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeShift));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return Create<IAsyncObservable<TSource>>(
          observer => WindowAsyncCore(
              source,
              observer,
              (windowObserver, upstream) => AsyncObserver.Window(windowObserver, upstream, timeSpan, timeShift, scheduler)));
  }
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows, using the
  /// time-span/count <c>AsyncObserver.Window</c> overload (without an explicit scheduler).
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="timeSpan">Time span handed to <c>AsyncObserver.Window</c>; must not be negative.</param>
  /// <param name="count">Element count handed to <c>AsyncObserver.Window</c>; must be positive.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative, or <paramref name="count"/> is zero or negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (count <= 0)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      return Create<IAsyncObservable<TSource>>(
          observer => WindowAsyncCore(
              source,
              observer,
              (windowObserver, upstream) => AsyncObserver.Window(windowObserver, upstream, timeSpan, count)));
  }
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows, using the
  /// time-span/count <c>AsyncObserver.Window</c> overload with the supplied
  /// <paramref name="scheduler"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="timeSpan">Time span handed to <c>AsyncObserver.Window</c>; must not be negative.</param>
  /// <param name="count">Element count handed to <c>AsyncObserver.Window</c>; must be positive.</param>
  /// <param name="scheduler">Scheduler handed to <c>AsyncObserver.Window</c>.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative, or <paramref name="count"/> is zero or negative.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (count <= 0)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return Create<IAsyncObservable<TSource>>(
          observer => WindowAsyncCore(
              source,
              observer,
              (windowObserver, upstream) => AsyncObserver.Window(windowObserver, upstream, timeSpan, count, scheduler)));
  }
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows whose boundaries are
  /// signalled by <paramref name="windowBoundaries"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TWindowBoundary">Element type of the boundary sequence.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="windowBoundaries">Sequence subscribed to with the boundary observer produced by <c>AsyncObserver.Window</c>.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="windowBoundaries"/> is null.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource, TWindowBoundary>(this IAsyncObservable<TSource> source, IAsyncObservable<TWindowBoundary> windowBoundaries)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (windowBoundaries == null)
      {
          throw new ArgumentNullException(nameof(windowBoundaries));
      }

      return Create<IAsyncObservable<TSource>>(async observer =>
      {
          // Collects both upstream subscriptions; it is handed to AsyncObserver.Window as the
          // subscription the returned disposable is built on.
          var upstreams = new CompositeAsyncDisposable();

          var (sourceSink, boundarySink, result) = await AsyncObserver.Window<TSource, TWindowBoundary>(observer, upstreams).ConfigureAwait(false);

          // The source is subscribed before the boundaries.
          var sourceHandle = await source.SubscribeSafeAsync(sourceSink).ConfigureAwait(false);
          await upstreams.AddAsync(sourceHandle).ConfigureAwait(false);

          var boundaryHandle = await windowBoundaries.SubscribeSafeAsync(boundarySink).ConfigureAwait(false);
          await upstreams.AddAsync(boundaryHandle).ConfigureAwait(false);

          return result;
      });
  }
  111. // REVIEW: This overload is inherited from Rx but arguably a bit esoteric as it doesn't provide context to the closing selector.
  /// <summary>
  /// Projects the elements of <paramref name="source"/> into windows; the closing of each window
  /// is driven by a sequence obtained from <paramref name="windowClosingSelector"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TWindowClosing">Element type of the closing sequences.</typeparam>
  /// <param name="source">Sequence whose elements are distributed over windows.</param>
  /// <param name="windowClosingSelector">Factory handed to <c>AsyncObserver.Window</c> to obtain closing sequences.</param>
  /// <returns>An observable sequence of windows.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="windowClosingSelector"/> is null.</exception>
  public static IAsyncObservable<IAsyncObservable<TSource>> Window<TSource, TWindowClosing>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TWindowClosing>> windowClosingSelector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (windowClosingSelector == null)
      {
          throw new ArgumentNullException(nameof(windowClosingSelector));
      }

      return Create<IAsyncObservable<TSource>>(async observer =>
      {
          var upstreams = new CompositeAsyncDisposable();

          var (sourceSink, closingHandle, result) = await AsyncObserver.Window<TSource, TWindowClosing>(observer, windowClosingSelector, upstreams).ConfigureAwait(false);

          // The closing subscription is registered first, then the source is subscribed.
          await upstreams.AddAsync(closingHandle).ConfigureAwait(false);

          var sourceHandle = await source.SubscribeSafeAsync(sourceSink).ConfigureAwait(false);
          await upstreams.AddAsync(sourceHandle).ConfigureAwait(false);

          return result;
      });
  }
  /// <summary>
  /// Shared subscription logic for the overloads whose observer factory is synchronous: builds the
  /// window observer, subscribes it to <paramref name="source"/> and returns the disposable the
  /// factory produced.
  /// </summary>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="createObserver">Factory returning the source observer and the disposable to hand back to the subscriber.</param>
  /// <returns>The disposable returned by <paramref name="createObserver"/>.</returns>
  private static async Task<IAsyncDisposable> WindowCore<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable, (IAsyncObserver<TSource>, IAsyncDisposable)> createObserver)
  {
      // Placeholder for the source subscription; the factory needs it before the subscription exists.
      var upstream = new SingleAssignmentAsyncDisposable();

      var created = createObserver(observer, upstream);

      var sourceSubscription = await source.SubscribeSafeAsync(created.Item1).ConfigureAwait(false);
      await upstream.AssignAsync(sourceSubscription).ConfigureAwait(false);

      return created.Item2;
  }
  /// <summary>
  /// Shared subscription logic for the overloads whose observer factory is asynchronous: awaits the
  /// factory, subscribes the resulting observer to <paramref name="source"/> and returns the
  /// disposable the factory produced.
  /// </summary>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="createObserverAsync">Asynchronous factory returning the source observer and the disposable to hand back to the subscriber.</param>
  /// <returns>The disposable returned by <paramref name="createObserverAsync"/>.</returns>
  private static async Task<IAsyncDisposable> WindowAsyncCore<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable, Task<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserverAsync)
  {
      // Placeholder for the source subscription; the factory needs it before the subscription exists.
      var upstream = new SingleAssignmentAsyncDisposable();

      var created = await createObserverAsync(observer, upstream).ConfigureAwait(false);

      var sourceSubscription = await source.SubscribeSafeAsync(created.Item1).ConfigureAwait(false);
      await upstream.AssignAsync(sourceSubscription).ConfigureAwait(false);

      return created.Item2;
  }
  144. }
  145. partial class AsyncObserver
  146. {
  /// <summary>
  /// Count-based windowing where the skip value equals <paramref name="count"/>; forwards to the
  /// count/skip overload.
  /// </summary>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="subscription">Upstream subscription the returned disposable is built on.</param>
  /// <param name="count">Window size, also used as the skip value.</param>
  /// <returns>The source observer and the disposable produced by the count/skip overload.</returns>
  public static (IAsyncObserver<TSource>, IAsyncDisposable) Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, int count)
  {
      return Window(observer, subscription, count, skip: count);
  }
  /// <summary>
  /// Builds the observer that distributes source elements over count-based windows: a new window
  /// is opened every <paramref name="skip"/> elements and each window is completed once it has
  /// received <paramref name="count"/> elements.
  /// </summary>
  /// <remarks>
  /// The first window is opened lazily, right before the first notification (element, error or
  /// completion) is processed. Without an initial window the leading elements would not reach
  /// any window and the first close would dequeue from an empty queue.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="subscription">Upstream subscription wrapped in the returned ref-counted disposable.</param>
  /// <param name="count">Number of elements per window; must be positive.</param>
  /// <param name="skip">Number of elements between the starts of consecutive windows; must be positive.</param>
  /// <returns>The observer to subscribe to the source, and the ref-counted disposable.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="subscription"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> or <paramref name="skip"/> is zero or negative.</exception>
  public static (IAsyncObserver<TSource>, IAsyncDisposable) Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, int count, int skip)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (subscription == null)
          throw new ArgumentNullException(nameof(subscription));
      if (count <= 0)
          throw new ArgumentOutOfRangeException(nameof(count));
      if (skip <= 0)
          throw new ArgumentOutOfRangeException(nameof(skip));

      var refCount = new RefCountAsyncDisposable(subscription);

      // Windows that are currently open, oldest first.
      var queue = new Queue<IAsyncSubject<TSource>>();

      // Number of source elements processed so far.
      var n = 0;

      var hasFirstWindow = false;

      // Opens a window, tracks it in the queue and publishes it to the downstream observer.
      async Task OpenWindowAsync()
      {
          var opened = new SequentialSimpleAsyncSubject<TSource>();
          queue.Enqueue(opened);

          var wrapper = new WindowAsyncObservable<TSource>(opened, refCount);

          await observer.OnNextAsync(wrapper).ConfigureAwait(false);
      }

      // This method is synchronous and cannot await the downstream observer, so the initial
      // window is published on the first notification instead of at construction time.
      async Task EnsureFirstWindowAsync()
      {
          if (!hasFirstWindow)
          {
              hasFirstWindow = true;
              await OpenWindowAsync().ConfigureAwait(false);
          }
      }

      return
      (
          Create<TSource>
          (
              async x =>
              {
                  await EnsureFirstWindowAsync().ConfigureAwait(false);

                  foreach (var window in queue)
                  {
                      await window.OnNextAsync(x).ConfigureAwait(false);
                  }

                  // The oldest window is full once `count` elements have arrived since it opened.
                  var i = n - count + 1;

                  if (i >= 0 && i % skip == 0)
                  {
                      await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
                  }

                  n++;

                  if (n % skip == 0)
                  {
                      await OpenWindowAsync().ConfigureAwait(false);
                  }
              },
              async ex =>
              {
                  await EnsureFirstWindowAsync().ConfigureAwait(false);

                  while (queue.Count > 0)
                  {
                      await queue.Dequeue().OnErrorAsync(ex).ConfigureAwait(false);
                  }

                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
              },
              async () =>
              {
                  await EnsureFirstWindowAsync().ConfigureAwait(false);

                  while (queue.Count > 0)
                  {
                      await queue.Dequeue().OnCompletedAsync().ConfigureAwait(false);
                  }

                  await observer.OnCompletedAsync().ConfigureAwait(false);
              }
          ),
          refCount
      );
  }
  /// <summary>
  /// Time-based windowing on <c>TaskPoolAsyncScheduler.Default</c>; forwards to the overload that
  /// takes an explicit scheduler.
  /// </summary>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="subscription">Upstream subscription.</param>
  /// <param name="timeSpan">Time span forwarded to the scheduler-taking overload.</param>
  /// <returns>The task produced by the scheduler-taking overload.</returns>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan)
  {
      var defaultScheduler = TaskPoolAsyncScheduler.Default;

      return Window(observer, subscription, timeSpan, defaultScheduler);
  }
  /// <summary>
  /// Builds the observer that forwards source elements to the current window, while a timer
  /// scheduled on <paramref name="scheduler"/> completes the current window and opens a new one
  /// on each tick.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="subscription">Upstream subscription; added to the composite behind the returned disposable.</param>
  /// <param name="timeSpan">Initial due time of the timer and delay between ticks; must not be negative.</param>
  /// <param name="scheduler">Scheduler used to run the timer loop.</param>
  /// <returns>A task producing the observer to subscribe to the source, and the ref-counted disposable.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="subscription"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, IAsyncScheduler scheduler)
  {
      if (observer == null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription == null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      // Serializes the timer loop against the source notifications.
      var gate = new AsyncLock();

      var current = default(IAsyncSubject<TSource>);

      var resources = new CompositeAsyncDisposable();
      var refCount = new RefCountAsyncDisposable(resources);

      // Replaces the current window with a fresh one and publishes it downstream.
      async Task OpenWindowAsync()
      {
          current = new SequentialSimpleAsyncSubject<TSource>();

          await observer.OnNextAsync(new WindowAsyncObservable<TSource>(current, refCount)).ConfigureAwait(false);
      }

      async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> SetUpAsync()
      {
          await resources.AddAsync(subscription).ConfigureAwait(false);

          // The first window is published before the timer is scheduled.
          await OpenWindowAsync().ConfigureAwait(false);

          var timer = await scheduler.ScheduleAsync(async ct =>
          {
              while (!ct.IsCancellationRequested)
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      await current.OnCompletedAsync().ConfigureAwait(false);
                      await OpenWindowAsync().ConfigureAwait(false);
                  }

                  await scheduler.Delay(timeSpan, ct).RendezVous(scheduler, ct);
              }
          }, timeSpan).ConfigureAwait(false);

          await resources.AddAsync(timer).ConfigureAwait(false);

          var sink =
              Create<TSource>(
                  async x =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnNextAsync(x).ConfigureAwait(false);
                      }
                  },
                  async ex =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          // The open window is terminated before the outer sequence.
                          await current.OnErrorAsync(ex).ConfigureAwait(false);
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnCompletedAsync().ConfigureAwait(false);
                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              );

          return (sink, refCount);
      }

      // Argument validation above runs eagerly; the asynchronous set-up runs in the returned task.
      return SetUpAsync();
  }
  /// <summary>
  /// Time-span/time-shift windowing on <c>TaskPoolAsyncScheduler.Default</c>; forwards to the
  /// overload that takes an explicit scheduler.
  /// </summary>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="subscription">Upstream subscription.</param>
  /// <param name="timeSpan">Time span forwarded to the scheduler-taking overload.</param>
  /// <param name="timeShift">Time shift forwarded to the scheduler-taking overload.</param>
  /// <returns>The task produced by the scheduler-taking overload.</returns>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, TimeSpan timeShift)
  {
      var defaultScheduler = TaskPoolAsyncScheduler.Default;

      return Window(observer, subscription, timeSpan, timeShift, defaultScheduler);
  }
  /// <summary>
  /// Builds the observer that forwards source elements to every open window, while a timer
  /// scheduled on <paramref name="scheduler"/> opens and closes windows according to
  /// <paramref name="timeShift"/> and <paramref name="timeSpan"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="subscription">Upstream subscription; added to the composite behind the returned disposable.</param>
  /// <param name="timeSpan">Time of the first window close; must not be negative.</param>
  /// <param name="timeShift">Time of the first additional window open, and the step by which both the open and close times advance; must not be negative.</param>
  /// <param name="scheduler">Scheduler used to run the timer loop.</param>
  /// <returns>A task producing the observer to subscribe to the source, and the ref-counted disposable.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="subscription"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
  {
      if (observer == null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription == null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      if (timeSpan < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      }

      if (timeShift < TimeSpan.Zero)
      {
          throw new ArgumentOutOfRangeException(nameof(timeShift));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      // Serializes the timer loop against the source notifications.
      var gate = new AsyncLock();

      var resources = new CompositeAsyncDisposable();
      var timer = new SerialAsyncDisposable();
      var refCount = new RefCountAsyncDisposable(resources);

      // Windows that are currently open, oldest first.
      var openWindows = new Queue<IAsyncSubject<TSource>>();

      // Timeline bookkeeping, expressed as offsets from the start.
      var nextOpen = timeShift;
      var nextClose = timeSpan;
      var elapsed = TimeSpan.Zero;

      // What the upcoming timer tick has to do; set by GetNextDue.
      var isOpen = false;
      var isClose = false;

      // Picks whichever of the next open/close events comes first (both when they coincide),
      // records what the tick must do, and returns the delay until that event.
      TimeSpan GetNextDue()
      {
          isClose = nextClose <= nextOpen;
          isOpen = nextOpen <= nextClose;

          var target = isClose ? nextClose : nextOpen;
          var due = target - elapsed;
          elapsed = target;

          if (isOpen)
          {
              nextOpen += timeShift;
          }

          if (isClose)
          {
              nextClose += timeShift;
          }

          return due;
      }

      // Opens a window, tracks it in the queue and publishes it downstream.
      async Task OpenWindowAsync()
      {
          var opened = new SequentialSimpleAsyncSubject<TSource>();
          openWindows.Enqueue(opened);

          await observer.OnNextAsync(new WindowAsyncObservable<TSource>(opened, refCount)).ConfigureAwait(false);
      }

      async Task StartTimerAsync()
      {
          // The placeholder is assigned to the serial disposable before the work is scheduled.
          var placeholder = new SingleAssignmentAsyncDisposable();
          await timer.AssignAsync(placeholder).ConfigureAwait(false);

          var scheduled = await scheduler.ScheduleAsync(async ct =>
          {
              while (!ct.IsCancellationRequested)
              {
                  using (await gate.LockAsync().ConfigureAwait(false))
                  {
                      if (isClose)
                      {
                          await openWindows.Dequeue().OnCompletedAsync().RendezVous(scheduler, ct);
                      }

                      if (isOpen)
                      {
                          await OpenWindowAsync().RendezVous(scheduler, ct);
                      }
                  }

                  var delay = GetNextDue();
                  await scheduler.Delay(delay, ct).RendezVous(scheduler, ct);
              }
          }, GetNextDue()).ConfigureAwait(false);

          await placeholder.AssignAsync(scheduled).ConfigureAwait(false);
      }

      async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> SetUpAsync()
      {
          await resources.AddAsync(subscription).ConfigureAwait(false);
          await resources.AddAsync(timer).ConfigureAwait(false);

          // The first window is published before the timer is started.
          await OpenWindowAsync().ConfigureAwait(false);
          await StartTimerAsync().ConfigureAwait(false);

          var sink =
              Create<TSource>(
                  async x =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          foreach (var target in openWindows)
                          {
                              await target.OnNextAsync(x).ConfigureAwait(false);
                          }
                      }
                  },
                  async ex =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          // Every open window is terminated before the outer sequence.
                          while (openWindows.Count > 0)
                          {
                              await openWindows.Dequeue().OnErrorAsync(ex).ConfigureAwait(false);
                          }

                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          while (openWindows.Count > 0)
                          {
                              await openWindows.Dequeue().OnCompletedAsync().ConfigureAwait(false);
                          }

                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              );

          return (sink, refCount);
      }

      // Argument validation above runs eagerly; the asynchronous set-up runs in the returned task.
      return SetUpAsync();
  }
  /// <summary>
  /// Time-span/count windowing on <c>TaskPoolAsyncScheduler.Default</c>; forwards to the overload
  /// that takes an explicit scheduler.
  /// </summary>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="subscription">Upstream subscription.</param>
  /// <param name="timeSpan">Time span forwarded to the scheduler-taking overload.</param>
  /// <param name="count">Element count forwarded to the scheduler-taking overload.</param>
  /// <returns>The task produced by the scheduler-taking overload.</returns>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, int count)
  {
      var defaultScheduler = TaskPoolAsyncScheduler.Default;

      return Window(observer, subscription, timeSpan, count, defaultScheduler);
  }
  /// <summary>
  /// Builds the observer that forwards source elements to the current window and replaces that
  /// window either when it has received <paramref name="count"/> elements or when the timer
  /// scheduled for it fires after <paramref name="timeSpan"/>, whichever happens first.
  /// </summary>
  /// <remarks>
  /// Every window gets its own timer. A tick that fires for a window that has already been
  /// replaced is ignored. Both replacement paths complete the window being replaced before the
  /// next window is published.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="subscription">Upstream subscription; added to the composite behind the returned disposable.</param>
  /// <param name="timeSpan">Due time of each window's timer; must not be negative.</param>
  /// <param name="count">Number of elements after which a window is replaced; must be positive.</param>
  /// <param name="scheduler">Scheduler used to run the timers.</param>
  /// <returns>A task producing the observer to subscribe to the source, and the ref-counted disposable.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="subscription"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative, or <paramref name="count"/> is zero or negative.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Window<TSource>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
  {
      if (observer == null)
          throw new ArgumentNullException(nameof(observer));
      if (subscription == null)
          throw new ArgumentNullException(nameof(subscription));
      if (timeSpan < TimeSpan.Zero)
          throw new ArgumentOutOfRangeException(nameof(timeSpan));
      if (count <= 0)
          throw new ArgumentOutOfRangeException(nameof(count));
      if (scheduler == null)
          throw new ArgumentNullException(nameof(scheduler));

      // Serializes the timer ticks against the source notifications.
      var gate = new AsyncLock();

      // Number of elements delivered to the current window.
      var n = 0;

      var window = default(IAsyncSubject<TSource>);

      var d = new CompositeAsyncDisposable();
      var timer = new SerialAsyncDisposable();

      var refCount = new RefCountAsyncDisposable(d);

      // Schedules the time-out for `currentWindow`; assigning to the serial disposable replaces
      // the timer of the previous window.
      async Task CreateTimer(IAsyncSubject<TSource> currentWindow)
      {
          var inner = new SingleAssignmentAsyncDisposable();
          await timer.AssignAsync(inner).ConfigureAwait(false);

          var task = await scheduler.ScheduleAsync(async ct =>
          {
              var newWindow = default(IAsyncSubject<TSource>);

              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  // The window was already replaced by the count-based path; nothing to do.
                  if (window != currentWindow)
                  {
                      return;
                  }

                  n = 0;

                  // Complete the window being replaced so its subscribers observe the end of
                  // the window; the count-based path in the source observer does the same.
                  await window.OnCompletedAsync().RendezVous(scheduler, ct);

                  newWindow = await CreateWindowAsync().RendezVous(scheduler, ct);
              }

              await CreateTimer(newWindow).RendezVous(scheduler, ct);
          }, timeSpan).ConfigureAwait(false);

          await inner.AssignAsync(task).ConfigureAwait(false);
      }

      // Replaces the current window with a fresh one, publishes it downstream and returns it.
      async Task<IAsyncSubject<TSource>> CreateWindowAsync()
      {
          window = new SequentialSimpleAsyncSubject<TSource>();

          var wrapper = new WindowAsyncObservable<TSource>(window, refCount);

          await observer.OnNextAsync(wrapper).ConfigureAwait(false);

          return window;
      }

      async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
      {
          await d.AddAsync(subscription).ConfigureAwait(false);
          await d.AddAsync(timer).ConfigureAwait(false);

          var w = await CreateWindowAsync().ConfigureAwait(false);

          await CreateTimer(w).ConfigureAwait(false);

          return
          (
              Create<TSource>(
                  async x =>
                  {
                      var newWindow = default(IAsyncSubject<TSource>);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnNextAsync(x).ConfigureAwait(false);

                          n++;

                          if (n == count)
                          {
                              await window.OnCompletedAsync().ConfigureAwait(false);

                              n = 0;

                              newWindow = await CreateWindowAsync().ConfigureAwait(false);
                          }
                      }

                      // The timer for the new window is started outside the lock.
                      if (newWindow != null)
                      {
                          await CreateTimer(newWindow).ConfigureAwait(false);
                      }
                  },
                  async ex =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnErrorAsync(ex).ConfigureAwait(false);
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await window.OnCompletedAsync().ConfigureAwait(false);
                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              ),
              refCount
          );
      }

      return CoreAsync();
  }
  /// <summary>
  /// Builds a pair of observers for boundary-driven windowing: the source observer forwards
  /// elements to the current window, and every element seen by the boundary observer completes
  /// the current window and opens a new one.
  /// </summary>
  /// <remarks>
  /// An error or completion on either observer terminates the current window and then the outer
  /// sequence in the same way.
  /// </remarks>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TWindowBoundary">Element type of the boundary sequence.</typeparam>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="subscription">Upstream subscription wrapped in the returned ref-counted disposable.</param>
  /// <returns>A task producing the source observer, the boundary observer and the ref-counted disposable.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="subscription"/> is null.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncObserver<TWindowBoundary>, IAsyncDisposable)> Window<TSource, TWindowBoundary>(IAsyncObserver<IAsyncObservable<TSource>> observer, IAsyncDisposable subscription)
  {
      if (observer == null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (subscription == null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      // Serializes the source and boundary notifications against each other.
      var gate = new AsyncLock();

      var refCount = new RefCountAsyncDisposable(subscription);

      var current = default(IAsyncSubject<TSource>);

      // Replaces the current window with a fresh one and publishes it downstream.
      async Task OpenWindowAsync()
      {
          current = new SequentialSimpleAsyncSubject<TSource>();

          await observer.OnNextAsync(new WindowAsyncObservable<TSource>(current, refCount)).ConfigureAwait(false);
      }

      // Shared error path of both observers: fail the open window, then the outer sequence.
      async Task FailAsync(Exception error)
      {
          using (await gate.LockAsync().ConfigureAwait(false))
          {
              await current.OnErrorAsync(error).ConfigureAwait(false);
              await observer.OnErrorAsync(error).ConfigureAwait(false);
          }
      }

      // Shared completion path of both observers: complete the open window, then the outer sequence.
      async Task FinishAsync()
      {
          using (await gate.LockAsync().ConfigureAwait(false))
          {
              await current.OnCompletedAsync().ConfigureAwait(false);
              await observer.OnCompletedAsync().ConfigureAwait(false);
          }
      }

      async Task<(IAsyncObserver<TSource>, IAsyncObserver<TWindowBoundary>, IAsyncDisposable)> SetUpAsync()
      {
          // The first window is published before either observer is handed out.
          await OpenWindowAsync().ConfigureAwait(false);

          var sourceSink =
              Create<TSource>(
                  async x =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnNextAsync(x).ConfigureAwait(false);
                      }
                  },
                  FailAsync,
                  FinishAsync
              );

          var boundarySink =
              Create<TWindowBoundary>(
                  async boundary =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnCompletedAsync().ConfigureAwait(false);
                          await OpenWindowAsync().ConfigureAwait(false);
                      }
                  },
                  FailAsync,
                  FinishAsync
              );

          return (sourceSink, boundarySink, refCount);
      }

      // Argument validation above runs eagerly; the asynchronous set-up runs in the returned task.
      return SetUpAsync();
  }
  /// <summary>
  /// Builds the observer for windowing driven by closing sequences: the source observer forwards
  /// elements to the current window, and each element or completion of the closing sequence
  /// obtained from <paramref name="windowClosingSelector"/> completes the current window, opens a
  /// new one and requests a fresh closing sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TWindowClosing">Element type of the closing sequences.</typeparam>
  /// <param name="observer">Observer that receives the windows.</param>
  /// <param name="windowClosingSelector">Factory invoked to obtain each closing sequence; an exception it throws is forwarded to <paramref name="observer"/>.</param>
  /// <param name="subscription">Upstream subscription wrapped in the returned ref-counted disposable.</param>
  /// <returns>A task producing the source observer, the serial disposable holding the current closing subscription, and the ref-counted disposable.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="windowClosingSelector"/> or <paramref name="subscription"/> is null.</exception>
  public static Task<(IAsyncObserver<TSource>, IAsyncDisposable, IAsyncDisposable)> Window<TSource, TWindowClosing>(IAsyncObserver<IAsyncObservable<TSource>> observer, Func<IAsyncObservable<TWindowClosing>> windowClosingSelector, IAsyncDisposable subscription)
  {
      if (observer == null)
      {
          throw new ArgumentNullException(nameof(observer));
      }

      if (windowClosingSelector == null)
      {
          throw new ArgumentNullException(nameof(windowClosingSelector));
      }

      if (subscription == null)
      {
          throw new ArgumentNullException(nameof(subscription));
      }

      // Holds the subscription to whichever closing sequence is currently active.
      var closeSubscription = new SerialAsyncDisposable();

      // Serializes window access between the source and closing observers.
      var gate = new AsyncLock();

      // Every request for a new closing sequence goes through this queue lock.
      var queueLock = new AsyncQueueLock();

      var refCount = new RefCountAsyncDisposable(subscription);

      var current = default(IAsyncSubject<TSource>);

      // Replaces the current window with a fresh one and publishes it downstream.
      async Task OpenWindowAsync()
      {
          current = new SequentialSimpleAsyncSubject<TSource>();

          await observer.OnNextAsync(new WindowAsyncObservable<TSource>(current, refCount)).ConfigureAwait(false);
      }

      // Shared error path of the source and closing observers: fail the open window, then the
      // outer sequence.
      async Task FailAsync(Exception error)
      {
          using (await gate.LockAsync().ConfigureAwait(false))
          {
              await current.OnErrorAsync(error).ConfigureAwait(false);
              await observer.OnErrorAsync(error).ConfigureAwait(false);
          }
      }

      // Obtains a closing sequence from the selector and subscribes to it.
      async Task SubscribeToClosingAsync()
      {
          IAsyncObservable<TWindowClosing> closing;

          try
          {
              closing = windowClosingSelector(); // REVIEW: Do we need an async variant?
          }
          catch (Exception ex)
          {
              // Only the outer observer is notified here; the open window is left as is.
              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
              }

              return;
          }

          // The placeholder is registered before subscribing, so CloseWindowAsync can dispose it
          // regardless of whether the inner subscription has been assigned yet.
          var closingSubscription = new SingleAssignmentAsyncDisposable();
          await closeSubscription.AssignAsync(closingSubscription).ConfigureAwait(false);

          async Task CloseWindowAsync()
          {
              await closingSubscription.DisposeAsync().ConfigureAwait(false);

              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  await current.OnCompletedAsync().ConfigureAwait(false);
                  await OpenWindowAsync().ConfigureAwait(false);
              }

              await queueLock.WaitAsync(SubscribeToClosingAsync).ConfigureAwait(false);
          }

          // Both an element and the completion of the closing sequence close the window.
          var closingObserver =
              Create<TWindowClosing>(
                  closingSignal => CloseWindowAsync(),
                  FailAsync,
                  CloseWindowAsync
              );

          var innerSubscription = await closing.SubscribeSafeAsync(closingObserver).ConfigureAwait(false);
          await closingSubscription.AssignAsync(innerSubscription).ConfigureAwait(false);
      }

      async Task<(IAsyncObserver<TSource>, IAsyncDisposable, IAsyncDisposable)> SetUpAsync()
      {
          // The first window is published before the first closing sequence is requested.
          await OpenWindowAsync().ConfigureAwait(false);

          var sink =
              Create<TSource>(
                  async x =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnNextAsync(x).ConfigureAwait(false);
                      }
                  },
                  FailAsync,
                  async () =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          await current.OnCompletedAsync().ConfigureAwait(false);
                          await observer.OnCompletedAsync().ConfigureAwait(false);
                      }
                  }
              );

          await queueLock.WaitAsync(SubscribeToClosingAsync).ConfigureAwait(false);

          return (sink, closeSubscription, refCount);
      }

      // Argument validation above runs eagerly; the asynchronous set-up runs in the returned task.
      return SetUpAsync();
  }
  673. }
  674. }