EventSource.cs 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. #nullable disable
  5. using System.Collections.Generic;
  6. namespace System.Reactive
  7. {
  /// <summary>
  /// Exposes an observable sequence as an event: every handler attached to
  /// <see cref="OnNext"/> results in a subscription to the source, and every
  /// detach disposes one subscription previously made for that handler.
  /// </summary>
  /// <typeparam name="T">Type of the elements passed to the event handlers.</typeparam>
  internal sealed class EventSource<T> : IEventSource<T>
  {
      private readonly IObservable<T> _source;

      // Subscriptions per handler delegate. A stack is used because the same
      // handler may be attached more than once; each detach pops the most
      // recently pushed subscription. All access happens under lock (_subscriptions).
      private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;

      // Called for every element with the attached handler and the element.
      private readonly Action<Action<T>, /*object,*/ T> _invokeHandler;

      /// <summary>
      /// Creates an event source over the given observable sequence.
      /// </summary>
      /// <param name="source">Sequence subscribed to whenever a handler is attached.</param>
      /// <param name="invokeHandler">Callback that receives the attached handler and each element produced by <paramref name="source"/>.</param>
      public EventSource(IObservable<T> source, Action<Action<T>, /*object,*/ T> invokeHandler)
      {
          _source = source;
          _invokeHandler = invokeHandler;
          _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
      }

      /// <summary>
      /// Attaching a handler subscribes to the source; detaching a handler
      /// disposes the most recently registered subscription for that handler.
      /// </summary>
      public event Action<T> OnNext
      {
          add
          {
              // 'gate' guards the two flags below. They coordinate the termination
              // callback with the registration that happens after Subscribe returns.
              var gate = new object();
              var isAdded = false; // set once the subscription was registered through Add
              var isDone = false;  // set when termination was signalled before registration

              var remove = new Action(() =>
              {
                  lock (gate)
                  {
                      if (isAdded)
                      {
                          // Terminated after registration: drop a registration for this handler.
                          Remove(value);
                      }
                      else
                      {
                          // Terminated before registration (for instance while Subscribe
                          // was still running): tell the code below not to register.
                          isDone = true;
                      }
                  }
              });

              //
              // [OK] Use of unsafe Subscribe: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
              //
              // NOTE(review): Throw() is an extension defined outside this file;
              // presumably it rethrows the error - confirm.
              var d = _source.Subscribe/*Unsafe*/(
                  x => _invokeHandler(value, /*this,*/ x),
                  ex => { remove(); ex.Throw(); },
                  remove
              );

              lock (gate)
              {
                  // Skip registration when the sequence has already terminated.
                  if (!isDone)
                  {
                      Add(value, d);
                      isAdded = true;
                  }
              }
          }

          remove
          {
              Remove(value);
          }
      }

      /// <summary>
      /// Records <paramref name="disposable"/> as the latest subscription for <paramref name="handler"/>.
      /// </summary>
      /// <param name="handler">Handler delegate used as the lookup key.</param>
      /// <param name="disposable">Subscription to dispose when the handler is removed.</param>
      private void Add(Delegate handler, IDisposable disposable)
      {
          lock (_subscriptions)
          {
              if (!_subscriptions.TryGetValue(handler, out var l))
              {
                  _subscriptions[handler] = l = new Stack<IDisposable>();
              }

              l.Push(disposable);
          }
      }

      /// <summary>
      /// Disposes the most recently registered subscription for <paramref name="handler"/>.
      /// Does nothing when no subscription is registered for the handler.
      /// </summary>
      /// <param name="handler">Handler delegate whose latest subscription is removed.</param>
      private void Remove(Delegate handler)
      {
          IDisposable d = null;

          lock (_subscriptions)
          {
              // TryGetValue assigns the out variable itself, so no stack is
              // allocated up front just to be overwritten.
              if (_subscriptions.TryGetValue(handler, out var l))
              {
                  d = l.Pop();

                  // Drop the entry once its last subscription is gone so the
                  // dictionary never holds an empty stack.
                  if (l.Count == 0)
                  {
                      _subscriptions.Remove(handler);
                  }
              }
          }

          // Dispose after leaving the lock so the dictionary lock is not held
          // while the subscription's dispose logic runs.
          d?.Dispose();
      }
  }
  91. }