| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 | 
							- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 
- using System.Collections.Generic;
 
- using System.Reactive.Disposables;
 
- using System.Reactive.Linq;
 
- namespace System.Reactive.Joins
 
- {
 
/// <summary>
/// Non-generic view of a join observer: something that can be subscribed
/// with a gate object, can drop one queued item, and can be disposed.
/// </summary>
internal interface IJoinObserver : IDisposable
{
    /// <summary>
    /// Starts observing, using <paramref name="gate"/> as the gate object.
    /// </summary>
    /// <param name="gate">
    /// Object supplied by the caller. The <c>JoinObserver&lt;T&gt;</c>
    /// implementation in this file stores it and locks on it while handling
    /// incoming notifications.
    /// </param>
    void Subscribe(object gate);

    /// <summary>
    /// Removes one item from the observer's queue.
    /// </summary>
    void Dequeue();
}
 
/// <summary>
/// Observer that queues the materialized notifications of a single source
/// sequence and, each time a notification is queued, asks every registered
/// <see cref="ActivePlan"/> to try to match.
/// </summary>
/// <typeparam name="T">Element type of the observed source sequence.</typeparam>
internal sealed class JoinObserver<T> : ObserverBase<Notification<T>>, IJoinObserver
{
    private readonly IObservable<T> source;
    private readonly Action<Exception> onError;
    private readonly SingleAssignmentDisposable subscription;

    // Lock object handed in through Subscribe; not assigned before that call.
    private object gate;
    private List<ActivePlan> activePlans;
    private bool isDisposed;

    /// <summary>
    /// Creates an observer for <paramref name="source"/>. Nothing is
    /// subscribed until <see cref="Subscribe"/> is called.
    /// </summary>
    /// <param name="source">Sequence whose notifications will be queued.</param>
    /// <param name="onError">
    /// Callback invoked with the exception when an error notification arrives.
    /// </param>
    public JoinObserver(IObservable<T> source, Action<Exception> onError)
    {
        this.source = source;
        this.onError = onError;

        Queue = new Queue<Notification<T>>();
        subscription = new SingleAssignmentDisposable();
        activePlans = new List<ActivePlan>();
    }

    /// <summary>
    /// Notifications received so far that have not been dequeued yet.
    /// </summary>
    public Queue<Notification<T>> Queue { get; private set; }

    /// <summary>
    /// Registers a plan to be asked to match whenever a notification is queued.
    /// </summary>
    /// <param name="activePlan">Plan to register.</param>
    public void AddActivePlan(ActivePlan activePlan)
    {
        activePlans.Add(activePlan);
    }

    /// <summary>
    /// Unregisters a plan; once no plans remain, this observer disposes itself.
    /// </summary>
    /// <param name="activePlan">Plan to unregister.</param>
    internal void RemoveActivePlan(ActivePlan activePlan)
    {
        activePlans.Remove(activePlan);

        if (activePlans.Count == 0)
        {
            Dispose();
        }
    }

    /// <summary>
    /// Stores <paramref name="gate"/> as the lock object and subscribes this
    /// observer to the materialized source sequence.
    /// </summary>
    /// <param name="gate">Object to lock on while handling notifications.</param>
    public void Subscribe(object gate)
    {
        this.gate = gate;

        var materialized = source.Materialize();
        subscription.Disposable = materialized.SubscribeSafe(this);
    }

    /// <summary>
    /// Removes the notification at the head of <see cref="Queue"/>.
    /// </summary>
    public void Dequeue()
    {
        Queue.Dequeue();
    }

    /// <summary>
    /// Handles one materialized notification while holding the gate.
    /// Nothing happens once the observer is disposed. An error notification
    /// is forwarded to the error callback and is not queued. Any other
    /// notification is queued and every registered plan is asked to match.
    /// </summary>
    /// <param name="notification">Materialized notification from the source.</param>
    protected override void OnNextCore(Notification<T> notification)
    {
        lock (gate)
        {
            if (isDisposed)
            {
                return;
            }

            if (notification.Kind == NotificationKind.OnError)
            {
                onError(notification.Exception);
                return;
            }

            Queue.Enqueue(notification);

            // Walk a copy of the plan list rather than the list itself;
            // presumably Match can lead to RemoveActivePlan being called,
            // which would modify the list mid-iteration.
            var snapshot = activePlans.ToArray();
            for (var i = 0; i < snapshot.Length; i++)
            {
                snapshot[i].Match();
            }
        }
    }

    /// <summary>
    /// Intentionally empty. The source is materialized in
    /// <see cref="Subscribe"/>, so errors are handled as notifications
    /// in <see cref="OnNextCore"/>.
    /// </summary>
    /// <param name="exception">Ignored.</param>
    protected override void OnErrorCore(Exception exception)
    {
    }

    /// <summary>
    /// Intentionally empty; no action is taken here on completion.
    /// </summary>
    protected override void OnCompletedCore()
    {
    }

    /// <summary>
    /// Runs the base class disposal, then on the first call marks this
    /// observer as disposed, disposing the source subscription when
    /// <paramref name="disposing"/> is true.
    /// </summary>
    /// <param name="disposing">
    /// When true, the subscription to the source is disposed as well.
    /// </param>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        if (isDisposed)
        {
            return;
        }

        if (disposing)
        {
            subscription.Dispose();
        }

        isDisposed = true;
    }
}
 
- }
 
 
  |