// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace System.Linq
{
public static partial class AsyncEnumerableEx
{
    /// <summary>
    /// Projects each element of an async-enumerable sequence into consecutive non-overlapping buffers which are produced based on element count information.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence, and in the lists in the result sequence.</typeparam>
    /// <param name="source">Source sequence to produce buffers over.</param>
    /// <param name="count">Length of each buffer.</param>
    /// <returns>An async-enumerable sequence of buffers. The final buffer may hold fewer than <paramref name="count"/> elements; an empty trailing buffer is never produced.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than or equal to zero.</exception>
    public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumerable<TSource> source, int count)
    {
        // Arguments are validated eagerly, before the lazy iterator is created.
        // NOTE(review): Error is a project helper declared elsewhere; presumably it builds the
        // Argument* exceptions named in the XML docs above - confirm against its definition.
        if (source == null)
            throw Error.ArgumentNull(nameof(source));
        if (count <= 0)
            throw Error.ArgumentOutOfRange(nameof(count));

        return Core(source, count);

        // Lazy iterator: fills a list until it holds 'count' items, yields it, then starts a new one.
        static async IAsyncEnumerable<IList<TSource>> Core(IAsyncEnumerable<TSource> source, int count, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            var buffer = new List<TSource>(count);

            await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                buffer.Add(item);

                if (buffer.Count == count)
                {
                    yield return buffer;

                    // The yielded list now belongs to the consumer; never reuse it.
                    buffer = new List<TSource>(count);
                }
            }

            // Flush the partially filled tail, if any.
            if (buffer.Count > 0)
            {
                yield return buffer;
            }
        }
    }

    /// <summary>
    /// Projects each element of an async-enumerable sequence into zero or more buffers which are produced based on element count information.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence, and in the lists in the result sequence.</typeparam>
    /// <param name="source">Source sequence to produce buffers over.</param>
    /// <param name="count">Length of each buffer.</param>
    /// <param name="skip">Number of elements to skip between creation of consecutive buffers.</param>
    /// <returns>An async-enumerable sequence of buffers. Buffers still open when the source completes are produced as-is, so trailing buffers may hold fewer than <paramref name="count"/> elements.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> or <paramref name="skip"/> is less than or equal to zero.</exception>
    public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumerable<TSource> source, int count, int skip)
    {
        // Arguments are validated eagerly, before the lazy iterator is created.
        // NOTE(review): Error is a project helper declared elsewhere; presumably it builds the
        // Argument* exceptions named in the XML docs above - confirm against its definition.
        if (source == null)
            throw Error.ArgumentNull(nameof(source));
        if (count <= 0)
            throw Error.ArgumentOutOfRange(nameof(count));
        if (skip <= 0)
            throw Error.ArgumentOutOfRange(nameof(skip));

        return Core(source, count, skip);

        // Lazy iterator: opens a new buffer every 'skip' elements, appends each element to every
        // open buffer, and yields the oldest buffer as soon as it holds 'count' elements.
        static async IAsyncEnumerable<IList<TSource>> Core(IAsyncEnumerable<TSource> source, int count, int skip, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            var buffers = new Queue<IList<TSource>>();

            // Countdown to the next buffer start. It stays within [0, skip], so unlike an
            // ever-growing element index it cannot overflow on very long sequences.
            var untilNextBuffer = 0;

            await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                if (untilNextBuffer == 0)
                {
                    buffers.Enqueue(new List<TSource>(count));
                    untilNextBuffer = skip;
                }

                untilNextBuffer--;

                // When skip > count the queue can be empty here; the element is then dropped.
                foreach (var buffer in buffers)
                {
                    buffer.Add(item);
                }

                // Buffers are opened in order, so only the head of the queue can be full.
                if (buffers.Count > 0 && buffers.Peek().Count == count)
                {
                    yield return buffers.Dequeue();
                }
            }

            // Source completed: flush the remaining, partially filled buffers oldest-first.
            while (buffers.Count > 0)
            {
                yield return buffers.Dequeue();
            }
        }
    }
}
}