// AsyncSubject.cs
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. using System.Runtime.CompilerServices;
  7. using System.Reactive.Concurrency;
  8. namespace System.Reactive.Subjects
  9. {
  /// <summary>
  /// Represents the result of an asynchronous operation.
  /// The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
  /// </summary>
  /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
  public sealed class AsyncSubject<T> : SubjectBase<T>, IDisposable
#if HAS_AWAIT
    , INotifyCompletion
#endif
  {
    #region Fields

    // After construction, every write to the state below is made while holding this gate.
    // Observer callbacks are always invoked after the gate has been released.
    private readonly object _gate = new object();

    // Currently subscribed observers; replaced by an empty list on termination and by null on Dispose.
    private ImmutableList<IObserver<T>> _observers;
    private bool _isDisposed;

    // Set once OnCompleted or OnError has been accepted; later notifications are ignored.
    private bool _isStopped;

    // Last value passed to OnNext before termination; only meaningful when _hasValue is true.
    private T _value;
    private bool _hasValue;

    // Error passed to OnError, or null if the subject has not failed.
    private Exception _exception;

    #endregion

    #region Constructors

    /// <summary>
    /// Creates a subject that can only receive one value and that value is cached for all future observations.
    /// </summary>
    public AsyncSubject()
    {
      _observers = ImmutableList<IObserver<T>>.Empty;
    }

    #endregion

    #region Properties

    /// <summary>
    /// Indicates whether the subject has observers subscribed to it.
    /// </summary>
    /// <remarks>
    /// Reads the current observer list without taking the gate. The list is null once the
    /// subject has been disposed, in which case <c>false</c> is returned.
    /// </remarks>
    public override bool HasObservers
    {
      get
      {
        // Copy the reference once so the null check and the length check see the same list.
        var observers = _observers;
        return observers != null && observers.Data.Length > 0;
      }
    }

    /// <summary>
    /// Indicates whether the subject has been disposed.
    /// </summary>
    public override bool IsDisposed
    {
      get
      {
        lock (_gate)
        {
          return _isDisposed;
        }
      }
    }

    #endregion

    #region Methods

    #region IObserver<T> implementation

    /// <summary>
    /// Notifies all subscribed observers about the end of the sequence, also causing the last received value to be sent out (if any).
    /// </summary>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public override void OnCompleted()
    {
      var os = default(IObserver<T>[]);
      var v = default(T);
      var hv = false;

      lock (_gate)
      {
        CheckDisposed();

        if (!_isStopped)
        {
          // Detach the observers and capture the final value under the gate;
          // the callbacks themselves run after the gate is released.
          os = _observers.Data;
          _observers = ImmutableList<IObserver<T>>.Empty;
          _isStopped = true;
          v = _value;
          hv = _hasValue;
        }
      }

      // os stays null when the subject had already terminated: nothing to notify.
      if (os != null)
      {
        if (hv)
        {
          foreach (var o in os)
          {
            o.OnNext(v);
            o.OnCompleted();
          }
        }
        else
        {
          foreach (var o in os)
          {
            o.OnCompleted();
          }
        }
      }
    }

    /// <summary>
    /// Notifies all subscribed observers about the exception.
    /// </summary>
    /// <param name="error">The exception to send to all observers.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public override void OnError(Exception error)
    {
      if (error == null)
      {
        throw new ArgumentNullException(nameof(error));
      }

      var os = default(IObserver<T>[]);

      lock (_gate)
      {
        CheckDisposed();

        if (!_isStopped)
        {
          os = _observers.Data;
          _observers = ImmutableList<IObserver<T>>.Empty;

          // Record the error before raising the stopped flag so the terminal state is
          // complete by the time the flag is visible.
          _exception = error;
          _isStopped = true;
        }
      }

      // os stays null when the subject had already terminated: nothing to notify.
      if (os != null)
      {
        foreach (var o in os)
        {
          o.OnError(error);
        }
      }
    }

    /// <summary>
    /// Sends a value to the subject. The last value received before successful termination will be sent to all subscribed and future observers.
    /// </summary>
    /// <param name="value">The value to store in the subject.</param>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public override void OnNext(T value)
    {
      lock (_gate)
      {
        CheckDisposed();

        // Values arriving after termination are dropped; observers are not notified here.
        if (!_isStopped)
        {
          _value = value;
          _hasValue = true;
        }
      }
    }

    #endregion

    #region IObservable<T> implementation

    /// <summary>
    /// Subscribes an observer to the subject.
    /// </summary>
    /// <param name="observer">Observer to subscribe to the subject.</param>
    /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public override IDisposable Subscribe(IObserver<T> observer)
    {
      if (observer == null)
      {
        throw new ArgumentNullException(nameof(observer));
      }

      var ex = default(Exception);
      var v = default(T);
      var hv = false;

      lock (_gate)
      {
        CheckDisposed();

        if (!_isStopped)
        {
          // Still running: register the observer and hand back an unsubscribe handle.
          _observers = _observers.Add(observer);
          return new Subscription(this, observer);
        }

        // Already terminated: capture the outcome and replay it outside the gate.
        ex = _exception;
        hv = _hasValue;
        v = _value;
      }

      if (ex != null)
      {
        observer.OnError(ex);
      }
      else if (hv)
      {
        observer.OnNext(v);
        observer.OnCompleted();
      }
      else
      {
        observer.OnCompleted();
      }

      return Disposable.Empty;
    }

    /// <summary>
    /// Handle returned from <see cref="Subscribe"/> that removes its observer from the subject when disposed.
    /// </summary>
    private sealed class Subscription : IDisposable
    {
      private readonly AsyncSubject<T> _subject;
      private IObserver<T> _observer;

      public Subscription(AsyncSubject<T> subject, IObserver<T> observer)
      {
        _subject = subject;
        _observer = observer;
      }

      /// <summary>
      /// Removes the observer from the subject's observer list, unless that already happened
      /// or the subject has been disposed.
      /// </summary>
      public void Dispose()
      {
        // Cheap check without the gate first; the decision is re-validated under the gate.
        if (_observer != null)
        {
          lock (_subject._gate)
          {
            // A disposed subject has a null observer list, so there is nothing to remove from.
            if (!_subject._isDisposed && _observer != null)
            {
              _subject._observers = _subject._observers.Remove(_observer);
              _observer = null;
            }
          }
        }
      }
    }

    #endregion

    #region IDisposable implementation

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> if the subject has been disposed.
    /// Every caller in this class invokes it while holding the gate.
    /// </summary>
    private void CheckDisposed()
    {
      if (_isDisposed)
      {
        throw new ObjectDisposedException(string.Empty);
      }
    }

    /// <summary>
    /// Unsubscribe all observers and release resources.
    /// </summary>
    /// <remarks>
    /// Observers are dropped without being notified, and the cached value and error are cleared.
    /// </remarks>
    public override void Dispose()
    {
      lock (_gate)
      {
        _isDisposed = true;
        _observers = null;
        _exception = null;
        _value = default(T);
      }
    }

    #endregion

    #region Await support

#if HAS_AWAIT
    /// <summary>
    /// Gets an awaitable object for the current AsyncSubject.
    /// </summary>
    /// <returns>Object that can be awaited.</returns>
    public AsyncSubject<T> GetAwaiter()
    {
      return this;
    }

    /// <summary>
    /// Specifies a callback action that will be invoked when the subject completes.
    /// </summary>
    /// <param name="continuation">Callback action that will be invoked when the subject completes.</param>
    /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
    public void OnCompleted(Action continuation)
    {
      if (continuation == null)
      {
        throw new ArgumentNullException(nameof(continuation));
      }

      OnCompleted(continuation, true);
    }
#endif

    /// <summary>
    /// Subscribes an observer that runs <paramref name="continuation"/> when the subject terminates,
    /// whether through completion or error.
    /// </summary>
    /// <param name="continuation">Callback to run on termination.</param>
    /// <param name="originalContext">
    /// When true (and await support is compiled in), the callback is posted to the
    /// <see cref="SynchronizationContext"/> that is current at the time of this call.
    /// </param>
    private void OnCompleted(Action continuation, bool originalContext)
    {
      //
      // [OK] Use of unsafe Subscribe: this type's Subscribe implementation is safe.
      //
      this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));
    }

    /// <summary>
    /// Observer that ignores values and invokes a callback on completion or error.
    /// </summary>
    private sealed class AwaitObserver : IObserver<T>
    {
#if HAS_AWAIT
      // Context captured at construction time; null means "invoke the callback inline".
      private readonly SynchronizationContext _context;
#endif
      private readonly Action _callback;

      public AwaitObserver(Action callback, bool originalContext)
      {
#if HAS_AWAIT
        if (originalContext)
        {
          _context = SynchronizationContext.Current;
        }
#else
        System.Diagnostics.Debug.Assert(!originalContext);
#endif
        _callback = callback;
      }

      public void OnCompleted()
      {
        InvokeOnOriginalContext();
      }

      public void OnError(Exception error)
      {
        // The error itself is not forwarded; the awaiting side picks it up from GetResult.
        InvokeOnOriginalContext();
      }

      public void OnNext(T value)
      {
        // Values are not needed here; GetResult reads the final value from the subject.
      }

      private void InvokeOnOriginalContext()
      {
#if HAS_AWAIT
        if (_context != null)
        {
          //
          // No need for OperationStarted and OperationCompleted calls here;
          // this code is invoked through await support and will have a way
          // to observe its start/complete behavior, either through returned
          // Task objects or the async method builder's interaction with the
          // SynchronizationContext object.
          //
          _context.Post(c => ((Action)c)(), _callback);
        }
        else
#endif
        {
          _callback();
        }
      }
    }

    /// <summary>
    /// Gets whether the AsyncSubject has completed.
    /// </summary>
    /// <remarks>
    /// The flag is read under the gate, so a <c>true</c> result is only returned once the
    /// terminating OnCompleted/OnError call has finished recording its outcome.
    /// </remarks>
    public bool IsCompleted
    {
      get
      {
        lock (_gate)
        {
          return _isStopped;
        }
      }
    }

    /// <summary>
    /// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally.
    /// </summary>
    /// <remarks>
    /// Dispose drops waiting observers without notifying them, so a call that is blocked here
    /// is not released by disposing the subject.
    /// </remarks>
    /// <returns>The last element of the subject. Throws an InvalidOperationException if no element was received.</returns>
    /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
    /// <exception cref="ObjectDisposedException">The subject was disposed before it completed.</exception>
    [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Await pattern for C# and VB compilers.")]
    public T GetResult()
    {
      if (!IsCompleted)
      {
        // Block until the subject terminates. The event is disposed as soon as the wait is
        // over (or Subscribe throws) so its OS handle is not left for the finalizer.
        using (var e = new ManualResetEvent(false))
        {
          OnCompleted(() => e.Set(), false);
          e.WaitOne();
        }
      }

      Exception error;
      bool hasValue;
      T value;

      // Snapshot the outcome under the gate: OnCompleted/OnError publish it while holding the
      // same gate, so this can never observe a half-written terminal state.
      lock (_gate)
      {
        error = _exception;
        hasValue = _hasValue;
        value = _value;
      }

      // Project helper; presumably rethrows the stored error when there is one.
      error.ThrowIfNotNull();

      if (!hasValue)
      {
        throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
      }

      return value;
    }

    #endregion

    #endregion
  }
  338. }