Subject.cs 13 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. namespace System.Reactive.Subjects
  8. {
  /// <summary>
  /// Represents an object that is both an observable sequence as well as an observer.
  /// Each notification is broadcasted to all subscribed observers.
  /// </summary>
  /// <remarks>
  /// The whole state of the subject is encoded in the single <c>_observer</c> field, which is
  /// only replaced through compare-and-swap loops (or a plain volatile write in <see cref="Dispose"/>):
  /// <list type="bullet">
  /// <item><description><c>NopObserver&lt;T&gt;.Instance</c>: no observer is subscribed.</description></item>
  /// <item><description>A plain <see cref="IObserver{T}"/>: exactly one observer is subscribed and is stored directly.</description></item>
  /// <item><description><c>Observer&lt;T&gt;</c>: a wrapper around several subscribed observers.</description></item>
  /// <item><description><c>DoneObserver&lt;T&gt;</c>: the subject has terminated (the shared <c>Completed</c> instance, or an instance carrying the error).</description></item>
  /// <item><description><c>DisposedObserver&lt;T&gt;.Instance</c>: the subject has been disposed.</description></item>
  /// </list>
  /// </remarks>
  /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  public sealed class Subject<T> : SubjectBase<T>, IDisposable
  {
      #region Fields

      // Current state of the subject; see the remarks on the class for the possible values.
      private volatile IObserver<T> _observer;

      #endregion

      #region Constructors

      /// <summary>
      /// Creates a subject.
      /// </summary>
      public Subject()
      {
          _observer = NopObserver<T>.Instance;
      }

      #endregion

      #region Properties

      /// <summary>
      /// Indicates whether the subject has observers subscribed to it.
      /// </summary>
      public override bool HasObservers
      {
          get
          {
              // Read the volatile field exactly once so that all three checks are evaluated
              // against the same state, even if another thread swaps the field meanwhile.
              var current = _observer;

              return current != NopObserver<T>.Instance
                  && !(current is DoneObserver<T>)
                  && current != DisposedObserver<T>.Instance;
          }
      }

      /// <summary>
      /// Indicates whether the subject has been disposed.
      /// </summary>
      public override bool IsDisposed
      {
          get
          {
              return _observer is DisposedObserver<T>;
          }
      }

      #endregion

      #region Methods

      #region IObserver<T> implementation

      /// <summary>
      /// Notifies all subscribed observers about the end of the sequence.
      /// </summary>
      /// <remarks>
      /// If the subject is already terminated or disposed, its state is left untouched and the
      /// call is forwarded to the sentinel observer currently stored (whose behavior is defined
      /// outside this file).
      /// </remarks>
      public override void OnCompleted()
      {
          var oldObserver = default(IObserver<T>);
          var newObserver = DoneObserver<T>.Completed;

          do
          {
              oldObserver = _observer;

              // Terminal states are never overwritten by a completion.
              if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
              {
                  break;
              }
  #pragma warning disable 0420
          } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
  #pragma warning restore 0420

          // Notify whatever was installed before the swap (or the sentinel we bailed out on).
          oldObserver.OnCompleted();
      }

      /// <summary>
      /// Notifies all subscribed observers about the specified exception.
      /// </summary>
      /// <param name="error">The exception to send to all currently subscribed observers.</param>
      /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
      /// <remarks>
      /// If the subject is already terminated or disposed, its state is left untouched and the
      /// call is forwarded to the sentinel observer currently stored (whose behavior is defined
      /// outside this file).
      /// </remarks>
      public override void OnError(Exception error)
      {
          if (error == null)
          {
              throw new ArgumentNullException(nameof(error));
          }

          var oldObserver = default(IObserver<T>);

          // The terminal state remembers the error so that late subscribers can receive it.
          var newObserver = new DoneObserver<T> { Exception = error };

          do
          {
              oldObserver = _observer;

              // Terminal states are never overwritten by an error.
              if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
              {
                  break;
              }
  #pragma warning disable 0420
          } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
  #pragma warning restore 0420

          // Notify whatever was installed before the swap (or the sentinel we bailed out on).
          oldObserver.OnError(error);
      }

      /// <summary>
      /// Notifies all subscribed observers about the arrival of the specified element in the sequence.
      /// </summary>
      /// <param name="value">The value to send to all currently subscribed observers.</param>
      public override void OnNext(T value)
      {
          // Forwarded to whatever the state field currently holds (observer, wrapper or sentinel).
          _observer.OnNext(value);
      }

      #endregion

      #region IObservable<T> implementation

      /// <summary>
      /// Subscribes an observer to the subject.
      /// </summary>
      /// <param name="observer">Observer to subscribe to the subject.</param>
      /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public override IDisposable Subscribe(IObserver<T> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          var oldObserver = default(IObserver<T>);
          var newObserver = default(IObserver<T>);

          do
          {
              oldObserver = _observer;

              if (oldObserver == DisposedObserver<T>.Instance)
              {
                  throw new ObjectDisposedException("");
              }

              // Already completed: replay the completion to the late subscriber.
              if (oldObserver == DoneObserver<T>.Completed)
              {
                  observer.OnCompleted();
                  return Disposable.Empty;
              }

              // Already failed: replay the stored error to the late subscriber.
              var done = oldObserver as DoneObserver<T>;
              if (done != null)
              {
                  observer.OnError(done.Exception);
                  return Disposable.Empty;
              }

              if (oldObserver == NopObserver<T>.Instance)
              {
                  // First subscriber: store it directly, without a wrapper.
                  newObserver = observer;
              }
              else
              {
                  var obs = oldObserver as Observer<T>;
                  if (obs != null)
                  {
                      // Several subscribers already: derive a new wrapper that also contains this one.
                      newObserver = obs.Add(observer);
                  }
                  else
                  {
                      // Exactly one subscriber so far: promote to a wrapper holding both.
                      newObserver = new Observer<T>(new ImmutableList<IObserver<T>>(new[] { oldObserver, observer }));
                  }
              }
  #pragma warning disable 0420
          } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
  #pragma warning restore 0420

          return new Subscription(this, observer);
      }

      /// <summary>
      /// Handle returned from <see cref="Subscribe"/>; disposing it removes the observer from the subject.
      /// </summary>
      private sealed class Subscription : IDisposable
      {
          private Subject<T> _subject;
          private IObserver<T> _observer;

          public Subscription(Subject<T> subject, IObserver<T> observer)
          {
              _subject = subject;
              _observer = observer;
          }

          /// <summary>
          /// Unsubscribes the observer. Only the first call has an effect.
          /// </summary>
          public void Dispose()
          {
              // The atomic exchange guarantees that only one caller gets the non-null observer.
              var observer = Interlocked.Exchange(ref _observer, null);
              if (observer == null)
              {
                  return;
              }

              _subject.Unsubscribe(observer);
              _subject = null;
          }
      }

      /// <summary>
      /// Removes <paramref name="observer"/> from the subject's current set of observers.
      /// Does nothing when the subject is terminated or disposed, or when the single stored
      /// observer is not the one given.
      /// </summary>
      /// <param name="observer">The observer to remove.</param>
      private void Unsubscribe(IObserver<T> observer)
      {
          var oldObserver = default(IObserver<T>);
          var newObserver = default(IObserver<T>);

          do
          {
              oldObserver = _observer;

              if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
              {
                  return;
              }

              var obs = oldObserver as Observer<T>;
              if (obs != null)
              {
                  // Several subscribers: the wrapper computes the state without this observer.
                  newObserver = obs.Remove(observer);
              }
              else
              {
                  // Zero or one subscriber: only act if the stored observer is the one being removed.
                  if (oldObserver != observer)
                  {
                      return;
                  }

                  newObserver = NopObserver<T>.Instance;
              }
  #pragma warning disable 0420
          } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
  #pragma warning restore 0420
      }

      #endregion

      #region IDisposable implementation

      /// <summary>
      /// Releases all resources used by the current instance of the <see cref="Subject{T}"/> class and unsubscribes all observers.
      /// </summary>
      public override void Dispose()
      {
          // Unconditional volatile write: disposal wins over any other state.
          _observer = DisposedObserver<T>.Instance;
      }

      #endregion

      #endregion
  }
  205. }
  206. #else
  207. using System.Reactive.Disposables;
  208. using System.Threading;
  209. namespace System.Reactive.Subjects
  210. {
  /// <summary>
  /// Represents an object that is both an observable sequence as well as an observer.
  /// Each notification is broadcasted to all subscribed observers.
  /// </summary>
  /// <remarks>
  /// This implementation guards all of its state with a single lock. Observer callbacks are
  /// always invoked after the lock has been released.
  /// </remarks>
  /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  public sealed class Subject<T> : ISubject<T>, IDisposable
  {
      // Set by Dispose; once true, the notification methods and Subscribe throw ObjectDisposedException.
      private bool isDisposed;

      // Set by the first OnCompleted/OnError; later notifications are ignored.
      private bool isStopped;

      // Current subscribers; replaced (never mutated in place) under the lock, null after Dispose.
      private ImmutableList<IObserver<T>> observers;

      // Guards every field above and below.
      private readonly object gate = new object();

      // The error the subject terminated with, or null when it completed normally / is still running.
      private Exception exception;

      /// <summary>
      /// Creates a subject.
      /// </summary>
      public Subject()
      {
          observers = new ImmutableList<IObserver<T>>();
      }

      /// <summary>
      /// Notifies all subscribed observers about the end of the sequence.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnCompleted()
      {
          var os = default(IObserver<T>[]);

          lock (gate)
          {
              CheckDisposed();

              if (!isStopped)
              {
                  // Take the subscribers and clear the list so nobody is notified twice.
                  os = observers.Data;
                  observers = new ImmutableList<IObserver<T>>();
                  isStopped = true;
              }
          }

          if (os != null)
          {
              foreach (var o in os)
              {
                  o.OnCompleted();
              }
          }
      }

      /// <summary>
      /// Notifies all subscribed observers with the exception.
      /// </summary>
      /// <param name="error">The exception to send to all subscribed observers.</param>
      /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnError(Exception error)
      {
          if (error == null)
          {
              throw new ArgumentNullException(nameof(error));
          }

          var os = default(IObserver<T>[]);

          lock (gate)
          {
              CheckDisposed();

              if (!isStopped)
              {
                  // Take the subscribers, clear the list and remember the error for late subscribers.
                  os = observers.Data;
                  observers = new ImmutableList<IObserver<T>>();
                  isStopped = true;
                  exception = error;
              }
          }

          if (os != null)
          {
              foreach (var o in os)
              {
                  o.OnError(error);
              }
          }
      }

      /// <summary>
      /// Notifies all subscribed observers with the value.
      /// </summary>
      /// <param name="value">The value to send to all subscribed observers.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnNext(T value)
      {
          var os = default(IObserver<T>[]);

          lock (gate)
          {
              CheckDisposed();

              if (!isStopped)
              {
                  os = observers.Data;
              }
          }

          if (os != null)
          {
              foreach (var o in os)
              {
                  o.OnNext(value);
              }
          }
      }

      /// <summary>
      /// Subscribes an observer to the subject.
      /// </summary>
      /// <param name="observer">Observer to subscribe to the subject.</param>
      /// <returns>IDisposable object that can be used to unsubscribe the observer from the subject.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          var error = default(Exception);

          lock (gate)
          {
              CheckDisposed();

              if (!isStopped)
              {
                  observers = observers.Add(observer);
                  return new Subscription(this, observer);
              }

              // Already terminated: capture how, and notify after leaving the lock.
              error = exception;
          }

          // The terminal notification runs outside the lock so that observer code
          // never executes while this subject's gate is held.
          if (error != null)
          {
              observer.OnError(error);
          }
          else
          {
              observer.OnCompleted();
          }

          return Disposable.Empty;
      }

      /// <summary>
      /// Removes <paramref name="observer"/> from the list of subscribers.
      /// </summary>
      /// <param name="observer">The observer to remove.</param>
      private void Unsubscribe(IObserver<T> observer)
      {
          lock (gate)
          {
              // The list is null once the subject has been disposed.
              if (observers != null)
              {
                  observers = observers.Remove(observer);
              }
          }
      }

      /// <summary>
      /// Handle returned from <see cref="Subscribe"/>; disposing it removes the observer from the subject.
      /// </summary>
      private sealed class Subscription : IDisposable
      {
          private Subject<T> subject;
          private IObserver<T> observer;

          public Subscription(Subject<T> subject, IObserver<T> observer)
          {
              this.subject = subject;
              this.observer = observer;
          }

          /// <summary>
          /// Unsubscribes the observer. Only the first call has an effect.
          /// </summary>
          public void Dispose()
          {
              // The atomic exchange guarantees that only one caller gets the non-null observer.
              var o = Interlocked.Exchange<IObserver<T>>(ref observer, null);
              if (o != null)
              {
                  subject.Unsubscribe(o);
                  subject = null;
              }
          }
      }

      /// <summary>
      /// Throws if the subject has been disposed. Callers in this class invoke it while holding the lock.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      private void CheckDisposed()
      {
          if (isDisposed)
          {
              throw new ObjectDisposedException(string.Empty);
          }
      }

      /// <summary>
      /// Unsubscribe all observers and release resources.
      /// </summary>
      public void Dispose()
      {
          lock (gate)
          {
              isDisposed = true;
              observers = null;
          }
      }
  }
  368. }
  369. #endif