Browse Source

OnErrorResumeNext

Oren Novotny 9 years ago
parent
commit
6af1192aa3
1 changed files with 72 additions and 43 deletions
  1. 72 43
      Ix.NET/Source/System.Interactive.Async/OnErrorResumeNext.cs

+ 72 - 43
Ix.NET/Source/System.Interactive.Async/OnErrorResumeNext.cs

@@ -40,59 +40,88 @@ namespace System.Linq
 
         private static IAsyncEnumerable<TSource> OnErrorResumeNext_<TSource>(IEnumerable<IAsyncEnumerable<TSource>> sources)
         {
-            return CreateEnumerable(
-                () =>
+            return new OnErrorResumeNextAsyncIterator<TSource>(sources);
+        }
+
+        private sealed class OnErrorResumeNextAsyncIterator<TSource> : AsyncIterator<TSource>
+        {
+            private readonly IEnumerable<IAsyncEnumerable<TSource>> sources;
+
+            private IAsyncEnumerator<TSource> enumerator;
+            private IEnumerator<IAsyncEnumerable<TSource>> sourcesEnumerator;
+
+            public OnErrorResumeNextAsyncIterator(IEnumerable<IAsyncEnumerable<TSource>> sources)
+            {
+                this.sources = sources;
+            }
+
+            public override AsyncIterator<TSource> Clone()
+            {
+                return new OnErrorResumeNextAsyncIterator<TSource>(sources);
+            }
+
+            public override void Dispose()
+            {
+                if (sourcesEnumerator != null)
                 {
-                    var se = sources.GetEnumerator();
-                    var e = default(IAsyncEnumerator<TSource>);
+                    sourcesEnumerator.Dispose();
+                    sourcesEnumerator = null;
+                }
 
-                    var cts = new CancellationTokenDisposable();
-                    var a = new AssignableDisposable();
-                    var d = Disposable.Create(cts, se, a);
+                if (enumerator != null)
+                {
+                    enumerator.Dispose();
+                    enumerator = null;
+                }
 
-                    var f = default(Func<CancellationToken, Task<bool>>);
-                    f = async ct =>
+                base.Dispose();
+            }
+
+            protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
+            {
+                switch (state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        sourcesEnumerator = sources.GetEnumerator();
+
+                        state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        if (enumerator == null)
                         {
-                            if (e == null)
+                            if (!sourcesEnumerator.MoveNext())
                             {
-                                var b = false;
-                                b = se.MoveNext();
-                                if (b)
-                                    e = se.Current.GetEnumerator();
-                                else
-                                {
-                                    return false;
-                                }
-
-                                a.Disposable = e;
+                                break; // done, nothing else to do
                             }
 
-                            try
-                            {
-                                if (await e.MoveNext(ct)
-                                           .ConfigureAwait(false))
-                                {
-                                    return true;
-                                }
-                            }
-                            catch
+                            enumerator = sourcesEnumerator.Current.GetEnumerator();
+                        }
+
+                        try
+                        {
+                            if (await enumerator.MoveNext(cancellationToken)
+                                                .ConfigureAwait(false))
                             {
-                                // ignore
+                                current = enumerator.Current;
+                                return true;
                             }
+                        }
+                        catch
+                        {
+                            // Ignore
+                        }
+
+                        // Done with the current one, go to the next
+                        enumerator.Dispose();
+                        enumerator = null;
+                        goto case AsyncIteratorState.Iterating;
+                }
+
 
-                            e.Dispose();
-                            e = null;
-                            return await f(ct)
-                                       .ConfigureAwait(false);
-                        };
-
-                    return CreateEnumerator(
-                        f,
-                        () => e.Current,
-                        d.Dispose,
-                        a
-                    );
-                });
+                Dispose();
+                return false;
+            }
         }
     }
 }