Explorar o código

Adding initial implementation of N-ary Amb overloads.

Bart De Smet %!s(int64=8) %!d(string=hai) anos
pai
achega
d3dca51ece

+ 114 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs

@@ -20,6 +20,22 @@ namespace System.Linq
             return new AmbAsyncIterator<TSource>(first, second);
         }
 
+        public static IAsyncEnumerable<TSource> Amb<TSource>(params IAsyncEnumerable<TSource>[] sources)
+        {
+            if (sources == null)
+                throw new ArgumentNullException(nameof(sources));
+
+            return new AmbAsyncIteratorN<TSource>(sources);
+        }
+
+        public static IAsyncEnumerable<TSource> Amb<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
+        {
+            if (sources == null)
+                throw new ArgumentNullException(nameof(sources));
+
+            return new AmbAsyncIteratorN<TSource>(sources.ToArray());
+        }
+
         private sealed class AmbAsyncIterator<TSource> : AsyncIterator<TSource>
         {
             private readonly IAsyncEnumerable<TSource> first;
@@ -117,5 +133,103 @@ namespace System.Linq
                 return false;
             }
         }
+
+        private sealed class AmbAsyncIteratorN<TSource> : AsyncIterator<TSource>
+        {
+            private readonly IAsyncEnumerable<TSource>[] sources;
+
+            private IAsyncEnumerator<TSource> enumerator;
+
+            public AmbAsyncIteratorN(IAsyncEnumerable<TSource>[] sources)
+            {
+                Debug.Assert(sources != null);
+
+                this.sources = sources;
+            }
+
+            public override AsyncIterator<TSource> Clone()
+            {
+                return new AmbAsyncIteratorN<TSource>(sources);
+            }
+
+            public override async Task DisposeAsync()
+            {
+                if (enumerator != null)
+                {
+                    await enumerator.DisposeAsync().ConfigureAwait(false);
+                    enumerator = null;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async Task<bool> MoveNextCore()
+            {
+                switch (state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        var n = sources.Length;
+
+                        var enumerators = new IAsyncEnumerator<TSource>[n];
+                        var moveNexts = new Task<bool>[n];
+
+                        for (var i = 0; i < n; i++)
+                        {
+                            var enumerator = sources[i].GetAsyncEnumerator();
+
+                            enumerators[i] = enumerator;
+                            moveNexts[i] = enumerator.MoveNextAsync();
+                        }
+
+                        var winner = await Task.WhenAny(moveNexts).ConfigureAwait(false);
+
+                        //
+                        // REVIEW: An alternative option is to call DisposeAsync on the other and await it, but this has two drawbacks:
+                        //
+                        // 1. Concurrent DisposeAsync while a MoveNextAsync is in flight.
+                        // 2. The winner elected by Amb is blocked to yield results until all losers unblocks.
+                        //
+                        // The approach below has one drawback, namely that exceptions raised by any loser are dropped on the floor.
+                        //
+
+                        var winnerIndex = Array.IndexOf(moveNexts, winner);
+
+                        enumerator = enumerators[winnerIndex];
+
+                        for (var i = 0; i < n; i++)
+                        {
+                            if (i != winnerIndex)
+                            {
+                                var ignored = moveNexts[i].ContinueWith(_ =>
+                                {
+                                    enumerators[i].DisposeAsync();
+                                });
+                            }
+                        }
+
+                        state = AsyncIteratorState.Iterating;
+
+                        if (await winner.ConfigureAwait(false))
+                        {
+                            current = enumerator.Current;
+                            return true;
+                        }
+
+                        break;
+
+                    case AsyncIteratorState.Iterating:
+                        if (await enumerator.MoveNextAsync().ConfigureAwait(false))
+                        {
+                            current = enumerator.Current;
+                            return true;
+                        }
+
+                        break;
+                }
+
+                await DisposeAsync().ConfigureAwait(false);
+                return false;
+            }
+        }
     }
 }