1
0
Эх сурвалжийг харах

Implement concat with ienumrable params as lazy

Oren Novotny 9 жил өмнө
parent
commit
4eb73c50ca

+ 84 - 12
Ix.NET/Source/System.Interactive.Async/Concatenate.cs

@@ -44,22 +44,94 @@ namespace System.Linq
 
         private static IAsyncEnumerable<TSource> Concat_<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
         {
-            using (var e = sources.GetEnumerator())
+            return new ConcatEnumerableAsyncIterator<TSource>(sources);
+        }
+
+
+        private sealed class ConcatEnumerableAsyncIterator<TSource> : AsyncIterator<TSource>
+        {
+            private readonly IEnumerable<IAsyncEnumerable<TSource>> source;
+
+            public ConcatEnumerableAsyncIterator(IEnumerable<IAsyncEnumerable<TSource>> source)
             {
-                IAsyncEnumerable<TSource> prev = null;
-                while (e.MoveNext())
+                this.source = source;
+            }
+
+            public override AsyncIterator<TSource> Clone()
+            {
+                return new ConcatEnumerableAsyncIterator<TSource>(source);
+            }
+
+            public override void Dispose()
+            {
+                if (outerEnumerator != null)
                 {
-                    if (prev == null)
-                    {
-                        prev = e.Current;
-                    }
-                    else
-                    {
-                        prev = prev.Concat(e.Current);
-                    }
+                    outerEnumerator.Dispose();
+                    outerEnumerator = null;
+                }
+
+                if (currentEnumerator != null)
+                {
+                    currentEnumerator.Dispose();
+                    currentEnumerator = null;
                 }
 
-                return prev ?? Empty<TSource>();
+                base.Dispose();
+            }
+
+            // State machine vars
+            IEnumerator<IAsyncEnumerable<TSource>> outerEnumerator;
+            IAsyncEnumerator<TSource> currentEnumerator;
+            int mode;
+
+            const int State_OuterNext = 1;
+            const int State_While = 4;
+
+            protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
+            {
+
+                switch (state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        outerEnumerator = source.GetEnumerator();
+                        mode = State_OuterNext;
+                        state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        switch (mode)
+                        {
+                            case State_OuterNext:
+                                if (outerEnumerator.MoveNext())
+                                {
+                                    // make sure we dispose the previous one if we're about to replace it
+                                    currentEnumerator?.Dispose();
+                                    currentEnumerator = outerEnumerator.Current.GetEnumerator();
+                                   
+                                    mode = State_While;
+                                    goto case State_While;
+                                }
+
+                                break;
+                            case State_While:
+                                if (await currentEnumerator.MoveNext(cancellationToken)
+                                                           .ConfigureAwait(false))
+                                {
+                                    current = currentEnumerator.Current;
+                                    return true;
+                                }
+
+                                // No more on the inner enumerator, move to the next outer
+                                goto case State_OuterNext;
+                   
+                        }
+
+                        Dispose();
+
+                        break;
+                }
+
+                return false;
             }
         }