EventSource.cs 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading.Tasks;
  namespace System.Reactive
  {
      /// <summary>
      /// Wraps an <see cref="IAsyncObservable{T}"/> behind an <see cref="IEventSource{T}"/>:
      /// attaching a handler to <see cref="OnNext"/> subscribes to the source, and detaching it
      /// disposes the most recently recorded subscription for that handler.
      /// </summary>
      /// <typeparam name="T">Type of the values handed to the event handlers.</typeparam>
      internal sealed class EventSource<T> : IEventSource<T>
      {
          private readonly IAsyncObservable<T> _source;

          // Subscriptions recorded per handler delegate. A stack is kept per handler so that
          // attaching the same delegate several times records one entry per attachment; each
          // detach pops the latest one. The dictionary instance is also the lock that guards it.
          private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;

          private readonly Func<Action<T>, /*object,*/ T, Task> _invokeHandler;

          /// <summary>
          /// Creates an event source over the given observable.
          /// </summary>
          /// <param name="source">Sequence that is subscribed to whenever a handler is attached.</param>
          /// <param name="invokeHandler">
          /// Callback used to deliver each element to an attached handler; it receives the handler
          /// and the element and returns the task for that delivery.
          /// </param>
          public EventSource(IAsyncObservable<T> source, Func<Action<T>, /*object,*/ T, Task> invokeHandler)
          {
              _source = source;
              _invokeHandler = invokeHandler;
              _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
          }

          /// <summary>
          /// Event raised (through the invoke-handler callback) for each element of the source.
          /// </summary>
          public event Action<T> OnNext
          {
              add
              {
                  var gate = new object();
                  var isAdded = false;
                  var isDone = false;

                  // Called from the error and completion callbacks below. If the subscription has
                  // already been recorded it is removed; otherwise isDone tells the code after
                  // SubscribeAsync not to record it at all.
                  var remove = new Action(() =>
                  {
                      lock (gate)
                      {
                          if (isAdded)
                          {
                              Remove(value);
                          }
                          else
                          {
                              isDone = true;
                          }
                      }
                  });

                  //
                  // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
                  //
                  var d = _source.SubscribeAsync(
                      x => _invokeHandler(value, /*this,*/ x),
                      ex => { remove(); return Task.FromException(ex); },
                      () => { remove(); return Task.CompletedTask; }
                  );

                  lock (gate)
                  {
                      // NOTE(review): when isDone is already set, d is neither recorded nor disposed
                      // here - presumably the source has torn the subscription down itself; confirm.
                      if (!isDone)
                      {
                          Add(value, d);
                          isAdded = true;
                      }
                  }
              }

              remove
              {
                  Remove(value);
              }
          }

          /// <summary>
          /// Records <paramref name="disposable"/> as the latest subscription for <paramref name="handler"/>.
          /// </summary>
          /// <param name="handler">Handler delegate used as the lookup key.</param>
          /// <param name="disposable">Subscription to push onto the handler's stack.</param>
          private void Add(Delegate handler, IDisposable disposable)
          {
              lock (_subscriptions)
              {
                  // Left unassigned on purpose: TryGetValue always assigns its out argument, so
                  // allocating a stack up front would only produce an object that is thrown away.
                  Stack<IDisposable> stack;
                  if (!_subscriptions.TryGetValue(handler, out stack))
                  {
                      stack = new Stack<IDisposable>();
                      _subscriptions[handler] = stack;
                  }

                  stack.Push(disposable);
              }
          }

          /// <summary>
          /// Pops and disposes the latest subscription recorded for <paramref name="handler"/>.
          /// Does nothing when no subscription is recorded for it.
          /// </summary>
          /// <param name="handler">Handler delegate whose latest subscription is released.</param>
          private void Remove(Delegate handler)
          {
              var d = default(IDisposable);

              lock (_subscriptions)
              {
                  Stack<IDisposable> stack;
                  if (_subscriptions.TryGetValue(handler, out stack))
                  {
                      d = stack.Pop();

                      // Drop the entry once its last subscription is gone, so a stack stored in
                      // the dictionary is never empty.
                      if (stack.Count == 0)
                      {
                          _subscriptions.Remove(handler);
                      }
                  }
              }

              // Disposal runs after the _subscriptions lock has been released.
              d?.Dispose();
          }
      }
  }