// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Threading.Tasks; namespace System.Reactive { /// /// Base class for classes that expose an observable sequence as a well-known event pattern (sender, event arguments). /// Contains functionality to maintain a map of event handler delegates to observable sequence subscriptions. Subclasses /// should only add an event with custom add and remove methods calling into the base class's operations. /// /// The type of the sender that raises the event. /// The type of the event data generated by the event. /// /// TODO: System.Reactive defines an EventPatternSourceBase. I (idg10) renamed this to EventPatternSourceBaseInternal to /// avoid a conflict. Work out whether we could in fact just use the type defined in System.Reactive. It's not identical, /// but perhaps it offers what we need. /// internal abstract class EventPatternSourceBaseInternal { private readonly IAsyncObservable> _source; private readonly Dictionary> _subscriptions; private readonly Action, /*object,*/ EventPattern> _invokeHandler; /// /// Creates a new event pattern source. /// /// Source sequence to expose as an event. /// Delegate used to invoke the event for each element of the sequence. /// or is null. protected EventPatternSourceBaseInternal(IAsyncObservable> source, Action, /*object,*/ EventPattern> invokeHandler) { _source = source ?? throw new ArgumentNullException(nameof(source)); _invokeHandler = invokeHandler ?? throw new ArgumentNullException(nameof(invokeHandler)); _subscriptions = new Dictionary>(); } /// /// Adds the specified event handler, causing a subscription to the underlying source. /// /// Event handler to add. The same delegate should be passed to the Remove operation in order to remove the event handler. /// Invocation delegate to raise the event in the derived class. /// or is null. protected void Add(Delegate handler, Action invoke) { if (handler == null) throw new ArgumentNullException(nameof(handler)); if (invoke == null) throw new ArgumentNullException(nameof(invoke)); var gate = new object(); var isAdded = false; var isDone = false; var remove = new Action(() => { lock (gate) { if (isAdded) Remove(handler); else isDone = true; } }); // // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=. // var d = _source.SubscribeAsync( x => { _invokeHandler(invoke, /*this,*/ x); return default; }, ex => { remove(); return new ValueTask(Task.FromException(ex)); }, () => { remove(); return default; } ).GetAwaiter().GetResult(); lock (gate) { if (!isDone) { Add(handler, d); isAdded = true; } } } private void Add(Delegate handler, IAsyncDisposable disposable) { lock (_subscriptions) { var l = new Stack(); if (!_subscriptions.TryGetValue(handler, out l)) _subscriptions[handler] = l = new Stack(); l.Push(disposable); } } /// /// Removes the specified event handler, causing a disposal of the corresponding subscription to the underlying source that was created during the Add operation. /// /// Event handler to remove. This should be the same delegate as one that was passed to the Add operation. /// is null. protected void Remove(Delegate handler) { if (handler == null) throw new ArgumentNullException(nameof(handler)); var d = default(IAsyncDisposable); lock (_subscriptions) { var l = new Stack(); if (_subscriptions.TryGetValue(handler, out l)) { d = l.Pop(); if (l.Count == 0) _subscriptions.Remove(handler); } } if (d != null) { d.DisposeAsync().GetAwaiter().GetResult(); } } } }