// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Concurrency;

namespace System.Reactive.Linq
{
    partial class AsyncObservable
    {
        /// <summary>
        /// Groups the elements of <paramref name="source"/> into consecutive, non-overlapping buffers of
        /// <paramref name="count"/> elements each.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Sequence whose elements are buffered.</param>
        /// <param name="count">Number of elements per buffer; must be positive.</param>
        /// <returns>A sequence of buffers.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than or equal to zero.</exception>
        public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, int count)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (count <= 0)
                throw new ArgumentOutOfRangeException(nameof(count));

            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, count)));
        }

        /// <summary>
        /// Groups the elements of <paramref name="source"/> into buffers of <paramref name="count"/> elements,
        /// starting a new buffer every <paramref name="skip"/> elements.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Sequence whose elements are buffered.</param>
        /// <param name="count">Number of elements per buffer; must be positive.</param>
        /// <param name="skip">Number of elements between the starts of consecutive buffers; must be positive.</param>
        /// <returns>A sequence of buffers.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="count"/> or <paramref name="skip"/> is less than or equal to zero.
        /// </exception>
        public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, int count, int skip)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (count <= 0)
                throw new ArgumentOutOfRangeException(nameof(count));
            if (skip <= 0)
                throw new ArgumentOutOfRangeException(nameof(skip));

            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, count, skip)));
        }

        /// <summary>
        /// Time-based buffering of <paramref name="source"/> using <paramref name="timeSpan"/>.
        /// </summary>
        /// <remarks>
        /// The matching <c>AsyncObserver.Buffer</c> overload in this file throws
        /// <see cref="NotImplementedException"/>, so this operator is not functional yet.
        /// </remarks>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Sequence whose elements are buffered.</param>
        /// <param name="timeSpan">Time parameter of the buffers; must not be negative.</param>
        /// <returns>A sequence of buffers.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
        public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));

            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan)));
        }

        /// <summary>
        /// Time-based buffering of <paramref name="source"/> using <paramref name="timeSpan"/> and the given
        /// <paramref name="scheduler"/>.
        /// </summary>
        /// <remarks>
        /// The matching <c>AsyncObserver.Buffer</c> overload in this file throws
        /// <see cref="NotImplementedException"/>, so this operator is not functional yet.
        /// </remarks>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Sequence whose elements are buffered.</param>
        /// <param name="timeSpan">Time parameter of the buffers; must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the observer-side implementation.</param>
        /// <returns>A sequence of buffers.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
        public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, scheduler)));
        }

        /// <summary>
        /// Time-based buffering of <paramref name="source"/> using <paramref name="timeSpan"/> and
        /// <paramref name="timeShift"/>.
        /// </summary>
        /// <remarks>
        /// The matching <c>AsyncObserver.Buffer</c> overload in this file throws
        /// <see cref="NotImplementedException"/>, so this operator is not functional yet.
        /// </remarks>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Sequence whose elements are buffered.</param>
        /// <param name="timeSpan">First time parameter of the buffers; must not be negative.</param>
        /// <param name="timeShift">Second time parameter of the buffers; must not be negative.</param>
        /// <returns>A sequence of buffers.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.
        /// </exception>
        public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (timeShift < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeShift));

            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, timeShift)));
        }

        /// <summary>
        /// Time-based buffering of <paramref name="source"/> using <paramref name="timeSpan"/>,
        /// <paramref name="timeShift"/> and the given <paramref name="scheduler"/>.
        /// </summary>
        /// <remarks>
        /// The matching <c>AsyncObserver.Buffer</c> overload in this file throws
        /// <see cref="NotImplementedException"/>, so this operator is not functional yet.
        /// </remarks>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Sequence whose elements are buffered.</param>
        /// <param name="timeSpan">First time parameter of the buffers; must not be negative.</param>
        /// <param name="timeShift">Second time parameter of the buffers; must not be negative.</param>
        /// <param name="scheduler">Scheduler handed to the observer-side implementation.</param>
        /// <returns>A sequence of buffers.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.
        /// </exception>
        public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (timeShift < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeShift));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, timeShift, scheduler)));
        }

        /// <summary>
        /// Buffering of <paramref name="source"/> bounded by both <paramref name="timeSpan"/> and
        /// <paramref name="count"/>.
        /// </summary>
        /// <remarks>
        /// The matching <c>AsyncObserver.Buffer</c> overload in this file throws
        /// <see cref="NotImplementedException"/>, so this operator is not functional yet.
        /// </remarks>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Sequence whose elements are buffered.</param>
        /// <param name="timeSpan">Time parameter of the buffers; must not be negative.</param>
        /// <param name="count">Element-count parameter of the buffers; must be positive.</param>
        /// <returns>A sequence of buffers.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeSpan"/> is negative, or <paramref name="count"/> is less than or equal to zero.
        /// </exception>
        public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (count <= 0)
                throw new ArgumentOutOfRangeException(nameof(count));

            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, count)));
        }

        /// <summary>
        /// Buffering of <paramref name="source"/> bounded by both <paramref name="timeSpan"/> and
        /// <paramref name="count"/>, using the given <paramref name="scheduler"/>.
        /// </summary>
        /// <remarks>
        /// The matching <c>AsyncObserver.Buffer</c> overload in this file throws
        /// <see cref="NotImplementedException"/>, so this operator is not functional yet.
        /// </remarks>
        /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
        /// <param name="source">Sequence whose elements are buffered.</param>
        /// <param name="timeSpan">Time parameter of the buffers; must not be negative.</param>
        /// <param name="count">Element-count parameter of the buffers; must be positive.</param>
        /// <param name="scheduler">Scheduler handed to the observer-side implementation.</param>
        /// <returns>A sequence of buffers.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeSpan"/> is negative, or <paramref name="count"/> is less than or equal to zero.
        /// </exception>
        public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (count <= 0)
                throw new ArgumentOutOfRangeException(nameof(count));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, count, scheduler)));
        }
    }

    partial class AsyncObserver
    {
        /// <summary>
        /// Creates an observer that collects incoming elements into consecutive, non-overlapping buffers of
        /// <paramref name="count"/> elements and forwards each buffer to <paramref name="observer"/>.
        /// Equivalent to calling the count/skip overload with <c>skip</c> equal to <paramref name="count"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the buffered elements.</typeparam>
        /// <param name="observer">Observer that receives the buffers.</param>
        /// <param name="count">Number of elements per buffer; must be positive.</param>
        /// <returns>An observer that accepts individual elements.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than or equal to zero.</exception>
        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, int count)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (count <= 0)
                throw new ArgumentOutOfRangeException(nameof(count));

            return Buffer(observer, count, count);
        }

        /// <summary>
        /// Creates an observer that collects incoming elements into buffers of <paramref name="count"/> elements,
        /// opening a new buffer every <paramref name="skip"/> elements, and forwards each buffer to
        /// <paramref name="observer"/>.
        /// </summary>
        /// <remarks>
        /// On completion, all still-open non-empty buffers are forwarded in creation order before the completion
        /// signal. On error, the open buffers are cleared and dropped, and only the error is forwarded.
        /// </remarks>
        /// <typeparam name="TSource">Type of the buffered elements.</typeparam>
        /// <param name="observer">Observer that receives the buffers.</param>
        /// <param name="count">Number of elements per buffer; must be positive.</param>
        /// <param name="skip">Number of elements between the starts of consecutive buffers; must be positive.</param>
        /// <returns>An observer that accepts individual elements.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="count"/> or <paramref name="skip"/> is less than or equal to zero.
        /// </exception>
        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, int count, int skip)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (count <= 0)
                throw new ArgumentOutOfRangeException(nameof(count));
            if (skip <= 0)
                throw new ArgumentOutOfRangeException(nameof(skip));

            // Buffers that are currently open, oldest first.
            var queue = new Queue<IList<TSource>>();

            // Number of elements fully processed so far (zero-based index of the current element).
            var n = 0;

            void CreateBuffer() => queue.Enqueue(new List<TSource>());

            // The first buffer is open before any element arrives.
            CreateBuffer();

            return Create<TSource>(
                async x =>
                {
                    // Every open buffer receives the element (buffers overlap when skip < count).
                    foreach (var buffer in queue)
                    {
                        buffer.Add(x);
                    }

                    // The oldest buffer closes at element indices count - 1, count - 1 + skip, ...
                    var c = n - count + 1;
                    if (c >= 0 && c % skip == 0)
                    {
                        var buffer = queue.Dequeue();
                        if (buffer.Count > 0)
                        {
                            await observer.OnNextAsync(buffer).ConfigureAwait(false);
                        }
                    }

                    n++;

                    // Open the next buffer so that it starts with the element at index n.
                    if (n % skip == 0)
                    {
                        CreateBuffer();
                    }
                },
                ex =>
                {
                    // Partial buffers are dropped on error; only the error itself is forwarded.
                    while (queue.Count > 0)
                    {
                        queue.Dequeue().Clear();
                    }

                    return observer.OnErrorAsync(ex);
                },
                async () =>
                {
                    // Flush the remaining non-empty buffers, oldest first, before completing.
                    while (queue.Count > 0)
                    {
                        var buffer = queue.Dequeue();
                        if (buffer.Count > 0)
                        {
                            await observer.OnNextAsync(buffer).ConfigureAwait(false);
                        }
                    }

                    await observer.OnCompletedAsync().ConfigureAwait(false);
                }
            );
        }

        /// <summary>
        /// Time-based buffering observer. Not implemented: validates its arguments, then throws
        /// <see cref="NotImplementedException"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the buffered elements.</typeparam>
        /// <param name="observer">Observer that would receive the buffers.</param>
        /// <param name="timeSpan">Time parameter of the buffers; must not be negative.</param>
        /// <returns>Never returns normally.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
        /// <exception cref="NotImplementedException">Always, once the arguments have been validated.</exception>
        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));

            throw new NotImplementedException();
        }

        /// <summary>
        /// Time-based buffering observer with an explicit scheduler. Not implemented: validates its arguments,
        /// then throws <see cref="NotImplementedException"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the buffered elements.</typeparam>
        /// <param name="observer">Observer that would receive the buffers.</param>
        /// <param name="timeSpan">Time parameter of the buffers; must not be negative.</param>
        /// <param name="scheduler">Scheduler for the time-based work; must not be null.</param>
        /// <returns>Never returns normally.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observer"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeSpan"/> is negative.</exception>
        /// <exception cref="NotImplementedException">Always, once the arguments have been validated.</exception>
        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            throw new NotImplementedException();
        }

        /// <summary>
        /// Time-based buffering observer taking a time span and a time shift. Not implemented: validates its
        /// arguments, then throws <see cref="NotImplementedException"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the buffered elements.</typeparam>
        /// <param name="observer">Observer that would receive the buffers.</param>
        /// <param name="timeSpan">First time parameter of the buffers; must not be negative.</param>
        /// <param name="timeShift">Second time parameter of the buffers; must not be negative.</param>
        /// <returns>Never returns normally.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.
        /// </exception>
        /// <exception cref="NotImplementedException">Always, once the arguments have been validated.</exception>
        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, TimeSpan timeShift)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (timeShift < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeShift));

            throw new NotImplementedException();
        }

        /// <summary>
        /// Time-based buffering observer taking a time span, a time shift and an explicit scheduler.
        /// Not implemented: validates its arguments, then throws <see cref="NotImplementedException"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the buffered elements.</typeparam>
        /// <param name="observer">Observer that would receive the buffers.</param>
        /// <param name="timeSpan">First time parameter of the buffers; must not be negative.</param>
        /// <param name="timeShift">Second time parameter of the buffers; must not be negative.</param>
        /// <param name="scheduler">Scheduler for the time-based work; must not be null.</param>
        /// <returns>Never returns normally.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observer"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeSpan"/> or <paramref name="timeShift"/> is negative.
        /// </exception>
        /// <exception cref="NotImplementedException">Always, once the arguments have been validated.</exception>
        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (timeShift < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeShift));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            throw new NotImplementedException();
        }

        /// <summary>
        /// Buffering observer bounded by both a time span and an element count. Not implemented: validates its
        /// arguments, then throws <see cref="NotImplementedException"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the buffered elements.</typeparam>
        /// <param name="observer">Observer that would receive the buffers.</param>
        /// <param name="timeSpan">Time parameter of the buffers; must not be negative.</param>
        /// <param name="count">Element-count parameter of the buffers; must be positive.</param>
        /// <returns>Never returns normally.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeSpan"/> is negative, or <paramref name="count"/> is less than or equal to zero.
        /// </exception>
        /// <exception cref="NotImplementedException">Always, once the arguments have been validated.</exception>
        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, int count)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (count <= 0)
                throw new ArgumentOutOfRangeException(nameof(count));

            throw new NotImplementedException();
        }

        /// <summary>
        /// Buffering observer bounded by both a time span and an element count, with an explicit scheduler.
        /// Not implemented: validates its arguments, then throws <see cref="NotImplementedException"/>.
        /// </summary>
        /// <typeparam name="TSource">Type of the buffered elements.</typeparam>
        /// <param name="observer">Observer that would receive the buffers.</param>
        /// <param name="timeSpan">Time parameter of the buffers; must not be negative.</param>
        /// <param name="count">Element-count parameter of the buffers; must be positive.</param>
        /// <param name="scheduler">Scheduler for the time-based work; must not be null.</param>
        /// <returns>Never returns normally.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observer"/> or <paramref name="scheduler"/> is null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="timeSpan"/> is negative, or <paramref name="count"/> is less than or equal to zero.
        /// </exception>
        /// <exception cref="NotImplementedException">Always, once the arguments have been validated.</exception>
        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
        {
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (count <= 0)
                throw new ArgumentOutOfRangeException(nameof(count));
            if (scheduler == null)
                throw new ArgumentNullException(nameof(scheduler));

            throw new NotImplementedException();
        }
    }
}