// Subject.cs
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System.Reactive.Disposables;
  4. using System.Threading;
  5. namespace System.Reactive.Subjects
  6. {
  /// <summary>
  /// Represents an object that is both an observable sequence as well as an observer.
  /// Each notification is broadcasted to all subscribed observers.
  /// </summary>
  /// <remarks>
  /// This variant takes no locks. Its entire state is one observer reference that is replaced
  /// either by a volatile write (<see cref="Dispose"/>) or by compare-and-exchange retry loops
  /// (every other state transition).
  /// </remarks>
  /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  public sealed class Subject<T> : ISubject<T>, IDisposable
  {
      // Current state, encoded as the observer that receives incoming notifications:
      //   NopObserver<T>.Instance      - nothing is subscribed (set by the constructor, and by
      //                                  Unsubscribe when the only direct subscriber leaves);
      //   the subscriber itself        - an observer that subscribed while the state was "nothing";
      //   an Observer<T>               - aggregate created once a further observer subscribes;
      //   a DoneObserver<T>            - OnCompleted or OnError has been accepted;
      //   DisposedObserver<T>.Instance - Dispose has been called.
      private volatile IObserver<T> _observer;

      /// <summary>
      /// Creates a subject.
      /// </summary>
      public Subject()
      {
          _observer = NopObserver<T>.Instance;
      }

      /// <summary>
      /// Indicates whether the subject has observers subscribed to it.
      /// </summary>
      public bool HasObservers
      {
          get
          {
              // Read the volatile field exactly once so all three checks are evaluated against
              // the same state, even if another thread changes it concurrently.
              var current = _observer;
              return current != NopObserver<T>.Instance && !(current is DoneObserver<T>) && current != DisposedObserver<T>.Instance;
          }
      }

      /// <summary>
      /// Notifies all subscribed observers about the end of the sequence.
      /// </summary>
      public void OnCompleted()
      {
          var oldObserver = default(IObserver<T>);
          var newObserver = DoneObserver<T>.Completed;
          do
          {
              oldObserver = _observer;
              // Already terminated or disposed: leave the state untouched.
              if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
                  break;
              // CS0420 (volatile field passed by ref) is suppressed around the Interlocked call.
#pragma warning disable 0420
          } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
#pragma warning restore 0420

          // Forward to whatever was current when the loop ended. After a 'break' that is the
          // done/disposed sentinel itself, so the outcome is whatever that sentinel's OnCompleted
          // does (defined outside this file).
          oldObserver.OnCompleted();
      }

      /// <summary>
      /// Notifies all subscribed observers about the specified exception.
      /// </summary>
      /// <param name="error">The exception to send to all currently subscribed observers.</param>
      /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
      public void OnError(Exception error)
      {
          if (error == null)
              throw new ArgumentNullException("error");

          var oldObserver = default(IObserver<T>);
          // Terminal state that remembers the error so later subscribers can receive it.
          var newObserver = new DoneObserver<T> { Exception = error };
          do
          {
              oldObserver = _observer;
              // Already terminated or disposed: leave the state untouched.
              if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
                  break;
#pragma warning disable 0420
          } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
#pragma warning restore 0420

          // As in OnCompleted: after a 'break' the call goes to the sentinel that was found.
          oldObserver.OnError(error);
      }

      /// <summary>
      /// Notifies all subscribed observers about the arrival of the specified element in the sequence.
      /// </summary>
      /// <param name="value">The value to send to all currently subscribed observers.</param>
      public void OnNext(T value)
      {
          // A single volatile read; the state object decides what happens with the value.
          _observer.OnNext(value);
      }

      /// <summary>
      /// Subscribes an observer to the subject.
      /// </summary>
      /// <param name="observer">Observer to subscribe to the subject.</param>
      /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (observer == null)
              throw new ArgumentNullException("observer");

          var oldObserver = default(IObserver<T>);
          var newObserver = default(IObserver<T>);
          do
          {
              oldObserver = _observer;

              if (oldObserver == DisposedObserver<T>.Instance)
              {
                  throw new ObjectDisposedException("");
              }

              // Completed successfully: replay the completion to the late subscriber.
              if (oldObserver == DoneObserver<T>.Completed)
              {
                  observer.OnCompleted();
                  return Disposable.Empty;
              }

              // Any other DoneObserver<T> was created by OnError: replay the stored exception.
              var done = oldObserver as DoneObserver<T>;
              if (done != null)
              {
                  observer.OnError(done.Exception);
                  return Disposable.Empty;
              }

              if (oldObserver == NopObserver<T>.Instance)
              {
                  // First subscriber: it becomes the state directly, without a wrapper.
                  newObserver = observer;
              }
              else
              {
                  var obs = oldObserver as Observer<T>;
                  if (obs != null)
                  {
                      // Existing aggregate: Add returns the state to install (retried if the CAS fails).
                      newObserver = obs.Add(observer);
                  }
                  else
                  {
                      // A single direct subscriber is in place: wrap both into an aggregate.
                      newObserver = new Observer<T>(new ImmutableList<IObserver<T>>(new[] { oldObserver, observer }));
                  }
              }
#pragma warning disable 0420
          } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
#pragma warning restore 0420

          return new Subscription(this, observer);
      }

      /// <summary>
      /// Handle returned by <see cref="Subscribe"/>; disposing it removes the observer from the subject.
      /// </summary>
      private sealed class Subscription : IDisposable
      {
          private Subject<T> _subject;
          private IObserver<T> _observer;

          public Subscription(Subject<T> subject, IObserver<T> observer)
          {
              _subject = subject;
              _observer = observer;
          }

          /// <summary>
          /// Unsubscribes the observer. Only the first call has an effect.
          /// </summary>
          public void Dispose()
          {
              // The atomic exchange hands the observer to exactly one caller.
              var observer = Interlocked.Exchange(ref _observer, null);
              if (observer == null)
                  return;

              _subject.Unsubscribe(observer);
              _subject = null;
          }
      }

      /// <summary>
      /// Removes <paramref name="observer"/> from the current state, retrying when another thread
      /// changes the state concurrently.
      /// </summary>
      /// <param name="observer">The observer to remove.</param>
      private void Unsubscribe(IObserver<T> observer)
      {
          var oldObserver = default(IObserver<T>);
          var newObserver = default(IObserver<T>);
          do
          {
              oldObserver = _observer;
              // Nothing to remove once the subject is terminated or disposed.
              if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
                  return;

              var obs = oldObserver as Observer<T>;
              if (obs != null)
              {
                  newObserver = obs.Remove(observer);
              }
              else
              {
                  // The state is a single direct observer (or the no-op sentinel); only act when
                  // it is the very observer being removed.
                  if (oldObserver != observer)
                      return;

                  newObserver = NopObserver<T>.Instance;
              }
#pragma warning disable 0420
          } while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
#pragma warning restore 0420
      }

      /// <summary>
      /// Releases all resources used by the current instance of the <see cref="System.Reactive.Subjects.Subject&lt;T&gt;"/> class and unsubscribes all observers.
      /// </summary>
      public void Dispose()
      {
          // Unconditional volatile write: replaces whatever state was current, which also drops
          // the references to every subscribed observer.
          _observer = DisposedObserver<T>.Instance;
      }
  }
  179. }
  180. #else
  181. using System.Reactive.Disposables;
  182. using System.Threading;
  183. namespace System.Reactive.Subjects
  184. {
  /// <summary>
  /// Represents an object that is both an observable sequence as well as an observer.
  /// Each notification is broadcasted to all subscribed observers.
  /// </summary>
  /// <remarks>
  /// This variant guards its state with a private lock. Observers are always notified after the
  /// lock has been released, so an observer callback never runs while the lock is held.
  /// </remarks>
  /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  public sealed class Subject<T> : ISubject<T>, IDisposable
  {
      bool isDisposed;
      bool isStopped;
      // Current subscribers; set to null by Dispose.
      ImmutableList<IObserver<T>> observers;
      // Guards every field of this class; never reassigned.
      readonly object gate = new object();
      // Error passed to OnError, or null when the subject has not failed.
      Exception exception;

      /// <summary>
      /// Creates a subject.
      /// </summary>
      public Subject()
      {
          observers = new ImmutableList<IObserver<T>>();
      }

      /// <summary>
      /// Indicates whether the subject has observers subscribed to it.
      /// </summary>
      /// <remarks>
      /// Does not throw after disposal; it reports <c>false</c> because the observer list has been released.
      /// </remarks>
      public bool HasObservers
      {
          get
          {
              lock (gate)
              {
                  var current = observers;
                  if (current == null)
                      return false;

                  // NOTE(review): assumes Data is an empty (or null) array when nothing is subscribed - confirm against ImmutableList<T>.
                  IObserver<T>[] data = current.Data;
                  return data != null && data.Length > 0;
              }
          }
      }

      /// <summary>
      /// Notifies all subscribed observers about the end of the sequence.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnCompleted()
      {
          var os = default(IObserver<T>[]);
          lock (gate)
          {
              CheckDisposed();

              if (!isStopped)
              {
                  // Take the subscribers out and mark the subject as stopped in one step.
                  os = observers.Data;
                  observers = new ImmutableList<IObserver<T>>();
                  isStopped = true;
              }
          }

          // 'os' stays null when the subject had already stopped, so repeated calls notify nobody.
          if (os != null)
              foreach (var o in os)
                  o.OnCompleted();
      }

      /// <summary>
      /// Notifies all subscribed observers with the exception.
      /// </summary>
      /// <param name="error">The exception to send to all subscribed observers.</param>
      /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnError(Exception error)
      {
          if (error == null)
              throw new ArgumentNullException("error");

          var os = default(IObserver<T>[]);
          lock (gate)
          {
              CheckDisposed();

              if (!isStopped)
              {
                  os = observers.Data;
                  observers = new ImmutableList<IObserver<T>>();
                  isStopped = true;
                  // Remembered so that later subscribers receive the same error.
                  exception = error;
              }
          }

          if (os != null)
              foreach (var o in os)
                  o.OnError(error);
      }

      /// <summary>
      /// Notifies all subscribed observers with the value.
      /// </summary>
      /// <param name="value">The value to send to all subscribed observers.</param>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public void OnNext(T value)
      {
          var os = default(IObserver<T>[]);
          lock (gate)
          {
              CheckDisposed();

              if (!isStopped)
              {
                  // Snapshot of the subscribers; later (un)subscriptions replace 'observers'
                  // with a new list rather than touching this array reference.
                  os = observers.Data;
              }
          }

          if (os != null)
              foreach (var o in os)
                  o.OnNext(value);
      }

      /// <summary>
      /// Subscribes an observer to the subject.
      /// </summary>
      /// <param name="observer">Observer to subscribe to the subject.</param>
      /// <returns>IDisposable object that can be used to unsubscribe the observer from the subject.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (observer == null)
              throw new ArgumentNullException("observer");

          var error = default(Exception);
          lock (gate)
          {
              CheckDisposed();

              if (!isStopped)
              {
                  observers = observers.Add(observer);
                  return new Subscription(this, observer);
              }

              // The subject has already terminated: capture how, then notify outside the lock.
              error = exception;
          }

          // Like OnNext/OnError/OnCompleted, call the observer only after releasing the lock,
          // so a callback that re-enters the subject or blocks cannot stall other threads.
          if (error != null)
              observer.OnError(error);
          else
              observer.OnCompleted();

          return Disposable.Empty;
      }

      /// <summary>
      /// Removes <paramref name="observer"/> from the subscriber list; does nothing once the
      /// subject has been disposed.
      /// </summary>
      /// <param name="observer">The observer to remove.</param>
      void Unsubscribe(IObserver<T> observer)
      {
          lock (gate)
          {
              // 'observers' is null after Dispose.
              if (observers != null)
                  observers = observers.Remove(observer);
          }
      }

      /// <summary>
      /// Handle returned by <see cref="Subscribe"/>; disposing it removes the observer from the subject.
      /// </summary>
      class Subscription : IDisposable
      {
          Subject<T> subject;
          IObserver<T> observer;

          public Subscription(Subject<T> subject, IObserver<T> observer)
          {
              this.subject = subject;
              this.observer = observer;
          }

          /// <summary>
          /// Unsubscribes the observer. Only the first call has an effect.
          /// </summary>
          public void Dispose()
          {
              // The atomic exchange hands the observer to exactly one caller.
              var o = Interlocked.Exchange<IObserver<T>>(ref observer, null);
              if (o != null)
              {
                  subject.Unsubscribe(o);
                  subject = null;
              }
          }
      }

      /// <summary>
      /// Throws when the subject has been disposed. Callers in this class invoke it while holding the lock.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
      void CheckDisposed()
      {
          if (isDisposed)
              throw new ObjectDisposedException(string.Empty);
      }

      /// <summary>
      /// Unsubscribe all observers and release resources.
      /// </summary>
      public void Dispose()
      {
          lock (gate)
          {
              isDisposed = true;
              observers = null;
          }
      }
  }
  342. }
  343. #endif