// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Threading.Tasks; namespace System.Reactive { internal sealed class EventSource : IEventSource { private readonly IAsyncObservable _source; private readonly Dictionary> _subscriptions; private readonly Action, /*object,*/ T> _invokeHandler; public EventSource(IAsyncObservable source, Action, /*object,*/ T> invokeHandler) { _source = source; _invokeHandler = invokeHandler; _subscriptions = new Dictionary>(); } public event Action OnNext { add { var gate = new object(); var isAdded = false; var isDone = false; var remove = new Action(() => { lock (gate) { if (isAdded) Remove(value); else isDone = true; } }); // // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=. // var d = _source.SubscribeAsync( x => { _invokeHandler(value, /*this,*/ x); return Task.CompletedTask; }, ex => { remove(); return Task.FromException(ex); }, () => { remove(); return Task.CompletedTask; } ); lock (gate) { if (!isDone) { Add(value, d); isAdded = true; } } } remove { Remove(value); } } private void Add(Delegate handler, IDisposable disposable) { lock (_subscriptions) { var l = new Stack(); if (!_subscriptions.TryGetValue(handler, out l)) _subscriptions[handler] = l = new Stack(); l.Push(disposable); } } private void Remove(Delegate handler) { var d = default(IDisposable); lock (_subscriptions) { var l = new Stack(); if (_subscriptions.TryGetValue(handler, out l)) { d = l.Pop(); if (l.Count == 0) _subscriptions.Remove(handler); } } d?.Dispose(); } } }