| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System.Collections.Generic;using System.Reactive.Disposables;using System.Reactive.Linq;using System.Threading;using System.Threading.Tasks;namespace System.Reactive.Joins{    internal sealed class AsyncJoinObserver<T> : AsyncObserverBase<Notification<T>>, IAsyncJoinObserver    {        private readonly IAsyncObservable<T> _source;        private readonly Func<Exception, Task> _onError;        private readonly List<ActiveAsyncPlan> _activePlans = new List<ActiveAsyncPlan>();        private readonly SingleAssignmentAsyncDisposable _subscription = new SingleAssignmentAsyncDisposable();        private AsyncLock _gate;        private bool _isDisposed;        public AsyncJoinObserver(IAsyncObservable<T> source, Func<Exception, Task> onError)        {            _source = source;            _onError = onError;        }        public Queue<Notification<T>> Queue { get; } = new Queue<Notification<T>>();        public void Dequeue() => Queue.Dequeue();        public void AddActivePlan(ActiveAsyncPlan activePlan)        {            _activePlans.Add(activePlan);        }        internal async Task RemoveActivePlan(ActiveAsyncPlan activePlan)        {            _activePlans.Remove(activePlan);            if (_activePlans.Count == 0)            {                await DisposeAsync().ConfigureAwait(false);            }        }        public async Task DisposeAsync()        {            if (!_isDisposed)            {                await _subscription.DisposeAsync().ConfigureAwait(false);                _isDisposed = true;            }        }        public async Task SubscribeAsync(AsyncLock gate)        {            _gate = gate;            var d = await _source.Materialize().SubscribeSafeAsync(this).ConfigureAwait(false);            await _subscription.AssignAsync(d).ConfigureAwait(false);        }        protected override Task OnCompletedAsyncCore() => Task.CompletedTask;        protected override Task OnErrorAsyncCore(Exception error) => Task.CompletedTask;        protected override async Task OnNextAsyncCore(Notification<T> notification)        {            using (await _gate.LockAsync().ConfigureAwait(false))            {                if (!_isDisposed)                {                    if (notification.Kind == NotificationKind.OnError)                    {                        await _onError(notification.Exception).ConfigureAwait(false);                    }                    else                    {                        Queue.Enqueue(notification);                        var plans = _activePlans.ToArray();                        for (var i = 0; i < plans.Length; i++)                        {                            await plans[i].Match().ConfigureAwait(false); // REVIEW: Consider concurrent matching.                        }                    }                }            }        }    }}
 |