Notification.cs 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Globalization;
  7. using System.Reactive.Concurrency;
  8. #pragma warning disable 0659
  9. #pragma warning disable 0661
  10. namespace System.Reactive
  11. {
  /// <summary>
  /// Identifies which of the three observer messages a notification stands for.
  /// </summary>
  public enum NotificationKind
  {
      /// <summary>
      /// The notification corresponds to an OnNext message.
      /// </summary>
      OnNext = 0,

      /// <summary>
      /// The notification corresponds to an OnError message.
      /// </summary>
      OnError = 1,

      /// <summary>
      /// The notification corresponds to an OnCompleted message.
      /// </summary>
      OnCompleted = 2
  }
  /// <summary>
  /// A single observer message (OnNext, OnError or OnCompleted) captured as an object.
  /// </summary>
  /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
  #if !NO_SERIALIZABLE
  [Serializable]
  #endif
  [Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2218:OverrideGetHashCodeOnOverridingEquals", Justification = "Resembles a discriminated union with finite number of subclasses (external users shouldn't create their own subtypes), each of which does override GetHashCode itself.")]
  public abstract class Notification<T> : IEquatable<Notification<T>>
  {
      /// <summary>
      /// Parameterless constructor for use by the derived notification types.
      /// </summary>
      protected internal Notification()
      {
      }

      /// <summary>
      /// Gets the value carried by an OnNext notification; the other kinds throw instead.
      /// </summary>
      public abstract T Value { get; }

      /// <summary>
      /// Gets a flag telling whether this notification carries a value.
      /// </summary>
      public abstract bool HasValue { get; }

      /// <summary>
      /// Gets the exception carried by an OnError notification, or <c>null</c> for the other kinds.
      /// </summary>
      public abstract Exception Exception { get; }

      /// <summary>
      /// Gets the kind of observer message this notification represents.
      /// </summary>
      public abstract NotificationKind Kind { get; }

      /// <summary>
      /// Shared argument check for the delegate-based Accept overloads of the nested notification types.
      /// </summary>
      /// <param name="onNext">Handler supplied for OnNext notifications.</param>
      /// <param name="onError">Handler supplied for OnError notifications.</param>
      /// <param name="onCompleted">Handler supplied for OnCompleted notifications.</param>
      /// <exception cref="ArgumentNullException">One of the handlers is <c>null</c>; they are checked in parameter order and the first offender is reported.</exception>
      private static void ThrowIfAnyHandlerNull(Delegate onNext, Delegate onError, Delegate onCompleted)
      {
          if (onNext is null)
          {
              throw new ArgumentNullException(nameof(onNext));
          }

          if (onError is null)
          {
              throw new ArgumentNullException(nameof(onError));
          }

          if (onCompleted is null)
          {
              throw new ArgumentNullException(nameof(onCompleted));
          }
      }

      /// <summary>
      /// Notification that carries the value of an OnNext message.
      /// </summary>
      [DebuggerDisplay("OnNext({Value})")]
      #if !NO_SERIALIZABLE
      [Serializable]
      #endif
      internal sealed class OnNextNotification : Notification<T>
      {
          /// <summary>
          /// Creates a notification wrapping the given value.
          /// </summary>
          public OnNextNotification(T value) => Value = value;

          /// <summary>
          /// Gets the value supplied at construction.
          /// </summary>
          public override T Value { get; }

          /// <summary>
          /// Always <c>null</c>: a value notification carries no exception.
          /// </summary>
          public override Exception Exception
          {
              get { return null; }
          }

          /// <summary>
          /// Always <c>true</c>.
          /// </summary>
          public override bool HasValue
          {
              get { return true; }
          }

          /// <summary>
          /// Always <see cref="NotificationKind.OnNext"/>.
          /// </summary>
          public override NotificationKind Kind
          {
              get { return NotificationKind.OnNext; }
          }

          /// <summary>
          /// Hash code of the wrapped value, computed by the default equality comparer for <typeparamref name="T"/>.
          /// </summary>
          public override int GetHashCode()
          {
              return EqualityComparer<T>.Default.GetHashCode(Value);
          }

          /// <summary>
          /// Equal when <paramref name="other"/> is this same instance, or is a non-null OnNext notification
          /// whose value matches according to the default equality comparer for <typeparamref name="T"/>.
          /// </summary>
          public override bool Equals(Notification<T> other) =>
              ReferenceEquals(this, other)
              || (!(other is null)
                  && other.Kind == NotificationKind.OnNext
                  && EqualityComparer<T>.Default.Equals(Value, other.Value));

          /// <summary>
          /// Formats the notification as <c>OnNext(value)</c> using the current culture.
          /// </summary>
          public override string ToString()
          {
              return string.Format(CultureInfo.CurrentCulture, "OnNext({0})", Value);
          }

          /// <summary>
          /// Passes the wrapped value to the observer's OnNext method.
          /// </summary>
          /// <param name="observer">Observer to invoke the notification on.</param>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
          public override void Accept(IObserver<T> observer) =>
              (observer ?? throw new ArgumentNullException(nameof(observer))).OnNext(Value);

          /// <summary>
          /// Passes the wrapped value to the observer's OnNext method and hands back its result.
          /// </summary>
          /// <param name="observer">Observer to invoke the notification on.</param>
          /// <returns>Whatever the observer's OnNext method returned.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
          public override TResult Accept<TResult>(IObserver<T, TResult> observer) =>
              (observer ?? throw new ArgumentNullException(nameof(observer))).OnNext(Value);

          /// <summary>
          /// Validates all three delegates, then calls <paramref name="onNext"/> with the wrapped value.
          /// </summary>
          /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
          /// <param name="onError">Delegate to invoke for an OnError notification.</param>
          /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
          /// <exception cref="ArgumentNullException">Any of the delegates is <c>null</c>.</exception>
          public override void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted)
          {
              ThrowIfAnyHandlerNull(onNext, onError, onCompleted);
              onNext(Value);
          }

          /// <summary>
          /// Validates all three delegates, then returns the result of calling <paramref name="onNext"/> with the wrapped value.
          /// </summary>
          /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
          /// <param name="onError">Delegate to invoke for an OnError notification.</param>
          /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
          /// <returns>The value produced by <paramref name="onNext"/>.</returns>
          /// <exception cref="ArgumentNullException">Any of the delegates is <c>null</c>.</exception>
          public override TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted)
          {
              ThrowIfAnyHandlerNull(onNext, onError, onCompleted);
              return onNext(Value);
          }
      }

      /// <summary>
      /// Notification that carries the exception of an OnError message.
      /// </summary>
      [DebuggerDisplay("OnError({Exception})")]
      #if !NO_SERIALIZABLE
      [Serializable]
      #endif
      internal sealed class OnErrorNotification : Notification<T>
      {
          /// <summary>
          /// Creates a notification wrapping the given exception.
          /// </summary>
          public OnErrorNotification(Exception exception) => Exception = exception;

          /// <summary>
          /// Calls <c>Throw()</c> on the stored exception instead of producing a value.
          /// </summary>
          public override T Value
          {
              get
              {
                  Exception.Throw();

                  // Only reached if Throw() returns; the getter still needs a return value to compile.
                  return default;
              }
          }

          /// <summary>
          /// Gets the exception supplied at construction.
          /// </summary>
          public override Exception Exception { get; }

          /// <summary>
          /// Always <c>false</c>.
          /// </summary>
          public override bool HasValue
          {
              get { return false; }
          }

          /// <summary>
          /// Always <see cref="NotificationKind.OnError"/>.
          /// </summary>
          public override NotificationKind Kind
          {
              get { return NotificationKind.OnError; }
          }

          /// <summary>
          /// Hash code of the wrapped exception.
          /// </summary>
          public override int GetHashCode()
          {
              return Exception.GetHashCode();
          }

          /// <summary>
          /// Equal when <paramref name="other"/> is this same instance, or is a non-null OnError notification
          /// whose exception equals this one's according to <see cref="object.Equals(object, object)"/>.
          /// </summary>
          public override bool Equals(Notification<T> other) =>
              ReferenceEquals(this, other)
              || (!(other is null)
                  && other.Kind == NotificationKind.OnError
                  && Equals(Exception, other.Exception));

          /// <summary>
          /// Formats the notification as <c>OnError(type)</c>, where type is the full type name of the exception.
          /// </summary>
          public override string ToString()
          {
              return string.Format(CultureInfo.CurrentCulture, "OnError({0})", Exception.GetType().FullName);
          }

          /// <summary>
          /// Passes the wrapped exception to the observer's OnError method.
          /// </summary>
          /// <param name="observer">Observer to invoke the notification on.</param>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
          public override void Accept(IObserver<T> observer) =>
              (observer ?? throw new ArgumentNullException(nameof(observer))).OnError(Exception);

          /// <summary>
          /// Passes the wrapped exception to the observer's OnError method and hands back its result.
          /// </summary>
          /// <param name="observer">Observer to invoke the notification on.</param>
          /// <returns>Whatever the observer's OnError method returned.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
          public override TResult Accept<TResult>(IObserver<T, TResult> observer) =>
              (observer ?? throw new ArgumentNullException(nameof(observer))).OnError(Exception);

          /// <summary>
          /// Validates all three delegates, then calls <paramref name="onError"/> with the wrapped exception.
          /// </summary>
          /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
          /// <param name="onError">Delegate to invoke for an OnError notification.</param>
          /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
          /// <exception cref="ArgumentNullException">Any of the delegates is <c>null</c>.</exception>
          public override void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted)
          {
              ThrowIfAnyHandlerNull(onNext, onError, onCompleted);
              onError(Exception);
          }

          /// <summary>
          /// Validates all three delegates, then returns the result of calling <paramref name="onError"/> with the wrapped exception.
          /// </summary>
          /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
          /// <param name="onError">Delegate to invoke for an OnError notification.</param>
          /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
          /// <returns>The value produced by <paramref name="onError"/>.</returns>
          /// <exception cref="ArgumentNullException">Any of the delegates is <c>null</c>.</exception>
          public override TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted)
          {
              ThrowIfAnyHandlerNull(onNext, onError, onCompleted);
              return onError(Exception);
          }
      }

      /// <summary>
      /// Notification that stands for an OnCompleted message; it carries neither a value nor an exception.
      /// </summary>
      #if !NO_DEBUGGER_ATTRIBUTES
      [DebuggerDisplay("OnCompleted()")]
      #endif
      #if !NO_SERIALIZABLE
      [Serializable]
      #endif
      internal sealed class OnCompletedNotification : Notification<T>
      {
          /// <summary>
          /// The shared instance for this element type. The constructor is private, so this
          /// field is the only place in the class where an instance is created.
          /// </summary>
          internal static readonly Notification<T> Instance = new OnCompletedNotification();

          /// <summary>
          /// Private so that instances are only created through <see cref="Instance"/>.
          /// </summary>
          private OnCompletedNotification()
          {
          }

          /// <summary>
          /// Always throws an <see cref="InvalidOperationException"/>, since a completion has no value.
          /// </summary>
          public override T Value => throw new InvalidOperationException(Strings_Core.COMPLETED_NO_VALUE);

          /// <summary>
          /// Always <c>null</c>.
          /// </summary>
          public override Exception Exception
          {
              get { return null; }
          }

          /// <summary>
          /// Always <c>false</c>.
          /// </summary>
          public override bool HasValue
          {
              get { return false; }
          }

          /// <summary>
          /// Always <see cref="NotificationKind.OnCompleted"/>.
          /// </summary>
          public override NotificationKind Kind
          {
              get { return NotificationKind.OnCompleted; }
          }

          /// <summary>
          /// Hash code derived from the element type combined with a fixed constant.
          /// </summary>
          public override int GetHashCode()
          {
              return typeof(T).GetHashCode() ^ 8510;
          }

          /// <summary>
          /// Equal when <paramref name="other"/> is this same instance, or is any non-null notification of kind OnCompleted.
          /// </summary>
          public override bool Equals(Notification<T> other) =>
              ReferenceEquals(this, other)
              || (!(other is null) && other.Kind == NotificationKind.OnCompleted);

          /// <summary>
          /// Returns the fixed text <c>OnCompleted()</c>.
          /// </summary>
          public override string ToString()
          {
              return "OnCompleted()";
          }

          /// <summary>
          /// Calls the observer's OnCompleted method.
          /// </summary>
          /// <param name="observer">Observer to invoke the notification on.</param>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
          public override void Accept(IObserver<T> observer) =>
              (observer ?? throw new ArgumentNullException(nameof(observer))).OnCompleted();

          /// <summary>
          /// Calls the observer's OnCompleted method and hands back its result.
          /// </summary>
          /// <param name="observer">Observer to invoke the notification on.</param>
          /// <returns>Whatever the observer's OnCompleted method returned.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
          public override TResult Accept<TResult>(IObserver<T, TResult> observer) =>
              (observer ?? throw new ArgumentNullException(nameof(observer))).OnCompleted();

          /// <summary>
          /// Validates all three delegates, then calls <paramref name="onCompleted"/>.
          /// </summary>
          /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
          /// <param name="onError">Delegate to invoke for an OnError notification.</param>
          /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
          /// <exception cref="ArgumentNullException">Any of the delegates is <c>null</c>.</exception>
          public override void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted)
          {
              ThrowIfAnyHandlerNull(onNext, onError, onCompleted);
              onCompleted();
          }

          /// <summary>
          /// Validates all three delegates, then returns the result of calling <paramref name="onCompleted"/>.
          /// </summary>
          /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
          /// <param name="onError">Delegate to invoke for an OnError notification.</param>
          /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
          /// <returns>The value produced by <paramref name="onCompleted"/>.</returns>
          /// <exception cref="ArgumentNullException">Any of the delegates is <c>null</c>.</exception>
          public override TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted)
          {
              ThrowIfAnyHandlerNull(onNext, onError, onCompleted);
              return onCompleted();
          }
      }

      /// <summary>
      /// Compares the observer message payload of this notification with that of another notification.
      /// </summary>
      /// <param name="other">The notification to compare with this one.</param>
      /// <returns><c>true</c> when both notifications carry the same payload; otherwise, <c>false</c>.</returns>
      /// <remarks>
      /// Payload equality covers the Kind plus the Value or Exception, if any. Two distinct notification
      /// objects can therefore be equal. Use Object.ReferenceEquals to test whether two references denote
      /// the very same notification object.
      /// </remarks>
      public abstract bool Equals(Notification<T> other);

      /// <summary>
      /// Tests whether two notifications carry the same observer message payload.
      /// </summary>
      /// <param name="left">The first <see cref="Notification{T}"/> to compare, or <c>null</c>.</param>
      /// <param name="right">The second <see cref="Notification{T}"/> to compare, or <c>null</c>.</param>
      /// <returns>
      /// <c>true</c> when both references are the same (including both <c>null</c>), or when both are
      /// non-null and <paramref name="left"/> reports equality with <paramref name="right"/>; otherwise, <c>false</c>.
      /// </returns>
      /// <remarks>
      /// Payload equality covers the Kind plus the Value or Exception, if any. Two distinct notification
      /// objects can therefore be equal. Use Object.ReferenceEquals to test whether two references denote
      /// the very same notification object.
      /// </remarks>
      public static bool operator ==(Notification<T> left, Notification<T> right) =>
          ReferenceEquals(left, right)
          || (!(left is null) && !(right is null) && left.Equals(right));

      /// <summary>
      /// Tests whether two notifications carry a different observer message payload.
      /// </summary>
      /// <param name="left">The first <see cref="Notification{T}"/> to compare, or <c>null</c>.</param>
      /// <param name="right">The second <see cref="Notification{T}"/> to compare, or <c>null</c>.</param>
      /// <returns>The negation of the equality operator applied to the same operands.</returns>
      /// <remarks>
      /// Payload equality covers the Kind plus the Value or Exception, if any. Two distinct notification
      /// objects can therefore be equal. Use Object.ReferenceEquals to test whether two references denote
      /// different notification objects.
      /// </remarks>
      public static bool operator !=(Notification<T> left, Notification<T> right)
      {
          return !(left == right);
      }

      /// <summary>
      /// Compares this notification with an arbitrary object.
      /// </summary>
      /// <param name="obj">The System.Object to compare with the current <see cref="Notification{T}"/>.</param>
      /// <returns>
      /// The result of the typed comparison against <paramref name="obj"/> converted to
      /// <see cref="Notification{T}"/>; an object of any other type is compared as <c>null</c>.
      /// </returns>
      /// <remarks>
      /// Payload equality covers the Kind plus the Value or Exception, if any. Two distinct notification
      /// objects can therefore be equal. Use Object.ReferenceEquals to test whether two references denote
      /// the very same notification object.
      /// </remarks>
      public override bool Equals(object obj)
      {
          var candidate = obj as Notification<T>;
          return Equals(candidate);
      }

      /// <summary>
      /// Calls the observer method that matches this notification.
      /// </summary>
      /// <param name="observer">Observer to invoke the notification on.</param>
      public abstract void Accept(IObserver<T> observer);

      /// <summary>
      /// Calls the observer method that matches this notification and returns what it produced.
      /// </summary>
      /// <typeparam name="TResult">The type of the result returned from the observer's notification handlers.</typeparam>
      /// <param name="observer">Observer to invoke the notification on.</param>
      /// <returns>Result produced by the observation.</returns>
      public abstract TResult Accept<TResult>(IObserver<T, TResult> observer);

      /// <summary>
      /// Calls the delegate that matches this notification.
      /// </summary>
      /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
      /// <param name="onError">Delegate to invoke for an OnError notification.</param>
      /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
      public abstract void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted);

      /// <summary>
      /// Calls the delegate that matches this notification and returns what it produced.
      /// </summary>
      /// <typeparam name="TResult">The type of the result returned from the notification handler delegates.</typeparam>
      /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
      /// <param name="onError">Delegate to invoke for an OnError notification.</param>
      /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
      /// <returns>Result produced by the observation.</returns>
      public abstract TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted);

      /// <summary>
      /// Wraps this notification in an observable sequence, using the immediate scheduler.
      /// </summary>
      /// <returns>The observable sequence that surfaces the behavior of the notification upon subscription.</returns>
      public IObservable<T> ToObservable()
      {
          return ToObservable(ImmediateScheduler.Instance);
      }

      /// <summary>
      /// Wraps this notification in an observable sequence that delivers it on the given scheduler.
      /// </summary>
      /// <param name="scheduler">Scheduler to send out the notification calls on.</param>
      /// <returns>The observable sequence that surfaces the behavior of the notification upon subscription.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
      public IObservable<T> ToObservable(IScheduler scheduler) =>
          new NotificationToObservable(scheduler ?? throw new ArgumentNullException(nameof(scheduler)), this);

      /// <summary>
      /// Observable that replays one notification to each subscriber through a scheduler.
      /// </summary>
      private sealed class NotificationToObservable : ObservableBase<T>
      {
          private readonly IScheduler _scheduler;
          private readonly Notification<T> _parent;

          /// <summary>
          /// Stores the scheduler and the notification to replay.
          /// </summary>
          public NotificationToObservable(IScheduler scheduler, Notification<T> parent)
          {
              _scheduler = scheduler;
              _parent = parent;
          }

          /// <summary>
          /// Schedules delivery of the stored notification to <paramref name="observer"/>.
          /// </summary>
          /// <param name="observer">Observer that receives the notification.</param>
          /// <returns>The disposable returned by the scheduling call.</returns>
          protected override IDisposable SubscribeCore(IObserver<T> observer)
          {
              // Notification and observer travel as tuple state, so the lambda captures nothing.
              return _scheduler.ScheduleAction((_parent, observer), state =>
              {
                  var (notification, target) = state;

                  notification.Accept(target);

                  // A value notification does not end the sequence by itself, so follow it with OnCompleted.
                  if (notification.Kind == NotificationKind.OnNext)
                  {
                      target.OnCompleted();
                  }
              });
          }
      }
  }
  /// <summary>
  /// Factory methods that create <see cref="Notification{T}"/> objects of each kind.
  /// </summary>
  public static class Notification
  {
      /// <summary>
      /// Builds a notification that represents an OnNext message carrying <paramref name="value"/>.
      /// </summary>
      /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
      /// <param name="value">The value contained in the notification.</param>
      /// <returns>The OnNext notification containing the value.</returns>
      public static Notification<T> CreateOnNext<T>(T value) =>
          new Notification<T>.OnNextNotification(value);

      /// <summary>
      /// Builds a notification that represents an OnError message carrying <paramref name="error"/>.
      /// </summary>
      /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
      /// <param name="error">The exception contained in the notification.</param>
      /// <returns>The OnError notification containing the exception.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
      public static Notification<T> CreateOnError<T>(Exception error) =>
          new Notification<T>.OnErrorNotification(error ?? throw new ArgumentNullException(nameof(error)));

      /// <summary>
      /// Returns the notification that represents an OnCompleted message for element type <typeparamref name="T"/>.
      /// </summary>
      /// <typeparam name="T">The type of the elements received by the observer.</typeparam>
      /// <returns>The OnCompleted notification; every call hands back the same static instance.</returns>
      public static Notification<T> CreateOnCompleted<T>() =>
          Notification<T>.OnCompletedNotification.Instance;
  }
  621. }
  622. #pragma warning restore 0659
  623. #pragma warning restore 0661