| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- using System.Collections.Generic;
- using System.Reactive.Disposables;
- using System.Reactive.Linq;
- namespace System.Reactive.Joins
- {
- internal interface IJoinObserver : IDisposable
- {
- void Subscribe(object gate);
- void Dequeue();
- }
- internal sealed class JoinObserver<T> : ObserverBase<Notification<T>>, IJoinObserver
- {
- private object gate;
- private readonly IObservable<T> source;
- private readonly Action<Exception> onError;
- private List<ActivePlan> activePlans;
- public Queue<Notification<T>> Queue { get; private set; }
- private readonly SingleAssignmentDisposable subscription;
- private bool isDisposed;
- public JoinObserver(IObservable<T> source, Action<Exception> onError)
- {
- this.source = source;
- this.onError = onError;
- Queue = new Queue<Notification<T>>();
- subscription = new SingleAssignmentDisposable();
- activePlans = new List<ActivePlan>();
- }
- public void AddActivePlan(ActivePlan activePlan)
- {
- activePlans.Add(activePlan);
- }
- public void Subscribe(object gate)
- {
- this.gate = gate;
- subscription.Disposable = source.Materialize().SubscribeSafe(this);
- }
- public void Dequeue()
- {
- Queue.Dequeue();
- }
- protected override void OnNextCore(Notification<T> notification)
- {
- lock (gate)
- {
- if (!isDisposed)
- {
- if (notification.Kind == NotificationKind.OnError)
- {
- onError(notification.Exception);
- return;
- }
- Queue.Enqueue(notification);
- foreach (var activePlan in activePlans.ToArray())
- activePlan.Match();
- }
- }
- }
- protected override void OnErrorCore(Exception exception)
- {
- }
- protected override void OnCompletedCore()
- {
- }
- internal void RemoveActivePlan(ActivePlan activePlan)
- {
- activePlans.Remove(activePlan);
- if (activePlans.Count == 0)
- Dispose();
- }
- protected override void Dispose(bool disposing)
- {
- base.Dispose(disposing);
- if (!isDisposed)
- {
- if (disposing)
- subscription.Dispose();
- isDisposed = true;
- }
- }
- }
- }
|