Bläddra i källkod

Async variant of Zip.

Bart De Smet 8 år sedan
förälder
incheckning
abcc4d0e94
1 ändrade filer med 86 tillägg och 1 borttagningar
  1. 86 1
      Ix.NET/Source/System.Interactive.Async/Zip.cs

+ 86 - 1
Ix.NET/Source/System.Interactive.Async/Zip.cs

@@ -4,7 +4,6 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
-using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -23,6 +22,18 @@ namespace System.Linq
             return new ZipAsyncIterator<TFirst, TSecond, TResult>(first, second, selector);
         }
 
+        public static IAsyncEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, Task<TResult>> selector)
+        {
+            if (first == null)
+                throw new ArgumentNullException(nameof(first));
+            if (second == null)
+                throw new ArgumentNullException(nameof(second));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new ZipAsyncIteratorWithTask<TFirst, TSecond, TResult>(first, second, selector);
+        }
+
         private sealed class ZipAsyncIterator<TFirst, TSecond, TResult> : AsyncIterator<TResult>
         {
             private readonly IAsyncEnumerable<TFirst> first;
@@ -96,5 +107,79 @@ namespace System.Linq
                 return false;
             }
         }
+
+        private sealed class ZipAsyncIteratorWithTask<TFirst, TSecond, TResult> : AsyncIterator<TResult>
+        {
+            private readonly IAsyncEnumerable<TFirst> first;
+            private readonly IAsyncEnumerable<TSecond> second;
+            private readonly Func<TFirst, TSecond, Task<TResult>> selector;
+
+            private IAsyncEnumerator<TFirst> firstEnumerator;
+            private IAsyncEnumerator<TSecond> secondEnumerator;
+
+            public ZipAsyncIteratorWithTask(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, Task<TResult>> selector)
+            {
+                Debug.Assert(first != null);
+                Debug.Assert(second != null);
+                Debug.Assert(selector != null);
+
+                this.first = first;
+                this.second = second;
+                this.selector = selector;
+            }
+
+            public override AsyncIterator<TResult> Clone()
+            {
+                return new ZipAsyncIteratorWithTask<TFirst, TSecond, TResult>(first, second, selector);
+            }
+
+            public override async Task DisposeAsync()
+            {
+                if (firstEnumerator != null)
+                {
+                    await firstEnumerator.DisposeAsync().ConfigureAwait(false);
+                    firstEnumerator = null;
+                }
+                if (secondEnumerator != null)
+                {
+                    await secondEnumerator.DisposeAsync().ConfigureAwait(false);
+                    secondEnumerator = null;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async Task<bool> MoveNextCore()
+            {
+                switch (state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        firstEnumerator = first.GetAsyncEnumerator();
+                        secondEnumerator = second.GetAsyncEnumerator();
+
+                        state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+
+                        // We kick these off and join so they can potentially run in parallel
+                        var ft = firstEnumerator.MoveNextAsync();
+                        var st = secondEnumerator.MoveNextAsync();
+                        await Task.WhenAll(ft, st)
+                                  .ConfigureAwait(false);
+
+                        if (ft.Result && st.Result)
+                        {
+                            current = await selector(firstEnumerator.Current, secondEnumerator.Current).ConfigureAwait(false);
+                            return true;
+                        }
+
+                        await DisposeAsync().ConfigureAwait(false);
+                        break;
+                }
+
+                return false;
+            }
+        }
     }
 }