1
0
Oren Novotny 9 жил өмнө
parent
commit
d4caf3a0ca

+ 83 - 52
Ix.NET/Source/System.Interactive.Async/Buffer.cs

@@ -19,7 +19,7 @@ namespace System.Linq
             if (count <= 0)
                 throw new ArgumentOutOfRangeException(nameof(count));
 
-            return source.Buffer_(count, count);
+            return new BufferAsyncIterator<TSource>(source, count, count);
         }
 
         public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumerable<TSource> source, int count, int skip)
@@ -31,72 +31,103 @@ namespace System.Linq
             if (skip <= 0)
                 throw new ArgumentOutOfRangeException(nameof(skip));
 
-            return source.Buffer_(count, skip);
+            return new BufferAsyncIterator<TSource>(source, count, skip);
         }
 
-        private static IAsyncEnumerable<IList<TSource>> Buffer_<TSource>(this IAsyncEnumerable<TSource> source, int count, int skip)
+        private sealed class BufferAsyncIterator<TSource> : AsyncIterator<IList<TSource>>
         {
-            return CreateEnumerable(
-                () =>
+            private readonly int count;
+            private readonly int skip;
+            private readonly IAsyncEnumerable<TSource> source;
+
+            private Queue<IList<TSource>> buffers;
+            private IAsyncEnumerator<TSource> enumerator;
+            private int index;
+            private bool stopped;
+
+            public BufferAsyncIterator(IAsyncEnumerable<TSource> source, int count, int skip)
+            {
+                this.source = source;
+                this.count = count;
+                this.skip = skip;
+            }
+
+            public override AsyncIterator<IList<TSource>> Clone()
+            {
+                return new BufferAsyncIterator<TSource>(source, count, skip);
+            }
+
+            public override void Dispose()
+            {
+                if (enumerator != null)
                 {
-                    var e = source.GetEnumerator();
+                    enumerator.Dispose();
+                    enumerator = null;
+                }
 
-                    var cts = new CancellationTokenDisposable();
-                    var d = Disposable.Create(cts, e);
+                buffers = null;
 
-                    var buffers = new Queue<IList<TSource>>();
+                base.Dispose();
+            }
 
-                    var i = 0;
+            protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
+            {
+                switch (state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        enumerator = source.GetEnumerator();
+                        buffers = new Queue<IList<TSource>>();
+                        index = 0;
+                        stopped = false;
 
-                    var current = default(IList<TSource>);
-                    var stopped = false;
+                        state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
 
-                    var f = default(Func<CancellationToken, Task<bool>>);
-                    f = async ct =>
+                    case AsyncIteratorState.Iterating:
+                        if (!stopped)
                         {
-                            if (!stopped)
+                            if (await enumerator.MoveNext(cancellationToken)
+                                                .ConfigureAwait(false))
                             {
-                                if (await e.MoveNext(ct)
-                                           .ConfigureAwait(false))
+                                var item = enumerator.Current;
+                                if (index++%skip == 0)
                                 {
-                                    var item = e.Current;
-
-                                    if (i++%skip == 0)
-                                        buffers.Enqueue(new List<TSource>(count));
-
-                                    foreach (var buffer in buffers)
-                                        buffer.Add(item);
-
-                                    if (buffers.Count > 0 && buffers.Peek()
-                                                                    .Count == count)
-                                    {
-                                        current = buffers.Dequeue();
-                                        return true;
-                                    }
-                                    return await f(ct)
-                                               .ConfigureAwait(false);
+                                    buffers.Enqueue(new List<TSource>(count));
                                 }
-                                stopped = true;
-                                e.Dispose();
 
-                                return await f(ct)
-                                           .ConfigureAwait(false);
-                            }
-                            if (buffers.Count > 0)
-                            {
-                                current = buffers.Dequeue();
-                                return true;
+                                foreach (var buffer in buffers)
+                                {
+                                    buffer.Add(item);
+                                }
+
+                                if (buffers.Count > 0 && buffers.Peek()
+                                                                .Count == count)
+                                {
+                                    current = buffers.Dequeue();
+                                    return true;
+                                }
+
+                                goto case AsyncIteratorState.Iterating; // loop
                             }
-                            return false;
-                        };
-
-                    return CreateEnumerator(
-                        f,
-                        () => current,
-                        d.Dispose,
-                        e
-                    );
-                });
+                            stopped = true;
+                            enumerator.Dispose();
+                            enumerator = null;
+
+                            goto case AsyncIteratorState.Iterating; // loop
+                        }
+
+                        if (buffers.Count > 0)
+                        {
+                            current = buffers.Dequeue();
+                            return true;
+                        }
+
+                        break;
+                }
+
+                Dispose();
+                return false;
+            }
         }
     }
 }