1
0

EventSource.cs 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. namespace System.Reactive
  6. {
  /// <summary>
  /// Exposes an <see cref="IObservable{T}"/> through the <see cref="OnNext"/> event:
  /// adding a handler subscribes to the source, removing a handler disposes one of the
  /// subscriptions that were created for that handler.
  /// </summary>
  /// <typeparam name="T">Type of the elements produced by the source.</typeparam>
  class EventSource<T> : IEventSource<T>
  {
      private readonly IObservable<T> _source;

      // Live subscriptions, keyed by the handler delegate they were created for.
      // A handler may be added several times; each add pushes one more subscription.
      // This dictionary doubles as the lock object guarding its own contents.
      private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;

      private readonly Action<Action<T>, /*object,*/ T> _invokeHandler;

      /// <summary>
      /// Creates an event wrapper around the given source.
      /// </summary>
      /// <param name="source">Sequence that is subscribed to each time a handler is added.</param>
      /// <param name="invokeHandler">
      /// Callback that receives the event handler and an element of the source; it is called
      /// once for every element delivered to a subscription.
      /// </param>
      public EventSource(IObservable<T> source, Action<Action<T>, /*object,*/ T> invokeHandler)
      {
          _source = source;
          _invokeHandler = invokeHandler;
          _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
      }

      /// <summary>
      /// Event raised for the elements of the source. Each <c>+=</c> creates a new subscription
      /// to the source; each <c>-=</c> disposes the most recently added subscription that is
      /// still registered for the given handler.
      /// </summary>
      public event Action<T> OnNext
      {
          add
          {
              // The source can terminate while Subscribe is still executing, i.e. before the
              // subscription has been registered through Add. The flags below, guarded by
              // 'gate', coordinate the two possible orders:
              //   - termination first:  'remove' sets isDone, and registration is skipped;
              //   - registration first: isAdded is set, and 'remove' unregisters via Remove.
              var gate = new object();
              var isAdded = false;
              var isDone = false;

              var remove = new Action(() =>
              {
                  lock (gate)
                  {
                      if (isAdded)
                      {
                          // NOTE(review): Remove pops the most recently added subscription for
                          // this handler, which is not necessarily the one that terminated when
                          // the same handler was added more than once - confirm this is intended.
                          Remove(value);
                      }
                      else
                      {
                          isDone = true;
                      }
                  }
              });

              //
              // [OK] Use of unsafe Subscribe: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
              //
              var d = _source.Subscribe/*Unsafe*/(
                  x => _invokeHandler(value, /*this,*/ x),
                  // Unregister first, then hand the error to the Throw helper
                  // (declared elsewhere; presumably rethrows - verify).
                  ex => { remove(); ex.Throw(); },
                  () => remove()
              );

              lock (gate)
              {
                  if (!isDone)
                  {
                      Add(value, d);
                      isAdded = true;
                  }
              }
          }

          remove
          {
              Remove(value);
          }
      }

      /// <summary>
      /// Registers a subscription for the given handler.
      /// </summary>
      /// <param name="handler">Handler delegate the subscription belongs to.</param>
      /// <param name="disposable">Subscription to dispose when the handler is removed.</param>
      private void Add(Delegate handler, IDisposable disposable)
      {
          lock (_subscriptions)
          {
              // No initializer: TryGetValue always assigns its out argument, so a stack is
              // only allocated when the handler has no entry yet.
              Stack<IDisposable> l;
              if (!_subscriptions.TryGetValue(handler, out l))
              {
                  _subscriptions[handler] = l = new Stack<IDisposable>();
              }

              l.Push(disposable);
          }
      }

      /// <summary>
      /// Unregisters and disposes the most recently added subscription for the given handler.
      /// Does nothing when no subscription is registered for it.
      /// </summary>
      /// <param name="handler">Handler delegate whose subscription should be removed.</param>
      private void Remove(Delegate handler)
      {
          var d = default(IDisposable);

          lock (_subscriptions)
          {
              Stack<IDisposable> l;
              if (_subscriptions.TryGetValue(handler, out l))
              {
                  d = l.Pop();

                  // Drop the entry with its last subscription so that a stored stack is never empty.
                  if (l.Count == 0)
                  {
                      _subscriptions.Remove(handler);
                  }
              }
          }

          // Dispose after releasing the _subscriptions lock, so the subscription's
          // Dispose logic never runs while that lock is held.
          if (d != null)
          {
              d.Dispose();
          }
      }
  }
  84. }