// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace System.Linq
{
public static partial class AsyncEnumerable
{
    /// <summary>
    /// Returns a specified number of contiguous elements from the end of an async-enumerable sequence.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">Source sequence.</param>
    /// <param name="count">Number of elements to take from the end of the source sequence.</param>
    /// <returns>
    /// An async-enumerable sequence containing the specified number of elements from the end of the source sequence.
    /// When <paramref name="count"/> is zero or negative, the result of <c>Empty</c> is returned and
    /// <paramref name="source"/> is never enumerated.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <remarks>
    /// <para>
    /// This operator accumulates a buffer with a length enough to store <paramref name="count"/> elements. Upon completion of
    /// the source sequence, this buffer is drained on the result sequence. This causes the elements to be delayed.
    /// </para>
    /// <para>
    /// A negative <paramref name="count"/> does not raise an exception; it is handled the same way as zero.
    /// </para>
    /// </remarks>
    public static IAsyncEnumerable<TSource> TakeLast<TSource>(this IAsyncEnumerable<TSource> source, int count)
    {
        // Argument validation runs eagerly, before any iterator is created.
        // NOTE(review): Error.ArgumentNull is declared elsewhere in the project; presumably it builds an ArgumentNullException.
        if (source == null)
        {
            throw Error.ArgumentNull(nameof(source));
        }

        // Nothing can be taken for a non-positive count, so skip buffering altogether.
        if (count <= 0)
        {
            return Empty<TSource>();
        }

#if HAS_ASYNC_ENUMERABLE_CANCELLATION
        return Core(source, count);

        static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, int count, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
#else
        return Create(Core);

        async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
#endif
        {
            Queue<TSource> queue;

            await using (var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false))
            {
                // An empty source yields nothing; the queue is only allocated once a first element exists.
                if (!await e.MoveNextAsync())
                {
                    yield break;
                }

                queue = new Queue<TSource>();
                queue.Enqueue(e.Current);

                // Phase 1: fill the buffer until it holds 'count' elements.
                while (await e.MoveNextAsync())
                {
                    if (queue.Count < count)
                    {
                        queue.Enqueue(e.Current);
                    }
                    else
                    {
                        // Phase 2: the buffer is full, so every new element evicts the oldest one.
                        // This inner loop consumes the remainder of the source without re-checking the size.
                        do
                        {
                            queue.Dequeue();
                            queue.Enqueue(e.Current);
                        }
                        while (await e.MoveNextAsync());

                        break;
                    }
                }
            }

            Debug.Assert(queue.Count <= count);

            // The queue holds at least one element here (the empty case returned above),
            // so a do/while can dequeue before testing the count.
            do
            {
                yield return queue.Dequeue();
            }
            while (queue.Count > 0);
        }
    }
}
}