// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace System.Linq { public static partial class AsyncEnumerableEx { public static IAsyncEnumerable> Buffer(this IAsyncEnumerable source, int count) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (count <= 0) throw Error.ArgumentOutOfRange(nameof(count)); return AsyncEnumerable.Create(Core); async IAsyncEnumerator> Core(CancellationToken cancellationToken) { var buffer = new List(count); await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) { buffer.Add(item); if (buffer.Count == count) { yield return buffer; buffer = new List(count); } } if (buffer.Count > 0) { yield return buffer; } } } public static IAsyncEnumerable> Buffer(this IAsyncEnumerable source, int count, int skip) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (count <= 0) throw Error.ArgumentOutOfRange(nameof(count)); if (skip <= 0) throw Error.ArgumentOutOfRange(nameof(skip)); return AsyncEnumerable.Create(Core); async IAsyncEnumerator> Core(CancellationToken cancellationToken) { var buffers = new Queue>(); var index = 0; await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) { if (index++ % skip == 0) { buffers.Enqueue(new List(count)); } foreach (var buffer in buffers) { buffer.Add(item); } if (buffers.Count > 0 && buffers.Peek().Count == count) { yield return buffers.Dequeue(); } } while (buffers.Count > 0) { yield return buffers.Dequeue(); } } } } }