EventSource.cs 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. namespace System.Reactive
  4. {
  5. class EventSource<T> : IEventSource<T>
  6. {
  7. private readonly IObservable<T> _source;
  8. private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
  9. private readonly Action<Action<T>, /*object,*/ T> _invokeHandler;
  10. public EventSource(IObservable<T> source, Action<Action<T>, /*object,*/ T> invokeHandler)
  11. {
  12. _source = source;
  13. _invokeHandler = invokeHandler;
  14. _subscriptions = new Dictionary<Delegate, Stack<IDisposable>>();
  15. }
  16. public event Action<T> OnNext
  17. {
  18. add
  19. {
  20. var gate = new object();
  21. var isAdded = false;
  22. var isDone = false;
  23. var remove = new Action(() =>
  24. {
  25. lock (gate)
  26. {
  27. if (isAdded)
  28. Remove(value);
  29. else
  30. isDone = true;
  31. }
  32. });
  33. //
  34. // [OK] Use of unsafe Subscribe: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
  35. //
  36. var d = _source.Subscribe/*Unsafe*/(
  37. x => _invokeHandler(value, /*this,*/ x),
  38. ex => { remove(); ex.Throw(); },
  39. () => remove()
  40. );
  41. lock (gate)
  42. {
  43. if (!isDone)
  44. {
  45. Add(value, d);
  46. isAdded = true;
  47. }
  48. }
  49. }
  50. remove
  51. {
  52. Remove(value);
  53. }
  54. }
  55. private void Add(Delegate handler, IDisposable disposable)
  56. {
  57. lock (_subscriptions)
  58. {
  59. var l = new Stack<IDisposable>();
  60. if (!_subscriptions.TryGetValue(handler, out l))
  61. _subscriptions[handler] = l = new Stack<IDisposable>();
  62. l.Push(disposable);
  63. }
  64. }
  65. private void Remove(Delegate handler)
  66. {
  67. var d = default(IDisposable);
  68. lock (_subscriptions)
  69. {
  70. var l = new Stack<IDisposable>();
  71. if (_subscriptions.TryGetValue(handler, out l))
  72. {
  73. d = l.Pop();
  74. if (l.Count == 0)
  75. _subscriptions.Remove(handler);
  76. }
  77. }
  78. if (d != null)
  79. d.Dispose();
  80. }
  81. }
  82. }