// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Threading.Tasks;
namespace System.Reactive
{
/// <summary>
/// Base class for classes that expose an observable sequence as a well-known event pattern (sender, event arguments).
/// Contains functionality to maintain a map of event handler delegates to observable sequence subscriptions. Subclasses
/// should only add an event with custom add and remove methods calling into the base class's operations.
/// </summary>
/// <typeparam name="TSender">The type of the sender that raises the event.</typeparam>
/// <typeparam name="TEventArgs">The type of the event data generated by the event.</typeparam>
/// <remarks>
/// TODO: System.Reactive defines an EventPatternSourceBase. I (idg10) renamed this to EventPatternSourceBaseInternal to
/// avoid a conflict. Work out whether we could in fact just use the type defined in System.Reactive. It's not identical,
/// but perhaps it offers what we need.
/// </remarks>
internal abstract class EventPatternSourceBaseInternal<TSender, TEventArgs>
{
    private readonly IAsyncObservable<EventPattern<TSender, TEventArgs>> _source;

    // Maps each handler delegate to the subscriptions created for it. A stack is used so that a delegate
    // added several times has its most recent subscription removed first. Also serves as the lock object
    // guarding all accesses to the map.
    private readonly Dictionary<Delegate, Stack<IAsyncDisposable>> _subscriptions;

    private readonly Action<Action<TSender, TEventArgs>, /*object,*/ EventPattern<TSender, TEventArgs>> _invokeHandler;

    /// <summary>
    /// Creates a new event pattern source.
    /// </summary>
    /// <param name="source">Source sequence to expose as an event.</param>
    /// <param name="invokeHandler">Delegate used to invoke the event for each element of the sequence.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="invokeHandler"/> is null.</exception>
    protected EventPatternSourceBaseInternal(IAsyncObservable<EventPattern<TSender, TEventArgs>> source, Action<Action<TSender, TEventArgs>, /*object,*/ EventPattern<TSender, TEventArgs>> invokeHandler)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
        _invokeHandler = invokeHandler ?? throw new ArgumentNullException(nameof(invokeHandler));
        _subscriptions = new Dictionary<Delegate, Stack<IAsyncDisposable>>();
    }

    /// <summary>
    /// Adds the specified event handler, causing a subscription to the underlying source.
    /// </summary>
    /// <param name="handler">Event handler to add. The same delegate should be passed to the Remove operation in order to remove the event handler.</param>
    /// <param name="invoke">Invocation delegate to raise the event in the derived class.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> or <paramref name="invoke"/> is null.</exception>
    protected void Add(Delegate handler, Action<TSender, TEventArgs> invoke)
    {
        if (handler == null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        if (invoke == null)
        {
            throw new ArgumentNullException(nameof(invoke));
        }

        var gate = new object();
        var isAdded = false;
        var isDone = false;

        // Invoked when the source terminates (error or completion). If the handler has already been
        // registered it is removed again; otherwise termination happened before SubscribeAsync returned,
        // so only record that fact and let the code below skip the registration.
        var remove = new Action(() =>
        {
            lock (gate)
            {
                if (isAdded)
                {
                    Remove(handler);
                }
                else
                {
                    isDone = true;
                }
            }
        });

        //
        // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
        //
        // The subscription is awaited synchronously: this method is not async, so it blocks until
        // SubscribeAsync has completed.
        var d = _source.SubscribeAsync(
            x => { _invokeHandler(invoke, /*this,*/ x); return default; },
            ex => { remove(); return new ValueTask(Task.FromException(ex)); },
            () => { remove(); return default; }
        ).GetAwaiter().GetResult();

        lock (gate)
        {
            // Only track the subscription if the source has not already terminated.
            if (!isDone)
            {
                Add(handler, d);
                isAdded = true;
            }
        }
    }

    /// <summary>
    /// Records <paramref name="disposable"/> as the most recent subscription associated with <paramref name="handler"/>.
    /// </summary>
    /// <param name="handler">Event handler delegate used as the lookup key.</param>
    /// <param name="disposable">Subscription to the underlying source created for the handler.</param>
    private void Add(Delegate handler, IAsyncDisposable disposable)
    {
        lock (_subscriptions)
        {
            if (!_subscriptions.TryGetValue(handler, out var stack))
            {
                stack = new Stack<IAsyncDisposable>();
                _subscriptions[handler] = stack;
            }

            stack.Push(disposable);
        }
    }

    /// <summary>
    /// Removes the specified event handler, causing a disposal of the corresponding subscription to the underlying source that was created during the Add operation.
    /// </summary>
    /// <param name="handler">Event handler to remove. This should be the same delegate as one that was passed to the Add operation.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    protected void Remove(Delegate handler)
    {
        if (handler == null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        IAsyncDisposable subscription = null;

        lock (_subscriptions)
        {
            if (_subscriptions.TryGetValue(handler, out var stack))
            {
                subscription = stack.Pop();

                // Drop the map entry once the last subscription for this handler is gone, so a stack
                // stored in the map is never empty.
                if (stack.Count == 0)
                {
                    _subscriptions.Remove(handler);
                }
            }
        }

        // Disposal happens after the lock on the subscription map has been released. Unknown handlers
        // (nothing found in the map) are silently ignored.
        if (subscription != null)
        {
            subscription.DisposeAsync().GetAwaiter().GetResult();
        }
    }
}
}