// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace System.Linq { public static partial class AsyncEnumerableEx { /// /// Projects each element of an async-enumerable sequence into consecutive non-overlapping buffers which are produced based on element count information. /// /// The type of the elements in the source sequence, and in the lists in the result sequence. /// Source sequence to produce buffers over. /// Length of each buffer. /// An async-enumerable sequence of buffers. /// is null. /// is less than or equal to zero. public static IAsyncEnumerable> Buffer(this IAsyncEnumerable source, int count) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (count <= 0) throw Error.ArgumentOutOfRange(nameof(count)); return AsyncEnumerable.Create(Core); async IAsyncEnumerator> Core(CancellationToken cancellationToken) { var buffer = new List(count); await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) { buffer.Add(item); if (buffer.Count == count) { yield return buffer; buffer = new List(count); } } if (buffer.Count > 0) { yield return buffer; } } } /// /// Projects each element of an async-enumerable sequence into zero or more buffers which are produced based on element count information. /// /// The type of the elements in the source sequence, and in the lists in the result sequence. /// Source sequence to produce buffers over. /// Length of each buffer. /// Number of elements to skip between creation of consecutive buffers. /// An async-enumerable sequence of buffers. /// is null. /// or is less than or equal to zero. public static IAsyncEnumerable> Buffer(this IAsyncEnumerable source, int count, int skip) { if (source == null) throw Error.ArgumentNull(nameof(source)); if (count <= 0) throw Error.ArgumentOutOfRange(nameof(count)); if (skip <= 0) throw Error.ArgumentOutOfRange(nameof(skip)); return AsyncEnumerable.Create(Core); async IAsyncEnumerator> Core(CancellationToken cancellationToken) { var buffers = new Queue>(); var index = 0; await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) { if (index++ % skip == 0) { buffers.Enqueue(new List(count)); } foreach (var buffer in buffers) { buffer.Add(item); } if (buffers.Count > 0 && buffers.Peek().Count == count) { yield return buffers.Dequeue(); } } while (buffers.Count > 0) { yield return buffers.Dequeue(); } } } } }