Browse Source

Implementing count-based Buffer.

Bart De Smet 8 years ago
parent
commit
0f8320758a

+ 59 - 2
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Buffer.cs

@@ -115,7 +115,7 @@ namespace System.Reactive.Linq
             if (count <= 0)
                 throw new ArgumentNullException(nameof(count));
 
-            throw new NotImplementedException();
+            return Buffer(observer, count, count);
         }
 
         public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, int count, int skip)
@@ -127,7 +127,64 @@ namespace System.Reactive.Linq
             if (skip <= 0)
                 throw new ArgumentNullException(nameof(skip));
 
-            throw new NotImplementedException();
+            var queue = new Queue<IList<TSource>>();
+            var n = 0;
+
+            void CreateBuffer() => queue.Enqueue(new List<TSource>());
+
+            CreateBuffer();
+
+            return Create<TSource>(
+                async x =>
+                {
+                    foreach (var buffer in queue)
+                    {
+                        buffer.Add(x);
+                    }
+
+                    var c = n - count + 1;
+
+                    if (c >= 0 && c % skip == 0)
+                    {
+                        var buffer = queue.Dequeue();
+
+                        if (buffer.Count > 0)
+                        {
+                            await observer.OnNextAsync(buffer).ConfigureAwait(false);
+                        }
+                    }
+
+                    n++;
+
+                    if (n % skip == 0)
+                    {
+                        CreateBuffer();
+                    }
+                },
+                ex =>
+                {
+                    while (queue.Count > 0)
+                    {
+                        queue.Dequeue().Clear();
+                    }
+
+                    return observer.OnErrorAsync(ex);
+                },
+                async () =>
+                {
+                    while (queue.Count > 0)
+                    {
+                        var buffer = queue.Dequeue();
+
+                        if (buffer.Count > 0)
+                        {
+                            await observer.OnNextAsync(buffer).ConfigureAwait(false);
+                        }
+                    }
+
+                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                }
+            );
         }
 
         public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan)