瀏覽代碼

Adding n-ary Amb overloads.

Bart De Smet 8 年之前
父節點
當前提交
25c246ded6
共有 1 個文件被更改,包括 126 次插入2 次删除
  1. 126 2
      AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Amb.cs

+ 126 - 2
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Amb.cs

@@ -2,6 +2,8 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Collections.Generic;
+using System.Linq;
 using System.Reactive.Disposables;
 using System.Threading;
 using System.Threading.Tasks;
@@ -24,14 +26,47 @@ namespace System.Reactive.Linq
 
                 var (firstObserver, secondObserver) = AsyncObserver.Amb(observer, firstSubscription, secondSubscription);
 
-                var firstTask = first.SubscribeAsync(firstObserver).ContinueWith(d => firstSubscription.AssignAsync(d.Result));
-                var secondTask = second.SubscribeAsync(secondObserver).ContinueWith(d => secondSubscription.AssignAsync(d.Result));
+                var firstTask = first.SubscribeAsync(firstObserver).ContinueWith(d => firstSubscription.AssignAsync(d.Result)).Unwrap();
+                var secondTask = second.SubscribeAsync(secondObserver).ContinueWith(d => secondSubscription.AssignAsync(d.Result)).Unwrap();
 
                 await Task.WhenAll(firstTask, secondTask).ConfigureAwait(false);
 
                 return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);
             });
         }
+
+        public static IAsyncObservable<TSource> Amb<TSource>(this IEnumerable<IAsyncObservable<TSource>> sources) => Amb(sources.ToArray());
+
+        public static IAsyncObservable<TSource> Amb<TSource>(params IAsyncObservable<TSource>[] sources)
+        {
+            if (sources == null)
+                throw new ArgumentNullException(nameof(sources));
+
+            return Create<TSource>(async observer =>
+            {
+                var count = sources.Length;
+
+                var subscriptions = new SingleAssignmentAsyncDisposable[count];
+
+                for (var i = 0; i < count; i++)
+                {
+                    subscriptions[i] = new SingleAssignmentAsyncDisposable();
+                }
+
+                var observers = AsyncObserver.Amb(observer, subscriptions);
+
+                var tasks = new Task[count];
+
+                for (var i = 0; i < count; i++)
+                {
+                    tasks[i] = sources[i].SubscribeAsync(observers[i]).ContinueWith(d => subscriptions[i].AssignAsync(d.Result)).Unwrap();
+                }
+
+                await Task.WhenAll(tasks).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscriptions);
+            });
+        }
     }
 
     partial class AsyncObserver
@@ -154,6 +189,95 @@ namespace System.Reactive.Linq
                 );
         }
 
+        public static IAsyncObserver<TSource>[] Amb<TSource>(IAsyncObserver<TSource> observer, IAsyncDisposable[] subscriptions)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (subscriptions == null)
+                throw new ArgumentNullException(nameof(subscriptions));
+
+            var gate = new AsyncLock();
+
+            var winner = default(int?);
+
+            var count = subscriptions.Length;
+
+            async Task ElectWinnerAsync(int index)
+            {
+                winner = index;
+
+                var dispose = new List<Task>(count - 1);
+
+                for (var i = 0; i < count; i++)
+                {
+                    if (i != index)
+                    {
+                        dispose.Add(subscriptions[i].DisposeAsync());
+                    }
+                }
+
+                await Task.WhenAll(dispose).ConfigureAwait(false);
+            }
+
+            IAsyncObserver<TSource> CreateObserver(int index) =>
+                Create<TSource>(
+                    async x =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            if (winner == null)
+                            {
+                                await ElectWinnerAsync(index).ConfigureAwait(false);
+                            }
+
+                            if (winner == index)
+                            {
+                                await observer.OnNextAsync(x).ConfigureAwait(false);
+                            }
+                        }
+                    },
+                    async ex =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            if (winner == null)
+                            {
+                                await ElectWinnerAsync(index).ConfigureAwait(false);
+                            }
+
+                            if (winner == index)
+                            {
+                                await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                            }
+                        }
+                    },
+                    async () =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            if (winner == null)
+                            {
+                                await ElectWinnerAsync(index).ConfigureAwait(false);
+                            }
+
+                            if (winner == index)
+                            {
+                                await observer.OnCompletedAsync().ConfigureAwait(false);
+                            }
+                        }
+                    }
+                );
+
+            var res = new IAsyncObserver<TSource>[count];
+
+            for (var i = 0; i < count; i++)
+            {
+                res[i] = CreateObserver(i);
+            }
+
+            return res;
+        }
+
         private enum AmbState
         {
             None,