Browse Source

Initial implementation of Merge.

Bart De Smet 8 years ago
parent
commit
3760083711

+ 0 - 2
Ix.NET/Source/Playground/Program.cs

@@ -21,8 +21,6 @@ namespace Playground
         static async Task Experiment()
         {
             // Add test code here
-
-            await Task.Yield();
         }
 
         [Demo(11, "LINQ to Objects for IEnumerable<T>")]

+ 134 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs

@@ -0,0 +1,134 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Threading.Tasks;
+
+namespace System.Linq
+{
+    public static partial class AsyncEnumerableEx
+    {
+        public static IAsyncEnumerable<TSource> Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)
+        {
+            if (sources == null)
+                throw new ArgumentNullException(nameof(sources));
+
+            return new MergeAsyncIterator<TSource>(sources);
+        }
+
+        public static IAsyncEnumerable<TSource> Merge<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
+        {
+            if (sources == null)
+                throw new ArgumentNullException(nameof(sources));
+
+            return sources.ToAsyncEnumerable().SelectMany(source => source);
+        }
+
+        public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
+        {
+            if (sources == null)
+                throw new ArgumentNullException(nameof(sources));
+
+            return sources.SelectMany(source => source);
+        }
+
+        private sealed class MergeAsyncIterator<TSource> : AsyncIterator<TSource>
+        {
+            private readonly IAsyncEnumerable<TSource>[] sources;
+
+            private IAsyncEnumerator<TSource>[] enumerators;
+            private Task<bool>[] moveNexts;
+            private int active;
+
+            public MergeAsyncIterator(IAsyncEnumerable<TSource>[] sources)
+            {
+                Debug.Assert(sources != null);
+
+                this.sources = sources;
+            }
+
+            public override AsyncIterator<TSource> Clone()
+            {
+                return new MergeAsyncIterator<TSource>(sources);
+            }
+
+            public override async Task DisposeAsync()
+            {
+                if (enumerators != null)
+                {
+                    var n = enumerators.Length;
+
+                    var disposes = new Task[n];
+
+                    for (var i = 0; i < n; i++)
+                    {
+                        var dispose = enumerators[i].DisposeAsync();
+                        disposes[i] = dispose;
+                    }
+
+                    await Task.WhenAll(disposes).ConfigureAwait(false);
+                    enumerators = null;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async Task<bool> MoveNextCore()
+            {
+                switch (state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        var n = sources.Length;
+
+                        enumerators = new IAsyncEnumerator<TSource>[n];
+                        moveNexts = new Task<bool>[n];
+                        active = n;
+
+                        for (var i = 0; i < n; i++)
+                        {
+                            var enumerator = sources[i].GetAsyncEnumerator();
+                            enumerators[i] = enumerator;
+                            moveNexts[i] = enumerator.MoveNextAsync();
+                        }
+
+                        state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        while (active > 0)
+                        {
+                            //
+                            // REVIEW: This approach does have a bias towards giving sources on the left
+                            //         priority over sources on the right when yielding values. We may
+                            //         want to consider a "prefer fairness" option.
+                            //
+
+                            var moveNext = await Task.WhenAny(moveNexts).ConfigureAwait(false);
+
+                            var index = Array.IndexOf(moveNexts, moveNext);
+
+                            if (!await moveNext.ConfigureAwait(false))
+                            {
+                                moveNexts[index] = TaskExt.Never;
+                                active--;
+                            }
+                            else
+                            {
+                                var enumerator = enumerators[index];
+                                current = enumerator.Current;
+                                moveNexts[index] = enumerator.MoveNextAsync();
+                                return true;
+                            }
+                        }
+
+                        break;
+                }
+
+                await DisposeAsync().ConfigureAwait(false);
+                return false;
+            }
+        }
+    }
+}

+ 1 - 0
Ix.NET/Source/System.Interactive.Async/TaskExt.cs

@@ -8,6 +8,7 @@ namespace System.Threading.Tasks
     {
         public static readonly Task<bool> True = Task.FromResult(true);
         public static readonly Task<bool> False = Task.FromResult(false);
+        public static readonly Task<bool> Never = new TaskCompletionSource<bool>().Task;
         public static readonly Task CompletedTask = True;
     }
 }