// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Threading.Tasks; namespace System.Reactive { internal sealed class EventSource : IEventSource { private readonly IAsyncObservable _source; private readonly Dictionary> _subscriptions; private readonly Action, /*object,*/ T> _invokeHandler; public EventSource(IAsyncObservable source, Action, /*object,*/ T> invokeHandler) { _source = source; _invokeHandler = invokeHandler; _subscriptions = new Dictionary>(); } public event Action OnNext { add { var gate = new object(); var isAdded = false; var isDone = false; var remove = new Action(() => { lock (gate) { if (isAdded) Remove(value); else isDone = true; } }); // // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=. // var d = _source.SubscribeAsync( x => { _invokeHandler(value, /*this,*/ x); return default; }, ex => { remove(); return new ValueTask(Task.FromException(ex)); }, () => { remove(); return default; } ).GetAwaiter().GetResult(); lock (gate) { if (!isDone) { Add(value, d); isAdded = true; } } } remove { Remove(value); } } private void Add(Delegate handler, IAsyncDisposable disposable) { lock (_subscriptions) { var l = new Stack(); if (!_subscriptions.TryGetValue(handler, out l)) _subscriptions[handler] = l = new Stack(); l.Push(disposable); } } private void Remove(Delegate handler) { var d = default(IAsyncDisposable); lock (_subscriptions) { var l = new Stack(); if (_subscriptions.TryGetValue(handler, out l)) { d = l.Pop(); if (l.Count == 0) _subscriptions.Remove(handler); } } d?.DisposeAsync().GetAwaiter().GetResult(); } } }