| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System.Collections.Generic;using System.Threading.Tasks;namespace System.Reactive{    internal sealed class EventSource<T> : IEventSource<T>    {        private readonly IAsyncObservable<T> _source;        private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;        private readonly Action<Action<T>, /*object,*/ T> _invokeHandler;        public EventSource(IAsyncObservable<T> source, Action<Action<T>, /*object,*/ T> invokeHandler)        {            _source = source;            _invokeHandler = invokeHandler;            _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();        }        public event Action<T> OnNext        {            add            {                var gate = new object();                var isAdded = false;                var isDone = false;                var remove = new Action(() =>                {                    lock (gate)                    {                        if (isAdded)                            Remove(value);                        else                            isDone = true;                    }                });                //                // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.                //                var d = _source.SubscribeAsync(                    x => { _invokeHandler(value, /*this,*/ x); return Task.CompletedTask; },                    ex => { remove(); return Task.FromException(ex); },                    () => { remove(); return Task.CompletedTask; }                );                lock (gate)                {                    if (!isDone)                    {                        Add(value, d);                        isAdded = true;                    }                }            }            remove            {                Remove(value);            }        }        private void Add(Delegate handler, IDisposable disposable)        {            lock (_subscriptions)            {                var l = new Stack<IDisposable>();                if (!_subscriptions.TryGetValue(handler, out l))                    _subscriptions[handler] = l = new Stack<IDisposable>();                l.Push(disposable);            }        }        private void Remove(Delegate handler)        {            var d = default(IDisposable);            lock (_subscriptions)            {                var l = new Stack<IDisposable>();                if (_subscriptions.TryGetValue(handler, out l))                {                    d = l.Pop();                    if (l.Count == 0)                        _subscriptions.Remove(handler);                }            }            d?.Dispose();        }    }}
 |