// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;

namespace System.Reactive
{
    /// <summary>
    /// Exposes an observable sequence as a .NET event: every handler attached to
    /// <see cref="OnNext"/> gets its own subscription to the source, and detaching a
    /// handler disposes one of the subscriptions recorded for it.
    /// </summary>
    /// <typeparam name="T">Type of the elements handed to the event handlers.</typeparam>
    class EventSource<T> : IEventSource<T>
    {
        private readonly IObservable<T> _source;

        // Live subscriptions keyed by handler. The same delegate may be attached several
        // times, so each key holds a stack; detaching pops the most recently added one.
        // This dictionary also serves as the lock object guarding its own mutations.
        private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;

        private readonly Action<Action<T>, /*object,*/ T> _invokeHandler;

        /// <summary>
        /// Creates an event wrapper around <paramref name="source"/>.
        /// </summary>
        /// <param name="source">Observable sequence subscribed to once per attached handler.</param>
        /// <param name="invokeHandler">
        /// Callback that receives the attached handler and the current element; it is
        /// called for every element the source produces.
        /// </param>
        public EventSource(IObservable<T> source, Action<Action<T>, /*object,*/ T> invokeHandler)
        {
            _source = source;
            _invokeHandler = invokeHandler;
            _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
        }

        /// <summary>
        /// Event raised for each element of the underlying sequence. Attaching a handler
        /// subscribes to the source; detaching it disposes a subscription recorded for
        /// that handler.
        /// </summary>
        public event Action<T> OnNext
        {
            add
            {
                // The source may terminate while Subscribe is still running, i.e. before the
                // subscription has been registered through Add. The gate and the two flags
                // coordinate that case:
                //   isAdded - the subscription has been registered, so termination must undo it;
                //   isDone  - the source terminated first, so registration must be skipped.
                var gate = new object();
                var isAdded = false;
                var isDone = false;

                var remove = new Action(() =>
                {
                    lock (gate)
                    {
                        if (isAdded)
                        {
                            Remove(value);
                        }
                        else
                        {
                            isDone = true;
                        }
                    }
                });

                //
                // [OK] Use of unsafe Subscribe: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
                //
                var d = _source.Subscribe/*Unsafe*/(
                    x => _invokeHandler(value, /*this,*/ x),
                    // Unregister first, then hand the error to Throw()
                    // (extension defined outside this file; presumably rethrows - confirm).
                    ex => { remove(); ex.Throw(); },
                    remove
                );

                lock (gate)
                {
                    if (!isDone)
                    {
                        Add(value, d);
                        isAdded = true;
                    }
                }
            }

            remove
            {
                Remove(value);
            }
        }

        /// <summary>
        /// Records <paramref name="disposable"/> as the newest subscription held for
        /// <paramref name="handler"/>.
        /// </summary>
        /// <param name="handler">Handler delegate used as the lookup key.</param>
        /// <param name="disposable">Subscription to dispose when the handler is detached.</param>
        private void Add(Delegate handler, IDisposable disposable)
        {
            lock (_subscriptions)
            {
                // TryGetValue always assigns its out argument, so nothing is allocated up front.
                Stack<IDisposable> l;
                if (!_subscriptions.TryGetValue(handler, out l))
                {
                    _subscriptions[handler] = l = new Stack<IDisposable>();
                }

                l.Push(disposable);
            }
        }

        /// <summary>
        /// Disposes the most recently recorded subscription for <paramref name="handler"/>,
        /// if there is one, and drops the handler's entry once its stack is empty.
        /// Does nothing when the handler is not registered.
        /// </summary>
        /// <param name="handler">Handler delegate used as the lookup key.</param>
        private void Remove(Delegate handler)
        {
            var d = default(IDisposable);

            lock (_subscriptions)
            {
                Stack<IDisposable> l;
                if (_subscriptions.TryGetValue(handler, out l))
                {
                    d = l.Pop();
                    if (l.Count == 0)
                    {
                        _subscriptions.Remove(handler);
                    }
                }
            }

            // Dispose only after the lock is released so the subscription's teardown
            // does not run while _subscriptions is held.
            if (d != null)
            {
                d.Dispose();
            }
        }
    }
}