| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the MIT License.// See the LICENSE file in the project root for more information. using System.Collections.Generic;using System.Linq;using System.Reactive.Disposables;using System.Threading;using System.Threading.Tasks;namespace System.Reactive.Linq{    public partial class AsyncObservable    {        public static IAsyncObservable<TSource> Amb<TSource>(this IAsyncObservable<TSource> first, IAsyncObservable<TSource> second)        {            if (first == null)                throw new ArgumentNullException(nameof(first));            if (second == null)                throw new ArgumentNullException(nameof(second));            return CreateAsyncObservable<TSource>.From(                first,                second,                static async (first, second, observer) =>                {                    var firstSubscription = new SingleAssignmentAsyncDisposable();                    var secondSubscription = new SingleAssignmentAsyncDisposable();                    var (firstObserver, secondObserver) = AsyncObserver.Amb(observer, firstSubscription, secondSubscription);                    var firstTask = first.SubscribeSafeAsync(firstObserver).AsTask().ContinueWith(d => firstSubscription.AssignAsync(d.Result).AsTask()).Unwrap();                    var secondTask = second.SubscribeSafeAsync(secondObserver).AsTask().ContinueWith(d => secondSubscription.AssignAsync(d.Result).AsTask()).Unwrap();                    await Task.WhenAll(firstTask, secondTask).ConfigureAwait(false);                    return StableCompositeAsyncDisposable.Create(firstSubscription, secondSubscription);                });        }        public static IAsyncObservable<TSource> Amb<TSource>(this IEnumerable<IAsyncObservable<TSource>> sources) => Amb(sources.ToArray());        public static IAsyncObservable<TSource> Amb<TSource>(params IAsyncObservable<TSource>[] sources)        {            if (sources == null)                throw new ArgumentNullException(nameof(sources));            return Create<TSource>(async observer =>            {                var count = sources.Length;                var subscriptions = new SingleAssignmentAsyncDisposable[count];                for (var i = 0; i < count; i++)                {                    subscriptions[i] = new SingleAssignmentAsyncDisposable();                }                var observers = AsyncObserver.Amb(observer, subscriptions);                var tasks = new Task[count];                for (var i = 0; i < count; i++)                {                    tasks[i] = sources[i].SubscribeSafeAsync(observers[i]).AsTask().ContinueWith(d => subscriptions[i].AssignAsync(d.Result).AsTask()).Unwrap();                }                await Task.WhenAll(tasks).ConfigureAwait(false);                return StableCompositeAsyncDisposable.Create(subscriptions);            });        }    }    public partial class AsyncObserver    {        public static (IAsyncObserver<TSource>, IAsyncObserver<TSource>) Amb<TSource>(IAsyncObserver<TSource> observer, IAsyncDisposable first, IAsyncDisposable second)        {            if (observer == null)                throw new ArgumentNullException(nameof(observer));            if (first == null)                throw new ArgumentNullException(nameof(first));            if (second == null)                throw new ArgumentNullException(nameof(second));            var gate = new AsyncGate();            var state = AmbState.None;            return                (                    Create<TSource>(                        async x =>                        {                            using (await gate.LockAsync().ConfigureAwait(false))                            {                                if (state == AmbState.None)                                {                                    state = AmbState.First;                                    await second.DisposeAsync().ConfigureAwait(false);                                }                                if (state == AmbState.First)                                {                                    await observer.OnNextAsync(x).ConfigureAwait(false);                                }                            }                        },                        async ex =>                        {                            using (await gate.LockAsync().ConfigureAwait(false))                            {                                if (state == AmbState.None)                                {                                    state = AmbState.First;                                    await second.DisposeAsync().ConfigureAwait(false);                                }                                if (state == AmbState.First)                                {                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);                                }                            }                        },                        async () =>                        {                            using (await gate.LockAsync().ConfigureAwait(false))                            {                                if (state == AmbState.None)                                {                                    state = AmbState.First;                                    await second.DisposeAsync().ConfigureAwait(false);                                }                                if (state == AmbState.First)                                {                                    await observer.OnCompletedAsync().ConfigureAwait(false);                                }                            }                        }                    ),                    Create<TSource>(                        async x =>                        {                            using (await gate.LockAsync().ConfigureAwait(false))                            {                                if (state == AmbState.None)                                {                                    state = AmbState.Second;                                    await first.DisposeAsync().ConfigureAwait(false);                                }                                if (state == AmbState.Second)                                {                                    await observer.OnNextAsync(x).ConfigureAwait(false);                                }                            }                        },                        async ex =>                        {                            using (await gate.LockAsync().ConfigureAwait(false))                            {                                if (state == AmbState.None)                                {                                    state = AmbState.Second;                                    await first.DisposeAsync().ConfigureAwait(false);                                }                                if (state == AmbState.Second)                                {                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);                                }                            }                        },                        async () =>                        {                            using (await gate.LockAsync().ConfigureAwait(false))                            {                                if (state == AmbState.None)                                {                                    state = AmbState.Second;                                    await first.DisposeAsync().ConfigureAwait(false);                                }                                if (state == AmbState.Second)                                {                                    await observer.OnCompletedAsync().ConfigureAwait(false);                                }                            }                        }                    )                );        }        public static IAsyncObserver<TSource>[] Amb<TSource>(IAsyncObserver<TSource> observer, IAsyncDisposable[] subscriptions)        {            if (observer == null)                throw new ArgumentNullException(nameof(observer));            if (subscriptions == null)                throw new ArgumentNullException(nameof(subscriptions));            var gate = new AsyncGate();            var winner = default(int?);            var count = subscriptions.Length;            async Task ElectWinnerAsync(int index)            {                winner = index;                var dispose = new List<Task>(count - 1);                for (var i = 0; i < count; i++)                {                    if (i != index)                    {                        dispose.Add(subscriptions[i].DisposeAsync().AsTask());                    }                }                await Task.WhenAll(dispose).ConfigureAwait(false);            }            IAsyncObserver<TSource> CreateObserver(int index) =>                Create<TSource>(                    async x =>                    {                        using (await gate.LockAsync().ConfigureAwait(false))                        {                            if (winner == null)                            {                                await ElectWinnerAsync(index).ConfigureAwait(false);                            }                            if (winner == index)                            {                                await observer.OnNextAsync(x).ConfigureAwait(false);                            }                        }                    },                    async ex =>                    {                        using (await gate.LockAsync().ConfigureAwait(false))                        {                            if (winner == null)                            {                                await ElectWinnerAsync(index).ConfigureAwait(false);                            }                            if (winner == index)                            {                                await observer.OnErrorAsync(ex).ConfigureAwait(false);                            }                        }                    },                    async () =>                    {                        using (await gate.LockAsync().ConfigureAwait(false))                        {                            if (winner == null)                            {                                await ElectWinnerAsync(index).ConfigureAwait(false);                            }                            if (winner == index)                            {                                await observer.OnCompletedAsync().ConfigureAwait(false);                            }                        }                    }                );            var res = new IAsyncObserver<TSource>[count];            for (var i = 0; i < count; i++)            {                res[i] = CreateObserver(i);            }            return res;        }        private enum AmbState        {            None,            First,            Second,        }    }}
 |