// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Threading.Tasks; namespace System.Reactive { /// /// Base class for classes that expose an observable sequence as a well-known event pattern (sender, event arguments). /// Contains functionality to maintain a map of event handler delegates to observable sequence subscriptions. Subclasses /// should only add an event with custom add and remove methods calling into the base class's operations. /// /// The type of the sender that raises the event. /// The type of the event data generated by the event. internal abstract class EventPatternSourceBase { private readonly IAsyncObservable> _source; private readonly Dictionary> _subscriptions; private readonly Action, /*object,*/ EventPattern> _invokeHandler; /// /// Creates a new event pattern source. /// /// Source sequence to expose as an event. /// Delegate used to invoke the event for each element of the sequence. /// or is null. protected EventPatternSourceBase(IAsyncObservable> source, Action, /*object,*/ EventPattern> invokeHandler) { if (source == null) throw new ArgumentNullException(nameof(source)); if (invokeHandler == null) throw new ArgumentNullException(nameof(invokeHandler)); _source = source; _invokeHandler = invokeHandler; _subscriptions = new Dictionary>(); } /// /// Adds the specified event handler, causing a subscription to the underlying source. /// /// Event handler to add. The same delegate should be passed to the Remove operation in order to remove the event handler. /// Invocation delegate to raise the event in the derived class. /// or is null. protected void Add(Delegate handler, Action invoke) { if (handler == null) throw new ArgumentNullException(nameof(handler)); if (invoke == null) throw new ArgumentNullException(nameof(invoke)); var gate = new object(); var isAdded = false; var isDone = false; var remove = new Action(() => { lock (gate) { if (isAdded) Remove(handler); else isDone = true; } }); // // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=. // var d = _source.SubscribeAsync( x => { _invokeHandler(invoke, /*this,*/ x); return Task.CompletedTask; }, ex => { remove(); return Task.FromException(ex); }, () => { remove(); return Task.CompletedTask; } ); lock (gate) { if (!isDone) { Add(handler, d); isAdded = true; } } } private void Add(Delegate handler, IDisposable disposable) { lock (_subscriptions) { var l = new Stack(); if (!_subscriptions.TryGetValue(handler, out l)) _subscriptions[handler] = l = new Stack(); l.Push(disposable); } } /// /// Removes the specified event handler, causing a disposal of the corresponding subscription to the underlying source that was created during the Add operation. /// /// Event handler to remove. This should be the same delegate as one that was passed to the Add operation. /// is null. protected void Remove(Delegate handler) { if (handler == null) throw new ArgumentNullException(nameof(handler)); var d = default(IDisposable); lock (_subscriptions) { var l = new Stack(); if (_subscriptions.TryGetValue(handler, out l)) { d = l.Pop(); if (l.Count == 0) _subscriptions.Remove(handler); } } d?.Dispose(); } } }