| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 | 
							- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 
- using System.Collections.Generic;
 
- namespace System.Reactive
 
- {
 
-     class EventSource<T> : IEventSource<T>
 
-     {
 
-         private readonly IObservable<T> _source;
 
-         private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
 
-         private readonly Action<Action<T>, /*object,*/ T> _invokeHandler;
 
-         public EventSource(IObservable<T> source, Action<Action<T>, /*object,*/ T> invokeHandler)
 
-         {
 
-             _source = source;
 
-             _invokeHandler = invokeHandler;
 
-             _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
 
-         }
 
-         public event Action<T> OnNext
 
-         {
 
-             add
 
-             {
 
-                 var gate = new object();
 
-                 var isAdded = false;
 
-                 var isDone = false;
 
-                 var remove = new Action(() =>
 
-                 {
 
-                     lock (gate)
 
-                     {
 
-                         if (isAdded)
 
-                             Remove(value);
 
-                         else
 
-                             isDone = true;
 
-                     }
 
-                 });
 
-                 //
 
-                 // [OK] Use of unsafe Subscribe: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
 
-                 //
 
-                 var d = _source.Subscribe/*Unsafe*/(
 
-                     x => _invokeHandler(value, /*this,*/ x),
 
-                     ex => { remove(); ex.Throw(); },
 
-                     () => remove()
 
-                 );
 
-                 lock (gate)
 
-                 {
 
-                     if (!isDone)
 
-                     {
 
-                         Add(value, d);
 
-                         isAdded = true;
 
-                     }
 
-                 }
 
-             }
 
-             remove
 
-             {
 
-                 Remove(value);
 
-             }
 
-         }
 
-         private void Add(Delegate handler, IDisposable disposable)
 
-         {
 
-             lock (_subscriptions)
 
-             {
 
-                 var l = new Stack<IDisposable>();
 
-                 if (!_subscriptions.TryGetValue(handler, out l))
 
-                     _subscriptions[handler] = l = new Stack<IDisposable>();
 
-                 l.Push(disposable);
 
-             }
 
-         }
 
-         private void Remove(Delegate handler)
 
-         {
 
-             var d = default(IDisposable);
 
-             lock (_subscriptions)
 
-             {
 
-                 var l = new Stack<IDisposable>();
 
-                 if (_subscriptions.TryGetValue(handler, out l))
 
-                 {
 
-                     d = l.Pop();
 
-                     if (l.Count == 0)
 
-                         _subscriptions.Remove(handler);
 
-                 }
 
-             }
 
-             if (d != null)
 
-                 d.Dispose();
 
-         }
 
-     }
 
- }
 
 
  |