12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Collections.Generic;
- using System.Threading.Tasks;
- namespace System.Reactive
- {
- internal sealed class EventSource<T> : IEventSource<T>
- {
- private readonly IAsyncObservable<T> _source;
- private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
- private readonly Action<Action<T>, /*object,*/ T> _invokeHandler;
- public EventSource(IAsyncObservable<T> source, Action<Action<T>, /*object,*/ T> invokeHandler)
- {
- _source = source;
- _invokeHandler = invokeHandler;
- _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
- }
- public event Action<T> OnNext
- {
- add
- {
- var gate = new object();
- var isAdded = false;
- var isDone = false;
- var remove = new Action(() =>
- {
- lock (gate)
- {
- if (isAdded)
- Remove(value);
- else
- isDone = true;
- }
- });
- //
- // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
- //
- var d = _source.SubscribeAsync(
- x => { _invokeHandler(value, /*this,*/ x); return Task.CompletedTask; },
- ex => { remove(); return Task.FromException(ex); },
- () => { remove(); return Task.CompletedTask; }
- );
- lock (gate)
- {
- if (!isDone)
- {
- Add(value, d);
- isAdded = true;
- }
- }
- }
- remove
- {
- Remove(value);
- }
- }
- private void Add(Delegate handler, IDisposable disposable)
- {
- lock (_subscriptions)
- {
- var l = new Stack<IDisposable>();
- if (!_subscriptions.TryGetValue(handler, out l))
- _subscriptions[handler] = l = new Stack<IDisposable>();
- l.Push(disposable);
- }
- }
- private void Remove(Delegate handler)
- {
- var d = default(IDisposable);
- lock (_subscriptions)
- {
- var l = new Stack<IDisposable>();
- if (_subscriptions.TryGetValue(handler, out l))
- {
- d = l.Pop();
- if (l.Count == 0)
- _subscriptions.Remove(handler);
- }
- }
- d?.Dispose();
- }
- }
- }
|