Parcourir la source

Using async iterators for Buffer.

Bart De Smet il y a 6 ans
Parent
commit
73df2b7b70

+ 62 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Buffer.cs

@@ -18,7 +18,33 @@ namespace System.Linq
             if (count <= 0)
                 throw Error.ArgumentOutOfRange(nameof(count));
 
+#if USE_ASYNC_ITERATOR
+            return Create(Core);
+
+            async IAsyncEnumerator<IList<TSource>> Core(CancellationToken cancellationToken)
+            {
+                var buffer = new List<TSource>(count);
+
+                await foreach (TSource item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
+                {
+                    buffer.Add(item);
+
+                    if (buffer.Count == count)
+                    {
+                        yield return buffer;
+
+                        buffer = new List<TSource>(count);
+                    }
+                }
+
+                if (buffer.Count > 0)
+                {
+                    yield return buffer;
+                }
+            }
+#else
             return new BufferAsyncIterator<TSource>(source, count, count);
+#endif
         }
 
         public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumerable<TSource> source, int count, int skip)
@@ -30,9 +56,44 @@ namespace System.Linq
             if (skip <= 0)
                 throw Error.ArgumentOutOfRange(nameof(skip));
 
+#if USE_ASYNC_ITERATOR
+            return Create(Core);
+
+            async IAsyncEnumerator<IList<TSource>> Core(CancellationToken cancellationToken)
+            {
+                var buffers = new Queue<IList<TSource>>();
+
+                int index = 0;
+
+                await foreach (TSource item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
+                {
+                    if (index++ % skip == 0)
+                    {
+                        buffers.Enqueue(new List<TSource>(count));
+                    }
+
+                    foreach (var buffer in buffers)
+                    {
+                        buffer.Add(item);
+                    }
+
+                    if (buffers.Count > 0 && buffers.Peek().Count == count)
+                    {
+                        yield return buffers.Dequeue();
+                    }
+                }
+
+                while (buffers.Count > 0)
+                {
+                    yield return buffers.Dequeue();
+                }
+            }
+#else
             return new BufferAsyncIterator<TSource>(source, count, skip);
+#endif
         }
 
+#if !USE_ASYNC_ITERATOR
         private sealed class BufferAsyncIterator<TSource> : AsyncIterator<IList<TSource>>
         {
             private readonly int _count;
@@ -136,4 +197,5 @@ namespace System.Linq
             }
         }
     }
+#endif
 }