// BehaviorSubject.cs
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Disposables;
  3. namespace System.Reactive.Subjects
  4. {
  /// <summary>
  /// A subject that holds a current value which may change over time.
  /// An observer receives the current (or initial) value as soon as it subscribes, followed by every later notification.
  /// </summary>
  /// <remarks>
  /// Mutable state is guarded by a private lock. Once the subject has been stopped by
  /// <see cref="OnCompleted"/> or <see cref="OnError"/>, later notifications are ignored and new
  /// subscribers immediately receive the terminal notification. After <see cref="Dispose"/>, the
  /// notification methods and <see cref="Subscribe"/> throw <see cref="ObjectDisposedException"/>.
  /// </remarks>
  /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  public sealed class BehaviorSubject<T> : ISubject<T>, IDisposable
  {
    // Guards every field below.
    private readonly object _gate = new object();

    // Currently subscribed observers; set to null by Dispose.
    private ImmutableList<IObserver<T>> _observers;

    // True once OnCompleted or OnError has been accepted.
    private bool _isStopped;

    // Most recent value; replayed to each new subscriber.
    private T _value;

    // Error that stopped the subject; stays null when it completed normally.
    private Exception _exception;

    // True once Dispose has run.
    private bool _isDisposed;

    /// <summary>
    /// Creates a subject that caches its latest value, starting with <paramref name="value"/>.
    /// </summary>
    /// <param name="value">Value handed to observers that subscribe before any other value has been pushed.</param>
    public BehaviorSubject(T value)
    {
      _observers = new ImmutableList<IObserver<T>>();
      _value = value;
    }

    /// <summary>
    /// Gets whether at least one observer is currently subscribed.
    /// </summary>
    /// <remarks>
    /// Reads the observer list without taking the lock and returns <c>false</c> when the list is
    /// null (which is the state left behind by <see cref="Dispose"/>).
    /// </remarks>
    public bool HasObservers
    {
      get
      {
        ImmutableList<IObserver<T>> snapshot = _observers;
        if (snapshot == null)
        {
          return false;
        }

        return snapshot.Data.Length > 0;
      }
    }

    /// <summary>
    /// Stops the subject and signals completion to every subscribed observer.
    /// Does nothing if the subject has already been stopped.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnCompleted()
    {
      IObserver<T>[] targets;

      lock (_gate)
      {
        CheckDisposed();

        if (_isStopped)
        {
          return;
        }

        // Capture the current observers and detach them before marking the subject stopped.
        targets = _observers.Data;
        _observers = new ImmutableList<IObserver<T>>();
        _isStopped = true;
      }

      if (targets == null)
      {
        return;
      }

      // Observers are called after the lock has been released.
      for (int i = 0; i < targets.Length; i++)
      {
        targets[i].OnCompleted();
      }
    }

    /// <summary>
    /// Stops the subject and forwards <paramref name="error"/> to every subscribed observer.
    /// Does nothing if the subject has already been stopped.
    /// </summary>
    /// <param name="error">The exception to send to all observers.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnError(Exception error)
    {
      if (error == null)
      {
        throw new ArgumentNullException("error");
      }

      IObserver<T>[] targets;

      lock (_gate)
      {
        CheckDisposed();

        if (_isStopped)
        {
          return;
        }

        targets = _observers.Data;
        _observers = new ImmutableList<IObserver<T>>();
        _isStopped = true;

        // Remembered so that observers subscribing later receive the same error.
        _exception = error;
      }

      if (targets == null)
      {
        return;
      }

      for (int i = 0; i < targets.Length; i++)
      {
        targets[i].OnError(error);
      }
    }

    /// <summary>
    /// Stores <paramref name="value"/> as the current value and pushes it to every subscribed observer.
    /// Does nothing if the subject has already been stopped.
    /// </summary>
    /// <param name="value">The value to send to all observers.</param>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnNext(T value)
    {
      IObserver<T>[] targets;

      lock (_gate)
      {
        CheckDisposed();

        if (_isStopped)
        {
          return;
        }

        _value = value;
        targets = _observers.Data;
      }

      if (targets == null)
      {
        return;
      }

      for (int i = 0; i < targets.Length; i++)
      {
        targets[i].OnNext(value);
      }
    }

    /// <summary>
    /// Subscribes an observer to the subject.
    /// </summary>
    /// <remarks>
    /// While the subject is still running, the observer is registered and handed the current value
    /// before this method returns; that call is made while the internal lock is held. If the subject
    /// has already stopped, the observer only receives the stored error or a completion signal and an
    /// empty disposable is returned.
    /// </remarks>
    /// <param name="observer">Observer to subscribe to the subject.</param>
    /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
      if (observer == null)
      {
        throw new ArgumentNullException("observer");
      }

      Exception failure = null;

      lock (_gate)
      {
        CheckDisposed();

        if (_isStopped)
        {
          failure = _exception;
        }
        else
        {
          _observers = _observers.Add(observer);
          observer.OnNext(_value);
          return new Subscription(this, observer);
        }
      }

      // Only reached when the subject had already stopped: replay the terminal notification.
      if (failure == null)
      {
        observer.OnCompleted();
      }
      else
      {
        observer.OnError(failure);
      }

      return Disposable.Empty;
    }

    /// <summary>
    /// Handle returned by <see cref="Subscribe"/>; disposing it removes the observer from the subject.
    /// </summary>
    private class Subscription : IDisposable
    {
      private readonly BehaviorSubject<T> _subject;

      // Cleared once the observer has been removed so that repeated Dispose calls do nothing.
      private IObserver<T> _observer;

      public Subscription(BehaviorSubject<T> subject, IObserver<T> observer)
      {
        _subject = subject;
        _observer = observer;
      }

      /// <summary>
      /// Removes the observer from the owning subject, unless that has already happened or the
      /// subject itself has been disposed.
      /// </summary>
      public void Dispose()
      {
        // Unlocked early exit; the condition is re-checked under the lock below.
        if (_observer == null)
        {
          return;
        }

        lock (_subject._gate)
        {
          if (_subject._isDisposed || _observer == null)
          {
            return;
          }

          _subject._observers = _subject._observers.Remove(_observer);
          _observer = null;
        }
      }
    }

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> when the subject has been disposed.
    /// Callers in this class invoke it while holding the lock.
    /// </summary>
    private void CheckDisposed()
    {
      if (!_isDisposed)
      {
        return;
      }

      throw new ObjectDisposedException(string.Empty);
    }

    /// <summary>
    /// Unsubscribe all observers and release resources.
    /// </summary>
    /// <remarks>
    /// Marks the subject as disposed and clears the observer list, the cached value and the stored
    /// exception. Observers are not notified.
    /// </remarks>
    public void Dispose()
    {
      lock (_gate)
      {
        _isDisposed = true;
        _observers = null;
        _exception = null;
        _value = default(T);
      }
    }
  }
  178. }