EventSource.cs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading.Tasks;
  6. namespace System.Reactive
  7. {
  8. internal sealed class EventSource<T> : IEventSource<T>
  9. {
  10. private readonly IAsyncObservable<T> _source;
  11. private readonly Dictionary<Delegate, Stack<IAsyncDisposable>> _subscriptions;
  12. private readonly Action<Action<T>, /*object,*/ T> _invokeHandler;
  13. public EventSource(IAsyncObservable<T> source, Action<Action<T>, /*object,*/ T> invokeHandler)
  14. {
  15. _source = source;
  16. _invokeHandler = invokeHandler;
  17. _subscriptions = new Dictionary<Delegate, Stack<IAsyncDisposable>>();
  18. }
  19. public event Action<T> OnNext
  20. {
  21. add
  22. {
  23. var gate = new object();
  24. var isAdded = false;
  25. var isDone = false;
  26. var remove = new Action(() =>
  27. {
  28. lock (gate)
  29. {
  30. if (isAdded)
  31. Remove(value);
  32. else
  33. isDone = true;
  34. }
  35. });
  36. //
  37. // [OK] Use of unsafe SubscribeAsync: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
  38. //
  39. var d = _source.SubscribeAsync(
  40. x => { _invokeHandler(value, /*this,*/ x); return default; },
  41. ex => { remove(); return new ValueTask(Task.FromException(ex)); },
  42. () => { remove(); return default; }
  43. ).GetAwaiter().GetResult();
  44. lock (gate)
  45. {
  46. if (!isDone)
  47. {
  48. Add(value, d);
  49. isAdded = true;
  50. }
  51. }
  52. }
  53. remove
  54. {
  55. Remove(value);
  56. }
  57. }
  58. private void Add(Delegate handler, IAsyncDisposable disposable)
  59. {
  60. lock (_subscriptions)
  61. {
  62. var l = new Stack<IAsyncDisposable>();
  63. if (!_subscriptions.TryGetValue(handler, out l))
  64. _subscriptions[handler] = l = new Stack<IAsyncDisposable>();
  65. l.Push(disposable);
  66. }
  67. }
  68. private void Remove(Delegate handler)
  69. {
  70. var d = default(IAsyncDisposable);
  71. lock (_subscriptions)
  72. {
  73. var l = new Stack<IAsyncDisposable>();
  74. if (_subscriptions.TryGetValue(handler, out l))
  75. {
  76. d = l.Pop();
  77. if (l.Count == 0)
  78. _subscriptions.Remove(handler);
  79. }
  80. }
  81. if (d != null)
  82. {
  83. d.DisposeAsync().GetAwaiter().GetResult();
  84. }
  85. }
  86. }
  87. }