123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Collections.Generic;
- using System.Reactive.Disposables;
- using System.Reactive.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- namespace System.Reactive.Joins
- {
- internal sealed class AsyncJoinObserver<T> : AsyncObserverBase<Notification<T>>, IAsyncJoinObserver
- {
- private readonly IAsyncObservable<T> _source;
- private readonly Func<Exception, Task> _onError;
- private readonly List<ActiveAsyncPlan> _activePlans = new List<ActiveAsyncPlan>();
- private readonly SingleAssignmentAsyncDisposable _subscription = new SingleAssignmentAsyncDisposable();
- private AsyncLock _gate;
- private bool _isDisposed;
- public AsyncJoinObserver(IAsyncObservable<T> source, Func<Exception, Task> onError)
- {
- _source = source;
- _onError = onError;
- }
- public Queue<Notification<T>> Queue { get; } = new Queue<Notification<T>>();
- public void Dequeue() => Queue.Dequeue();
- public void AddActivePlan(ActiveAsyncPlan activePlan)
- {
- _activePlans.Add(activePlan);
- }
- internal async Task RemoveActivePlan(ActiveAsyncPlan activePlan)
- {
- _activePlans.Remove(activePlan);
- if (_activePlans.Count == 0)
- {
- await DisposeAsync().ConfigureAwait(false);
- }
- }
- public async Task DisposeAsync()
- {
- if (!_isDisposed)
- {
- await _subscription.DisposeAsync().ConfigureAwait(false);
- _isDisposed = true;
- }
- }
- public async Task SubscribeAsync(AsyncLock gate)
- {
- _gate = gate;
- var d = await _source.Materialize().SubscribeSafeAsync(this).ConfigureAwait(false);
- await _subscription.AssignAsync(d).ConfigureAwait(false);
- }
- protected override Task OnCompletedAsyncCore() => Task.CompletedTask;
- protected override Task OnErrorAsyncCore(Exception error) => Task.CompletedTask;
- protected override async Task OnNextAsyncCore(Notification<T> notification)
- {
- using (await _gate.LockAsync().ConfigureAwait(false))
- {
- if (!_isDisposed)
- {
- if (notification.Kind == NotificationKind.OnError)
- {
- await _onError(notification.Exception).ConfigureAwait(false);
- }
- else
- {
- Queue.Enqueue(notification);
- var plans = _activePlans.ToArray();
- for (var i = 0; i < plans.Length; i++)
- {
- await plans[i].Match().ConfigureAwait(false); // REVIEW: Consider concurrent matching.
- }
- }
- }
- }
- }
- }
- }
|