Oren Novotny 9 years ago
parent
commit
3e79057f33
1 changed files with 68 additions and 42 deletions
  1. 68 42
      Ix.NET/Source/System.Interactive.Async/Expand.cs

+ 68 - 42
Ix.NET/Source/System.Interactive.Async/Expand.cs

@@ -19,59 +19,85 @@ namespace System.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            return CreateEnumerable(
-                () =>
+            return new ExpandAsyncIterator<TSource>(source, selector);
+        }
+
+        private sealed class ExpandAsyncIterator<TSource> : AsyncIterator<TSource>
+        {
+            private readonly Func<TSource, IAsyncEnumerable<TSource>> selector;
+            private readonly IAsyncEnumerable<TSource> source;
+            private IAsyncEnumerator<TSource> enumerator;
+
+            private Queue<IAsyncEnumerable<TSource>> queue;
+
+            public ExpandAsyncIterator(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TSource>> selector)
+            {
+                this.source = source;
+                this.selector = selector;
+            }
+
+            public override AsyncIterator<TSource> Clone()
+            {
+                return new ExpandAsyncIterator<TSource>(source, selector);
+            }
+
+            public override void Dispose()
+            {
+                if (enumerator != null)
                 {
-                    var e = default(IAsyncEnumerator<TSource>);
+                    enumerator.Dispose();
+                    enumerator = null;
+                }
 
-                    var cts = new CancellationTokenDisposable();
-                    var a = new AssignableDisposable();
-                    var d = Disposable.Create(cts, a);
+                queue = null;
 
-                    var queue = new Queue<IAsyncEnumerable<TSource>>();
-                    queue.Enqueue(source);
+                base.Dispose();
+            }
 
-                    var current = default(TSource);
+            protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
+            {
+                switch (state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        queue = new Queue<IAsyncEnumerable<TSource>>();
+                        queue.Enqueue(source);
+
+                        state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
 
-                    var f = default(Func<CancellationToken, Task<bool>>);
-                    f = async ct =>
+                    case AsyncIteratorState.Iterating:
+                        if (enumerator == null)
                         {
-                            if (e == null)
+                            if (queue.Count > 0)
                             {
-                                if (queue.Count > 0)
-                                {
-                                    var src = queue.Dequeue();
+                                var src = queue.Dequeue();
 
-                                    e = src.GetEnumerator();
+                                enumerator?.Dispose();
+                                enumerator = src.GetEnumerator();
 
-                                    a.Disposable = e;
-                                    return await f(ct)
-                                               .ConfigureAwait(false);
-                                }
-                                return false;
+                                goto case AsyncIteratorState.Iterating; // loop
                             }
-                            if (await e.MoveNext(ct)
-                                       .ConfigureAwait(false))
-                            {
-                                var item = e.Current;
-                                var next = selector(item);
 
-                                queue.Enqueue(next);
-                                current = item;
-                                return true;
-                            }
-                            e = null;
-                            return await f(ct)
-                                       .ConfigureAwait(false);
-                        };
-
-                    return CreateEnumerator(
-                        f,
-                        () => current,
-                        d.Dispose,
-                        e
-                    );
-                });
+                            break;
+                        }
+
+                        if (await enumerator.MoveNext(cancellationToken)
+                                            .ConfigureAwait(false))
+                        {
+                            var item = enumerator.Current;
+                            var next = selector(item);
+                            queue.Enqueue(next);
+                            current = item;
+                            return true;
+                        }
+                        enumerator.Dispose();
+                        enumerator = null;
+                        goto case AsyncIteratorState.Iterating; // loop
+                }
+
+                Dispose();
+                return false;
+            }
         }
     }
 }