Observable.Single.cs 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. namespace System.Reactive.Linq
  7. {
  8. public static partial class Observable
  9. {
  10. #region + AsObservable +
  11. /// <summary>
  12. /// Hides the identity of an observable sequence.
  13. /// </summary>
  14. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  15. /// <param name="source">An observable sequence whose identity to hide.</param>
  16. /// <returns>An observable sequence that hides the identity of the source sequence.</returns>
  17. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  18. public static IObservable<TSource> AsObservable<TSource>(this IObservable<TSource> source)
  19. {
  20. if (source == null)
  21. throw new ArgumentNullException(nameof(source));
  22. return s_impl.AsObservable<TSource>(source);
  23. }
  24. #endregion
  25. #region + Buffer +
  26. /// <summary>
  27. /// Projects each element of an observable sequence into consecutive non-overlapping buffers which are produced based on element count information.
  28. /// </summary>
  29. /// <typeparam name="TSource">The type of the elements in the source sequence, and in the lists in the result sequence.</typeparam>
  30. /// <param name="source">Source sequence to produce buffers over.</param>
  31. /// <param name="count">Length of each buffer.</param>
  32. /// <returns>An observable sequence of buffers.</returns>
  33. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  34. /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than or equal to zero.</exception>
  35. public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, int count)
  36. {
  37. if (source == null)
  38. throw new ArgumentNullException(nameof(source));
  39. if (count <= 0)
  40. throw new ArgumentOutOfRangeException(nameof(count));
  41. return s_impl.Buffer<TSource>(source, count);
  42. }
  43. /// <summary>
  44. /// Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
  45. /// </summary>
  46. /// <typeparam name="TSource">The type of the elements in the source sequence, and in the lists in the result sequence.</typeparam>
  47. /// <param name="source">Source sequence to produce buffers over.</param>
  48. /// <param name="count">Length of each buffer.</param>
  49. /// <param name="skip">Number of elements to skip between creation of consecutive buffers.</param>
  50. /// <returns>An observable sequence of buffers.</returns>
  51. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  52. /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> or <paramref name="skip"/> is less than or equal to zero.</exception>
  53. public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, int count, int skip)
  54. {
  55. if (source == null)
  56. throw new ArgumentNullException(nameof(source));
  57. if (count <= 0)
  58. throw new ArgumentOutOfRangeException(nameof(count));
  59. if (skip <= 0)
  60. throw new ArgumentOutOfRangeException(nameof(skip));
  61. return s_impl.Buffer<TSource>(source, count, skip);
  62. }
  63. #endregion
  64. #region + Dematerialize +
  65. /// <summary>
  66. /// Dematerializes the explicit notification values of an observable sequence as implicit notifications.
  67. /// </summary>
  68. /// <typeparam name="TSource">The type of the elements materialized in the source sequence notification objects.</typeparam>
  69. /// <param name="source">An observable sequence containing explicit notification values which have to be turned into implicit notifications.</param>
  70. /// <returns>An observable sequence exhibiting the behavior corresponding to the source sequence's notification values.</returns>
  71. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  72. public static IObservable<TSource> Dematerialize<TSource>(this IObservable<Notification<TSource>> source)
  73. {
  74. if (source == null)
  75. throw new ArgumentNullException(nameof(source));
  76. return s_impl.Dematerialize<TSource>(source);
  77. }
  78. #endregion
  79. #region + DistinctUntilChanged +
  80. /// <summary>
  81. /// Returns an observable sequence that contains only distinct contiguous elements.
  82. /// </summary>
  83. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  84. /// <param name="source">An observable sequence to retain distinct contiguous elements for.</param>
  85. /// <returns>An observable sequence only containing the distinct contiguous elements from the source sequence.</returns>
  86. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  87. public static IObservable<TSource> DistinctUntilChanged<TSource>(this IObservable<TSource> source)
  88. {
  89. if (source == null)
  90. throw new ArgumentNullException(nameof(source));
  91. return s_impl.DistinctUntilChanged<TSource>(source);
  92. }
  93. /// <summary>
  94. /// Returns an observable sequence that contains only distinct contiguous elements according to the comparer.
  95. /// </summary>
  96. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  97. /// <param name="source">An observable sequence to retain distinct contiguous elements for.</param>
  98. /// <param name="comparer">Equality comparer for source elements.</param>
  99. /// <returns>An observable sequence only containing the distinct contiguous elements from the source sequence.</returns>
  100. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="comparer"/> is null.</exception>
  101. public static IObservable<TSource> DistinctUntilChanged<TSource>(this IObservable<TSource> source, IEqualityComparer<TSource> comparer)
  102. {
  103. if (source == null)
  104. throw new ArgumentNullException(nameof(source));
  105. if (comparer == null)
  106. throw new ArgumentNullException(nameof(comparer));
  107. return s_impl.DistinctUntilChanged<TSource>(source, comparer);
  108. }
  109. /// <summary>
  110. /// Returns an observable sequence that contains only distinct contiguous elements according to the keySelector.
  111. /// </summary>
  112. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  113. /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  114. /// <param name="source">An observable sequence to retain distinct contiguous elements for, based on a computed key value.</param>
  115. /// <param name="keySelector">A function to compute the comparison key for each element.</param>
  116. /// <returns>An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence.</returns>
  117. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  118. public static IObservable<TSource> DistinctUntilChanged<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
  119. {
  120. if (source == null)
  121. throw new ArgumentNullException(nameof(source));
  122. if (keySelector == null)
  123. throw new ArgumentNullException(nameof(keySelector));
  124. return s_impl.DistinctUntilChanged<TSource, TKey>(source, keySelector);
  125. }
  126. /// <summary>
  127. /// Returns an observable sequence that contains only distinct contiguous elements according to the keySelector and the comparer.
  128. /// </summary>
  129. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  130. /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  131. /// <param name="source">An observable sequence to retain distinct contiguous elements for, based on a computed key value.</param>
  132. /// <param name="keySelector">A function to compute the comparison key for each element.</param>
  133. /// <param name="comparer">Equality comparer for computed key values.</param>
  134. /// <returns>An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence.</returns>
  135. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
  136. public static IObservable<TSource> DistinctUntilChanged<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  137. {
  138. if (source == null)
  139. throw new ArgumentNullException(nameof(source));
  140. if (keySelector == null)
  141. throw new ArgumentNullException(nameof(keySelector));
  142. if (comparer == null)
  143. throw new ArgumentNullException(nameof(comparer));
  144. return s_impl.DistinctUntilChanged<TSource, TKey>(source, keySelector, comparer);
  145. }
  146. #endregion
  147. #region + Do +
  148. /// <summary>
  149. /// Invokes an action for each element in the observable sequence, and propagates all observer messages through the result sequence.
  150. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
  151. /// </summary>
  152. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  153. /// <param name="source">Source sequence.</param>
  154. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  155. /// <returns>The source sequence with the side-effecting behavior applied.</returns>
  156. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  157. public static IObservable<TSource> Do<TSource>(this IObservable<TSource> source, Action<TSource> onNext)
  158. {
  159. if (source == null)
  160. throw new ArgumentNullException(nameof(source));
  161. if (onNext == null)
  162. throw new ArgumentNullException(nameof(onNext));
  163. return s_impl.Do<TSource>(source, onNext);
  164. }
  165. /// <summary>
  166. /// Invokes an action for each element in the observable sequence and invokes an action upon graceful termination of the observable sequence.
  167. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
  168. /// </summary>
  169. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  170. /// <param name="source">Source sequence.</param>
  171. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  172. /// <param name="onCompleted">Action to invoke upon graceful termination of the observable sequence.</param>
  173. /// <returns>The source sequence with the side-effecting behavior applied.</returns>
  174. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
  175. public static IObservable<TSource> Do<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action onCompleted)
  176. {
  177. if (source == null)
  178. throw new ArgumentNullException(nameof(source));
  179. if (onNext == null)
  180. throw new ArgumentNullException(nameof(onNext));
  181. if (onCompleted == null)
  182. throw new ArgumentNullException(nameof(onCompleted));
  183. return s_impl.Do<TSource>(source, onNext, onCompleted);
  184. }
  185. /// <summary>
  186. /// Invokes an action for each element in the observable sequence and invokes an action upon exceptional termination of the observable sequence.
  187. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
  188. /// </summary>
  189. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  190. /// <param name="source">Source sequence.</param>
  191. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  192. /// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
  193. /// <returns>The source sequence with the side-effecting behavior applied.</returns>
  194. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
  195. public static IObservable<TSource> Do<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
  196. {
  197. if (source == null)
  198. throw new ArgumentNullException(nameof(source));
  199. if (onNext == null)
  200. throw new ArgumentNullException(nameof(onNext));
  201. if (onError == null)
  202. throw new ArgumentNullException(nameof(onError));
  203. return s_impl.Do<TSource>(source, onNext, onError);
  204. }
  205. /// <summary>
  206. /// Invokes an action for each element in the observable sequence and invokes an action upon graceful or exceptional termination of the observable sequence.
  207. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
  208. /// </summary>
  209. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  210. /// <param name="source">Source sequence.</param>
  211. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  212. /// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
  213. /// <param name="onCompleted">Action to invoke upon graceful termination of the observable sequence.</param>
  214. /// <returns>The source sequence with the side-effecting behavior applied.</returns>
  215. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  216. public static IObservable<TSource> Do<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  217. {
  218. if (source == null)
  219. throw new ArgumentNullException(nameof(source));
  220. if (onNext == null)
  221. throw new ArgumentNullException(nameof(onNext));
  222. if (onError == null)
  223. throw new ArgumentNullException(nameof(onError));
  224. if (onCompleted == null)
  225. throw new ArgumentNullException(nameof(onCompleted));
  226. return s_impl.Do<TSource>(source, onNext, onError, onCompleted);
  227. }
  228. /// <summary>
  229. /// Invokes the observer's methods for each message in the source sequence.
  230. /// This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
  231. /// </summary>
  232. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  233. /// <param name="source">Source sequence.</param>
  234. /// <param name="observer">Observer whose methods to invoke as part of the source sequence's observation.</param>
  235. /// <returns>The source sequence with the side-effecting behavior applied.</returns>
  236. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="observer"/> is null.</exception>
  237. public static IObservable<TSource> Do<TSource>(this IObservable<TSource> source, IObserver<TSource> observer)
  238. {
  239. if (source == null)
  240. throw new ArgumentNullException(nameof(source));
  241. if (observer == null)
  242. throw new ArgumentNullException(nameof(observer));
  243. return s_impl.Do<TSource>(source, observer);
  244. }
  245. #endregion
  246. #region + Finally +
  247. /// <summary>
  248. /// Invokes a specified action after the source observable sequence terminates gracefully or exceptionally.
  249. /// </summary>
  250. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  251. /// <param name="source">Source sequence.</param>
  252. /// <param name="finallyAction">Action to invoke after the source observable sequence terminates.</param>
  253. /// <returns>Source sequence with the action-invoking termination behavior applied.</returns>
  254. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="finallyAction"/> is null.</exception>
  255. public static IObservable<TSource> Finally<TSource>(this IObservable<TSource> source, Action finallyAction)
  256. {
  257. if (source == null)
  258. throw new ArgumentNullException(nameof(source));
  259. if (finallyAction == null)
  260. throw new ArgumentNullException(nameof(finallyAction));
  261. return s_impl.Finally<TSource>(source, finallyAction);
  262. }
  263. #endregion
  264. #region + IgnoreElements +
  265. /// <summary>
  266. /// Ignores all elements in an observable sequence leaving only the termination messages.
  267. /// </summary>
  268. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  269. /// <param name="source">Source sequence.</param>
  270. /// <returns>An empty observable sequence that signals termination, successful or exceptional, of the source sequence.</returns>
  271. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  272. public static IObservable<TSource> IgnoreElements<TSource>(this IObservable<TSource> source)
  273. {
  274. if (source == null)
  275. throw new ArgumentNullException(nameof(source));
  276. return s_impl.IgnoreElements<TSource>(source);
  277. }
  278. #endregion
  279. #region + Materialize +
  280. /// <summary>
  281. /// Materializes the implicit notifications of an observable sequence as explicit notification values.
  282. /// </summary>
  283. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  284. /// <param name="source">An observable sequence to get notification values for.</param>
  285. /// <returns>An observable sequence containing the materialized notification values from the source sequence.</returns>
  286. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  287. public static IObservable<Notification<TSource>> Materialize<TSource>(this IObservable<TSource> source)
  288. {
  289. if (source == null)
  290. throw new ArgumentNullException(nameof(source));
  291. return s_impl.Materialize<TSource>(source);
  292. }
  293. #endregion
  294. #region + Repeat +
  295. /// <summary>
  296. /// Repeats the observable sequence indefinitely.
  297. /// </summary>
  298. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  299. /// <param name="source">Observable sequence to repeat.</param>
  300. /// <returns>The observable sequence producing the elements of the given sequence repeatedly and sequentially.</returns>
  301. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  302. public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source)
  303. {
  304. if (source == null)
  305. throw new ArgumentNullException(nameof(source));
  306. return s_impl.Repeat<TSource>(source);
  307. }
  308. /// <summary>
  309. /// Repeats the observable sequence a specified number of times.
  310. /// </summary>
  311. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  312. /// <param name="source">Observable sequence to repeat.</param>
  313. /// <param name="repeatCount">Number of times to repeat the sequence.</param>
  314. /// <returns>The observable sequence producing the elements of the given sequence repeatedly.</returns>
  315. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  316. /// <exception cref="ArgumentOutOfRangeException"><paramref name="repeatCount"/> is less than zero.</exception>
  317. public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source, int repeatCount)
  318. {
  319. if (source == null)
  320. throw new ArgumentNullException(nameof(source));
  321. if (repeatCount < 0)
  322. throw new ArgumentOutOfRangeException(nameof(repeatCount));
  323. return s_impl.Repeat<TSource>(source, repeatCount);
  324. }
  325. #endregion
  326. #region + Retry +
  327. /// <summary>
  328. /// Repeats the source observable sequence until it successfully terminates.
  329. /// </summary>
  330. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  331. /// <param name="source">Observable sequence to repeat until it successfully terminates.</param>
  332. /// <returns>An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully.</returns>
  333. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  334. public static IObservable<TSource> Retry<TSource>(this IObservable<TSource> source)
  335. {
  336. if (source == null)
  337. throw new ArgumentNullException(nameof(source));
  338. return s_impl.Retry<TSource>(source);
  339. }
  340. /// <summary>
  341. /// Repeats the source observable sequence the specified number of times or until it successfully terminates.
  342. /// </summary>
  343. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  344. /// <param name="source">Observable sequence to repeat until it successfully terminates.</param>
  345. /// <param name="retryCount">Number of times to repeat the sequence.</param>
  346. /// <returns>An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully.</returns>
  347. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  348. /// <exception cref="ArgumentOutOfRangeException"><paramref name="retryCount"/> is less than zero.</exception>
  349. public static IObservable<TSource> Retry<TSource>(this IObservable<TSource> source, int retryCount)
  350. {
  351. if (source == null)
  352. throw new ArgumentNullException(nameof(source));
  353. if (retryCount < 0)
  354. throw new ArgumentOutOfRangeException(nameof(retryCount));
  355. return s_impl.Retry<TSource>(source, retryCount);
  356. }
  357. #endregion
  358. #region + Scan +
  359. /// <summary>
  360. /// Applies an accumulator function over an observable sequence and returns each intermediate result. The specified seed value is used as the initial accumulator value.
  361. /// For aggregation behavior with no intermediate results, see <see cref="Observable.Aggregate&lt;TSource, Accumulate&gt;"/>.
  362. /// </summary>
  363. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  364. /// <typeparam name="TAccumulate">The type of the result of the aggregation.</typeparam>
  365. /// <param name="source">An observable sequence to accumulate over.</param>
  366. /// <param name="seed">The initial accumulator value.</param>
  367. /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  368. /// <returns>An observable sequence containing the accumulated values.</returns>
  369. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
  370. public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  371. {
  372. if (source == null)
  373. throw new ArgumentNullException(nameof(source));
  374. if (accumulator == null)
  375. throw new ArgumentNullException(nameof(accumulator));
  376. return s_impl.Scan<TSource, TAccumulate>(source, seed, accumulator);
  377. }
  378. /// <summary>
  379. /// Applies an accumulator function over an observable sequence and returns each intermediate result.
  380. /// For aggregation behavior with no intermediate results, see <see cref="Observable.Aggregate&lt;TSource&gt;"/>.
  381. /// </summary>
  382. /// <typeparam name="TSource">The type of the elements in the source sequence and the result of the aggregation.</typeparam>
  383. /// <param name="source">An observable sequence to accumulate over.</param>
  384. /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
  385. /// <returns>An observable sequence containing the accumulated values.</returns>
  386. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
  387. public static IObservable<TSource> Scan<TSource>(this IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  388. {
  389. if (source == null)
  390. throw new ArgumentNullException(nameof(source));
  391. if (accumulator == null)
  392. throw new ArgumentNullException(nameof(accumulator));
  393. return s_impl.Scan<TSource>(source, accumulator);
  394. }
  395. #endregion
  396. #region + SkipLast +
  397. /// <summary>
  398. /// Bypasses a specified number of elements at the end of an observable sequence.
  399. /// </summary>
  400. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  401. /// <param name="source">Source sequence.</param>
  402. /// <param name="count">Number of elements to bypass at the end of the source sequence.</param>
  403. /// <returns>An observable sequence containing the source sequence elements except for the bypassed ones at the end.</returns>
  404. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  405. /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  406. /// <remarks>
  407. /// This operator accumulates a queue with a length enough to store the first <paramref name="count"/> elements. As more elements are
  408. /// received, elements are taken from the front of the queue and produced on the result sequence. This causes elements to be delayed.
  409. /// </remarks>
  410. public static IObservable<TSource> SkipLast<TSource>(this IObservable<TSource> source, int count)
  411. {
  412. if (source == null)
  413. throw new ArgumentNullException(nameof(source));
  414. if (count < 0)
  415. throw new ArgumentOutOfRangeException(nameof(count));
  416. return s_impl.SkipLast<TSource>(source, count);
  417. }
  418. #endregion
  419. #region + StartWith +
  420. /// <summary>
  421. /// Prepends a sequence of values to an observable sequence.
  422. /// </summary>
  423. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  424. /// <param name="source">Source sequence to prepend values to.</param>
  425. /// <param name="values">Values to prepend to the specified sequence.</param>
  426. /// <returns>The source sequence prepended with the specified values.</returns>
  427. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="values"/> is null.</exception>
  428. public static IObservable<TSource> StartWith<TSource>(this IObservable<TSource> source, params TSource[] values)
  429. {
  430. if (source == null)
  431. throw new ArgumentNullException(nameof(source));
  432. if (values == null)
  433. throw new ArgumentNullException(nameof(values));
  434. return s_impl.StartWith<TSource>(source, values);
  435. }
  436. /// <summary>
  437. /// Prepends a sequence of values to an observable sequence.
  438. /// </summary>
  439. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  440. /// <param name="source">Source sequence to prepend values to.</param>
  441. /// <param name="values">Values to prepend to the specified sequence.</param>
  442. /// <returns>The source sequence prepended with the specified values.</returns>
  443. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="values"/> is null.</exception>
  444. public static IObservable<TSource> StartWith<TSource>(this IObservable<TSource> source, IEnumerable<TSource> values)
  445. {
  446. if (source == null)
  447. throw new ArgumentNullException(nameof(source));
  448. if (values == null)
  449. throw new ArgumentNullException(nameof(values));
  450. return s_impl.StartWith<TSource>(source, values);
  451. }
  452. /// <summary>
  453. /// Prepends a sequence of values to an observable sequence.
  454. /// </summary>
  455. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  456. /// <param name="source">Source sequence to prepend values to.</param>
  457. /// <param name="scheduler">Scheduler to emit the prepended values on.</param>
  458. /// <param name="values">Values to prepend to the specified sequence.</param>
  459. /// <returns>The source sequence prepended with the specified values.</returns>
  460. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> or <paramref name="values"/> is null.</exception>
  461. public static IObservable<TSource> StartWith<TSource>(this IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
  462. {
  463. if (source == null)
  464. throw new ArgumentNullException(nameof(source));
  465. if (scheduler == null)
  466. throw new ArgumentNullException(nameof(scheduler));
  467. if (values == null)
  468. throw new ArgumentNullException(nameof(values));
  469. return s_impl.StartWith<TSource>(source, scheduler, values);
  470. }
  471. /// <summary>
  472. /// Prepends a sequence of values to an observable sequence.
  473. /// </summary>
  474. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  475. /// <param name="source">Source sequence to prepend values to.</param>
  476. /// <param name="scheduler">Scheduler to emit the prepended values on.</param>
  477. /// <param name="values">Values to prepend to the specified sequence.</param>
  478. /// <returns>The source sequence prepended with the specified values.</returns>
  479. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> or <paramref name="values"/> is null.</exception>
  480. public static IObservable<TSource> StartWith<TSource>(this IObservable<TSource> source, IScheduler scheduler, IEnumerable<TSource> values)
  481. {
  482. //
  483. // NOTE: For some reason, someone introduced this signature which is inconsistent with the Rx pattern of putting the IScheduler last.
  484. // We can't change it at this point because of compatibility.
  485. //
  486. if (source == null)
  487. throw new ArgumentNullException(nameof(source));
  488. if (scheduler == null)
  489. throw new ArgumentNullException(nameof(scheduler));
  490. if (values == null)
  491. throw new ArgumentNullException(nameof(values));
  492. return s_impl.StartWith<TSource>(source, scheduler, values);
  493. }
  494. #endregion
  495. #region + TakeLast +
  496. /// <summary>
  497. /// Returns a specified number of contiguous elements from the end of an observable sequence.
  498. /// </summary>
  499. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  500. /// <param name="source">Source sequence.</param>
  501. /// <param name="count">Number of elements to take from the end of the source sequence.</param>
  502. /// <returns>An observable sequence containing the specified number of elements from the end of the source sequence.</returns>
  503. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  504. /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  505. /// <remarks>
  506. /// This operator accumulates a buffer with a length enough to store elements <paramref name="count"/> elements. Upon completion of
  507. /// the source sequence, this buffer is drained on the result sequence. This causes the elements to be delayed.
  508. /// </remarks>
  509. public static IObservable<TSource> TakeLast<TSource>(this IObservable<TSource> source, int count)
  510. {
  511. if (source == null)
  512. throw new ArgumentNullException(nameof(source));
  513. if (count < 0)
  514. throw new ArgumentOutOfRangeException(nameof(count));
  515. return s_impl.TakeLast<TSource>(source, count);
  516. }
  517. /// <summary>
  518. /// Returns a specified number of contiguous elements from the end of an observable sequence, using the specified scheduler to drain the queue.
  519. /// </summary>
  520. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  521. /// <param name="source">Source sequence.</param>
  522. /// <param name="count">Number of elements to take from the end of the source sequence.</param>
  523. /// <param name="scheduler">Scheduler used to drain the queue upon completion of the source sequence.</param>
  524. /// <returns>An observable sequence containing the specified number of elements from the end of the source sequence.</returns>
  525. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  526. /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  527. /// <remarks>
  528. /// This operator accumulates a buffer with a length enough to store elements <paramref name="count"/> elements. Upon completion of
  529. /// the source sequence, this buffer is drained on the result sequence. This causes the elements to be delayed.
  530. /// </remarks>
  531. public static IObservable<TSource> TakeLast<TSource>(this IObservable<TSource> source, int count, IScheduler scheduler)
  532. {
  533. if (source == null)
  534. throw new ArgumentNullException(nameof(source));
  535. if (count < 0)
  536. throw new ArgumentOutOfRangeException(nameof(count));
  537. if (scheduler == null)
  538. throw new ArgumentNullException(nameof(scheduler));
  539. return s_impl.TakeLast<TSource>(source, count, scheduler);
  540. }
  541. #endregion
  542. #region + TakeLastBuffer +
  543. /// <summary>
  544. /// Returns a list with the specified number of contiguous elements from the end of an observable sequence.
  545. /// </summary>
  546. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  547. /// <param name="source">Source sequence.</param>
  548. /// <param name="count">Number of elements to take from the end of the source sequence.</param>
  549. /// <returns>An observable sequence containing a single list with the specified number of elements from the end of the source sequence.</returns>
  550. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  551. /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  552. /// <remarks>
  553. /// This operator accumulates a buffer with a length enough to store <paramref name="count"/> elements. Upon completion of the
  554. /// source sequence, this buffer is produced on the result sequence.
  555. /// </remarks>
  556. public static IObservable<IList<TSource>> TakeLastBuffer<TSource>(this IObservable<TSource> source, int count)
  557. {
  558. if (source == null)
  559. throw new ArgumentNullException(nameof(source));
  560. if (count < 0)
  561. throw new ArgumentOutOfRangeException(nameof(count));
  562. return s_impl.TakeLastBuffer<TSource>(source, count);
  563. }
  564. #endregion
  565. #region + Window +
  566. /// <summary>
  567. /// Projects each element of an observable sequence into consecutive non-overlapping windows which are produced based on element count information.
  568. /// </summary>
  569. /// <typeparam name="TSource">The type of the elements in the source sequence, and in the windows in the result sequence.</typeparam>
  570. /// <param name="source">Source sequence to produce windows over.</param>
  571. /// <param name="count">Length of each window.</param>
  572. /// <returns>An observable sequence of windows.</returns>
  573. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  574. /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than or equal to zero.</exception>
  575. public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source, int count)
  576. {
  577. if (source == null)
  578. throw new ArgumentNullException(nameof(source));
  579. if (count <= 0)
  580. throw new ArgumentOutOfRangeException(nameof(count));
  581. return s_impl.Window<TSource>(source, count);
  582. }
  583. /// <summary>
  584. /// Projects each element of an observable sequence into zero or more windows which are produced based on element count information.
  585. /// </summary>
  586. /// <typeparam name="TSource">The type of the elements in the source sequence, and in the windows in the result sequence.</typeparam>
  587. /// <param name="source">Source sequence to produce windows over.</param>
  588. /// <param name="count">Length of each window.</param>
  589. /// <param name="skip">Number of elements to skip between creation of consecutive windows.</param>
  590. /// <returns>An observable sequence of windows.</returns>
  591. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  592. /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> or <paramref name="skip"/> is less than or equal to zero.</exception>
  593. public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source, int count, int skip)
  594. {
  595. if (source == null)
  596. throw new ArgumentNullException(nameof(source));
  597. if (count <= 0)
  598. throw new ArgumentOutOfRangeException(nameof(count));
  599. if (skip <= 0)
  600. throw new ArgumentOutOfRangeException(nameof(skip));
  601. return s_impl.Window<TSource>(source, count, skip);
  602. }
  603. #endregion
  604. }
  605. }