Observable.Creation.cs 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Threading;
  7. #if !NO_TPL
  8. using System.Threading.Tasks;
  9. #endif
  10. namespace System.Reactive.Linq
  11. {
  12. public static partial class Observable
  13. {
  14. #region + Create +
  15. /// <summary>
  16. /// Creates an observable sequence from a specified Subscribe method implementation.
  17. /// </summary>
  18. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  19. /// <param name="subscribe">Implementation of the resulting observable sequence's Subscribe method.</param>
  20. /// <returns>The observable sequence with the specified implementation for the Subscribe method.</returns>
  21. /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
  22. /// <remarks>
  23. /// Use of this operator is preferred over manual implementation of the IObservable&lt;T&gt; interface. In case
  24. /// you need a type implementing IObservable&lt;T&gt; rather than an anonymous implementation, consider using
  25. /// the <see cref="System.Reactive.ObservableBase&lt;T&gt;"/> abstract base class.
  26. /// </remarks>
  public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IDisposable> subscribe)
  {
      // Only a non-null delegate is forwarded to the query language implementation.
      if (subscribe != null)
      {
          return s_impl.Create<TResult>(subscribe);
      }

      throw new ArgumentNullException(nameof(subscribe));
  }
  33. /// <summary>
  34. /// Creates an observable sequence from a specified Subscribe method implementation.
  35. /// </summary>
  36. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  37. /// <param name="subscribe">Implementation of the resulting observable sequence's Subscribe method, returning an Action delegate that will be wrapped in an IDisposable.</param>
  38. /// <returns>The observable sequence with the specified implementation for the Subscribe method.</returns>
  39. /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
  40. /// <remarks>
  41. /// Use of this operator is preferred over manual implementation of the IObservable&lt;T&gt; interface. In case
  42. /// you need a type implementing IObservable&lt;T&gt; rather than an anonymous implementation, consider using
  43. /// the <see cref="System.Reactive.ObservableBase&lt;T&gt;"/> abstract base class.
  44. /// </remarks>
  public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Action> subscribe)
  {
      // Only a non-null delegate is forwarded to the query language implementation.
      if (subscribe != null)
      {
          return s_impl.Create<TResult>(subscribe);
      }

      throw new ArgumentNullException(nameof(subscribe));
  }
  51. #endregion
  52. #region + CreateAsync +
  53. #if !NO_TPL
  54. /// <summary>
  55. /// Creates an observable sequence from a specified cancellable asynchronous Subscribe method.
  56. /// The CancellationToken passed to the asynchronous Subscribe method is tied to the returned disposable subscription, allowing best-effort cancellation.
  57. /// </summary>
  58. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  59. /// <param name="subscribeAsync">Asynchronous method used to produce elements.</param>
  60. /// <returns>The observable sequence surfacing the elements produced by the asynchronous method.</returns>
  61. /// <exception cref="ArgumentNullException"><paramref name="subscribeAsync"/> is null.</exception>
  62. /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  63. /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous subscribe function will be signaled.</remarks>
  public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
  {
      // Reject a missing asynchronous subscribe function before delegating.
      if (subscribeAsync == null)
      {
          throw new ArgumentNullException(nameof(subscribeAsync));
      }

      return s_impl.Create<TResult>(subscribeAsync);
  }
  70. /// <summary>
  71. /// Creates an observable sequence from a specified asynchronous Subscribe method.
  72. /// </summary>
  73. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  74. /// <param name="subscribeAsync">Asynchronous method used to produce elements.</param>
  75. /// <returns>The observable sequence surfacing the elements produced by the asynchronous method.</returns>
  76. /// <exception cref="ArgumentNullException"><paramref name="subscribeAsync"/> is null.</exception>
  77. /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
  {
      // Reject a missing asynchronous subscribe function before delegating.
      if (subscribeAsync == null)
      {
          throw new ArgumentNullException(nameof(subscribeAsync));
      }

      return s_impl.Create<TResult>(subscribeAsync);
  }
  84. /// <summary>
  85. /// Creates an observable sequence from a specified cancellable asynchronous Subscribe method.
  86. /// The CancellationToken passed to the asynchronous Subscribe method is tied to the returned disposable subscription, allowing best-effort cancellation.
  87. /// </summary>
  88. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  89. /// <param name="subscribeAsync">Asynchronous method used to implement the resulting sequence's Subscribe method.</param>
  90. /// <returns>The observable sequence with the specified implementation for the Subscribe method.</returns>
  91. /// <exception cref="ArgumentNullException"><paramref name="subscribeAsync"/> is null.</exception>
  92. /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  93. /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous subscribe function will be signaled.</remarks>
  public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
  {
      // Only a non-null asynchronous subscribe function is forwarded.
      if (subscribeAsync != null)
      {
          return s_impl.Create<TResult>(subscribeAsync);
      }

      throw new ArgumentNullException(nameof(subscribeAsync));
  }
  100. /// <summary>
  101. /// Creates an observable sequence from a specified asynchronous Subscribe method.
  102. /// </summary>
  103. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  104. /// <param name="subscribeAsync">Asynchronous method used to implement the resulting sequence's Subscribe method.</param>
  105. /// <returns>The observable sequence with the specified implementation for the Subscribe method.</returns>
  106. /// <exception cref="ArgumentNullException"><paramref name="subscribeAsync"/> is null.</exception>
  107. /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync)
  {
      // Only a non-null asynchronous subscribe function is forwarded.
      if (subscribeAsync != null)
      {
          return s_impl.Create<TResult>(subscribeAsync);
      }

      throw new ArgumentNullException(nameof(subscribeAsync));
  }
  114. /// <summary>
  115. /// Creates an observable sequence from a specified cancellable asynchronous Subscribe method.
  116. /// The CancellationToken passed to the asynchronous Subscribe method is tied to the returned disposable subscription, allowing best-effort cancellation.
  117. /// </summary>
  118. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  119. /// <param name="subscribeAsync">Asynchronous method used to implement the resulting sequence's Subscribe method, returning an Action delegate that will be wrapped in an IDisposable.</param>
  120. /// <returns>The observable sequence with the specified implementation for the Subscribe method.</returns>
  121. /// <exception cref="ArgumentNullException"><paramref name="subscribeAsync"/> is null.</exception>
  122. /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  123. /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous subscribe function will be signaled.</remarks>
  public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
  {
      // Reject a missing asynchronous subscribe function before delegating.
      if (subscribeAsync == null)
      {
          throw new ArgumentNullException(nameof(subscribeAsync));
      }

      return s_impl.Create<TResult>(subscribeAsync);
  }
  130. /// <summary>
  131. /// Creates an observable sequence from a specified asynchronous Subscribe method.
  132. /// </summary>
  133. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  134. /// <param name="subscribeAsync">Asynchronous method used to implement the resulting sequence's Subscribe method, returning an Action delegate that will be wrapped in an IDisposable.</param>
  135. /// <returns>The observable sequence with the specified implementation for the Subscribe method.</returns>
  136. /// <exception cref="ArgumentNullException"><paramref name="subscribeAsync"/> is null.</exception>
  137. /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync)
  {
      // Reject a missing asynchronous subscribe function before delegating.
      if (subscribeAsync == null)
      {
          throw new ArgumentNullException(nameof(subscribeAsync));
      }

      return s_impl.Create<TResult>(subscribeAsync);
  }
  144. #endif
  145. #endregion
  146. #region + Defer +
  147. /// <summary>
  148. /// Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
  149. /// </summary>
  150. /// <typeparam name="TResult">The type of the elements in the sequence returned by the factory function, and in the resulting sequence.</typeparam>
  151. /// <param name="observableFactory">Observable factory function to invoke for each observer that subscribes to the resulting sequence.</param>
  152. /// <returns>An observable sequence whose observers trigger an invocation of the given observable factory function.</returns>
  153. /// <exception cref="ArgumentNullException"><paramref name="observableFactory"/> is null.</exception>
  public static IObservable<TResult> Defer<TResult>(Func<IObservable<TResult>> observableFactory)
  {
      // Only a non-null factory is forwarded to the query language implementation.
      if (observableFactory != null)
      {
          return s_impl.Defer<TResult>(observableFactory);
      }

      throw new ArgumentNullException(nameof(observableFactory));
  }
  160. #endregion
  161. #region + DeferAsync +
  162. #if !NO_TPL
  163. /// <summary>
  164. /// Returns an observable sequence that starts the specified asynchronous factory function whenever a new observer subscribes.
  165. /// </summary>
  166. /// <typeparam name="TResult">The type of the elements in the sequence returned by the factory function, and in the resulting sequence.</typeparam>
  167. /// <param name="observableFactoryAsync">Asynchronous factory function to start for each observer that subscribes to the resulting sequence.</param>
  168. /// <returns>An observable sequence whose observers trigger the given asynchronous observable factory function to be started.</returns>
  169. /// <exception cref="ArgumentNullException"><paramref name="observableFactoryAsync"/> is null.</exception>
  170. /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  public static IObservable<TResult> Defer<TResult>(Func<Task<IObservable<TResult>>> observableFactoryAsync)
  {
      // Reject a missing asynchronous factory before delegating.
      if (observableFactoryAsync == null)
      {
          throw new ArgumentNullException(nameof(observableFactoryAsync));
      }

      return s_impl.Defer<TResult>(observableFactoryAsync);
  }
  177. /// <summary>
  178. /// Returns an observable sequence that starts the specified cancellable asynchronous factory function whenever a new observer subscribes.
  179. /// The CancellationToken passed to the asynchronous factory function is tied to the returned disposable subscription, allowing best-effort cancellation.
  180. /// </summary>
  181. /// <typeparam name="TResult">The type of the elements in the sequence returned by the factory function, and in the resulting sequence.</typeparam>
  182. /// <param name="observableFactoryAsync">Asynchronous factory function to start for each observer that subscribes to the resulting sequence.</param>
  183. /// <returns>An observable sequence whose observers trigger the given asynchronous observable factory function to be started.</returns>
  184. /// <exception cref="ArgumentNullException"><paramref name="observableFactoryAsync"/> is null.</exception>
  185. /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  186. /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous observable factory function will be signaled.</remarks>
  public static IObservable<TResult> DeferAsync<TResult>(Func<CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
  {
      // Reject a missing asynchronous factory before delegating to the Defer implementation.
      if (observableFactoryAsync == null)
      {
          throw new ArgumentNullException(nameof(observableFactoryAsync));
      }

      return s_impl.Defer<TResult>(observableFactoryAsync);
  }
  193. #endif
  194. #endregion
  195. #region + Empty +
  196. /// <summary>
  197. /// Returns an empty observable sequence.
  198. /// </summary>
  199. /// <typeparam name="TResult">The type used for the IObservable&lt;T&gt; type parameter of the resulting sequence.</typeparam>
  200. /// <returns>An observable sequence with no elements.</returns>
  // No arguments to validate: forwards straight to the query language implementation.
  public static IObservable<TResult> Empty<TResult>() => s_impl.Empty<TResult>();
  205. /// <summary>
  206. /// Returns an empty observable sequence.
  207. /// </summary>
  208. /// <typeparam name="TResult">The type used for the IObservable&lt;T&gt; type parameter of the resulting sequence.</typeparam>
  209. /// <param name="witness">Object solely used to infer the type of the <typeparamref name="TResult"/> type parameter. This parameter is typically used when creating a sequence of anonymously typed elements.</param>
  210. /// <returns>An observable sequence with no elements.</returns>
  // Pure inference - no specialized target method. The witness value is never read here.
  public static IObservable<TResult> Empty<TResult>(TResult witness) => s_impl.Empty<TResult>();
  215. /// <summary>
  216. /// Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.
  217. /// </summary>
  218. /// <typeparam name="TResult">The type used for the IObservable&lt;T&gt; type parameter of the resulting sequence.</typeparam>
  219. /// <param name="scheduler">Scheduler to send the termination call on.</param>
  220. /// <returns>An observable sequence with no elements.</returns>
  221. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> Empty<TResult>(IScheduler scheduler)
  {
      // Only a non-null scheduler is forwarded.
      if (scheduler != null)
      {
          return s_impl.Empty<TResult>(scheduler);
      }

      throw new ArgumentNullException(nameof(scheduler));
  }
  228. /// <summary>
  229. /// Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.
  230. /// </summary>
  231. /// <typeparam name="TResult">The type used for the IObservable&lt;T&gt; type parameter of the resulting sequence.</typeparam>
  232. /// <param name="scheduler">Scheduler to send the termination call on.</param>
  233. /// <param name="witness">Object solely used to infer the type of the <typeparamref name="TResult"/> type parameter. This parameter is typically used when creating a sequence of anonymously typed elements.</param>
  234. /// <returns>An observable sequence with no elements.</returns>
  235. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> Empty<TResult>(IScheduler scheduler, TResult witness)
  {
      if (scheduler != null)
      {
          // Pure inference - no specialized target method. The witness value is never read here.
          return s_impl.Empty<TResult>(scheduler);
      }

      throw new ArgumentNullException(nameof(scheduler));
  }
  242. #endregion
  243. #region + Generate +
  244. /// <summary>
  245. /// Generates an observable sequence by running a state-driven loop producing the sequence's elements.
  246. /// </summary>
  247. /// <typeparam name="TState">The type of the state used in the generator loop.</typeparam>
  248. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  249. /// <param name="initialState">Initial state.</param>
  250. /// <param name="condition">Condition to terminate generation (upon returning false).</param>
  251. /// <param name="iterate">Iteration step function.</param>
  252. /// <param name="resultSelector">Selector function for results produced in the sequence.</param>
  253. /// <returns>The generated sequence.</returns>
  254. /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="iterate"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
  {
      // Delegates are checked in declaration order so the first missing one is reported.
      if (condition == null)
      {
          throw new ArgumentNullException(nameof(condition));
      }

      if (iterate == null)
      {
          throw new ArgumentNullException(nameof(iterate));
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException(nameof(resultSelector));
      }

      return s_impl.Generate<TState, TResult>(initialState, condition, iterate, resultSelector);
  }
  265. /// <summary>
  266. /// Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages.
  267. /// </summary>
  268. /// <typeparam name="TState">The type of the state used in the generator loop.</typeparam>
  269. /// <typeparam name="TResult">The type of the elements in the produced sequence.</typeparam>
  270. /// <param name="initialState">Initial state.</param>
  271. /// <param name="condition">Condition to terminate generation (upon returning false).</param>
  272. /// <param name="iterate">Iteration step function.</param>
  273. /// <param name="resultSelector">Selector function for results produced in the sequence.</param>
  274. /// <param name="scheduler">Scheduler on which to run the generator loop.</param>
  275. /// <returns>The generated sequence.</returns>
  276. /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="iterate"/> or <paramref name="resultSelector"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
  {
      // Arguments are checked in declaration order so the first missing one is reported.
      if (condition == null)
      {
          throw new ArgumentNullException(nameof(condition));
      }

      if (iterate == null)
      {
          throw new ArgumentNullException(nameof(iterate));
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException(nameof(resultSelector));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return s_impl.Generate<TState, TResult>(initialState, condition, iterate, resultSelector, scheduler);
  }
  289. #endregion
  290. #region + Never +
  291. /// <summary>
  292. /// Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).
  293. /// </summary>
  294. /// <typeparam name="TResult">The type used for the IObservable&lt;T&gt; type parameter of the resulting sequence.</typeparam>
  295. /// <returns>An observable sequence whose observers will never get called.</returns>
  // No arguments to validate: forwards straight to the query language implementation.
  public static IObservable<TResult> Never<TResult>() => s_impl.Never<TResult>();
  300. /// <summary>
  301. /// Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).
  302. /// </summary>
  303. /// <typeparam name="TResult">The type used for the IObservable&lt;T&gt; type parameter of the resulting sequence.</typeparam>
  304. /// <param name="witness">Object solely used to infer the type of the <typeparamref name="TResult"/> type parameter. This parameter is typically used when creating a sequence of anonymously typed elements.</param>
  305. /// <returns>An observable sequence whose observers will never get called.</returns>
  // Pure inference - no specialized target method. The witness value is never read here.
  public static IObservable<TResult> Never<TResult>(TResult witness) => s_impl.Never<TResult>();
  310. #endregion
  311. #region + Range +
  312. /// <summary>
  313. /// Generates an observable sequence of integral numbers within a specified range.
  314. /// </summary>
  315. /// <param name="start">The value of the first integer in the sequence.</param>
  316. /// <param name="count">The number of sequential integers to generate.</param>
  317. /// <returns>An observable sequence that contains a range of sequential integral numbers.</returns>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero. -or- <paramref name="start"/> + <paramref name="count"/> - 1 is larger than <see cref="int.MaxValue"/>.</exception>
  public static IObservable<int> Range(int start, int count)
  {
      // Computed in 64-bit arithmetic so start + count - 1 cannot wrap around.
      long last = (long)start + count - 1;

      // Valid when count is non-negative and the last element still fits in an int.
      bool isValid = count >= 0 && last <= int.MaxValue;
      if (!isValid)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      return s_impl.Range(start, count);
  }
  326. /// <summary>
  327. /// Generates an observable sequence of integral numbers within a specified range, using the specified scheduler to send out observer messages.
  328. /// </summary>
  329. /// <param name="start">The value of the first integer in the sequence.</param>
  330. /// <param name="count">The number of sequential integers to generate.</param>
  331. /// <param name="scheduler">Scheduler to run the generator loop on.</param>
  332. /// <returns>An observable sequence that contains a range of sequential integral numbers.</returns>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero. -or- <paramref name="start"/> + <paramref name="count"/> - 1 is larger than <see cref="int.MaxValue"/>.</exception>
  334. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public static IObservable<int> Range(int start, int count, IScheduler scheduler)
  {
      // Computed in 64-bit arithmetic so start + count - 1 cannot wrap around.
      long last = (long)start + count - 1;

      // The range check runs before the scheduler check, so a bad range is reported first.
      bool isValid = count >= 0 && last <= int.MaxValue;
      if (!isValid)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return s_impl.Range(start, count, scheduler);
  }
  344. #endregion
  345. #region + Repeat +
  346. /// <summary>
  347. /// Generates an observable sequence that repeats the given element infinitely.
  348. /// </summary>
  349. /// <typeparam name="TResult">The type of the element that will be repeated in the produced sequence.</typeparam>
  350. /// <param name="value">Element to repeat.</param>
  351. /// <returns>An observable sequence that repeats the given element infinitely.</returns>
  // The value is passed through without validation.
  public static IObservable<TResult> Repeat<TResult>(TResult value) => s_impl.Repeat<TResult>(value);
  356. /// <summary>
  357. /// Generates an observable sequence that repeats the given element infinitely, using the specified scheduler to send out observer messages.
  358. /// </summary>
  359. /// <typeparam name="TResult">The type of the element that will be repeated in the produced sequence.</typeparam>
  360. /// <param name="value">Element to repeat.</param>
  361. /// <param name="scheduler">Scheduler to run the producer loop on.</param>
  362. /// <returns>An observable sequence that repeats the given element infinitely.</returns>
  363. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler)
  {
      // Only a non-null scheduler is forwarded; the value itself is not validated.
      if (scheduler != null)
      {
          return s_impl.Repeat<TResult>(value, scheduler);
      }

      throw new ArgumentNullException(nameof(scheduler));
  }
  370. /// <summary>
  371. /// Generates an observable sequence that repeats the given element the specified number of times.
  372. /// </summary>
  373. /// <typeparam name="TResult">The type of the element that will be repeated in the produced sequence.</typeparam>
  374. /// <param name="value">Element to repeat.</param>
  375. /// <param name="repeatCount">Number of times to repeat the element.</param>
  376. /// <returns>An observable sequence that repeats the given element the specified number of times.</returns>
  377. /// <exception cref="ArgumentOutOfRangeException"><paramref name="repeatCount"/> is less than zero.</exception>
  public static IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount)
  {
      // Zero repetitions are accepted; only negative counts are rejected.
      if (repeatCount >= 0)
      {
          return s_impl.Repeat<TResult>(value, repeatCount);
      }

      throw new ArgumentOutOfRangeException(nameof(repeatCount));
  }
  384. /// <summary>
  385. /// Generates an observable sequence that repeats the given element the specified number of times, using the specified scheduler to send out observer messages.
  386. /// </summary>
  387. /// <typeparam name="TResult">The type of the element that will be repeated in the produced sequence.</typeparam>
  388. /// <param name="value">Element to repeat.</param>
  389. /// <param name="repeatCount">Number of times to repeat the element.</param>
  390. /// <param name="scheduler">Scheduler to run the producer loop on.</param>
  391. /// <returns>An observable sequence that repeats the given element the specified number of times.</returns>
  392. /// <exception cref="ArgumentOutOfRangeException"><paramref name="repeatCount"/> is less than zero.</exception>
  393. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler)
  {
      // The count check runs before the scheduler check, so a negative count is reported first.
      if (repeatCount < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(repeatCount));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return s_impl.Repeat<TResult>(value, repeatCount, scheduler);
  }
  402. #endregion
  403. #region + Return +
  404. /// <summary>
  405. /// Returns an observable sequence that contains a single element.
  406. /// </summary>
  407. /// <typeparam name="TResult">The type of the element that will be returned in the produced sequence.</typeparam>
  408. /// <param name="value">Single element in the resulting observable sequence.</param>
  409. /// <returns>An observable sequence containing the single specified element.</returns>
  // The value is passed through without validation.
  public static IObservable<TResult> Return<TResult>(TResult value) => s_impl.Return<TResult>(value);
  414. /// <summary>
  415. /// Returns an observable sequence that contains a single element, using the specified scheduler to send out observer messages.
  416. /// </summary>
  417. /// <typeparam name="TResult">The type of the element that will be returned in the produced sequence.</typeparam>
  418. /// <param name="value">Single element in the resulting observable sequence.</param>
  419. /// <param name="scheduler">Scheduler to send the single element on.</param>
  420. /// <returns>An observable sequence containing the single specified element.</returns>
  421. /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> Return<TResult>(TResult value, IScheduler scheduler)
  {
      // Only a non-null scheduler is forwarded; the value itself is not validated.
      if (scheduler != null)
      {
          return s_impl.Return<TResult>(value, scheduler);
      }

      throw new ArgumentNullException(nameof(scheduler));
  }
  428. #endregion
  429. #region + Throw +
  430. /// <summary>
  431. /// Returns an observable sequence that terminates with an exception.
  432. /// </summary>
  433. /// <typeparam name="TResult">The type used for the IObservable&lt;T&gt; type parameter of the resulting sequence.</typeparam>
  434. /// <param name="exception">Exception object used for the sequence's termination.</param>
  435. /// <returns>The observable sequence that terminates exceptionally with the specified exception object.</returns>
  436. /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
  public static IObservable<TResult> Throw<TResult>(Exception exception)
  {
      // Only a non-null exception object is forwarded.
      if (exception != null)
      {
          return s_impl.Throw<TResult>(exception);
      }

      throw new ArgumentNullException(nameof(exception));
  }
  /// <summary>
  /// Creates an observable sequence that ends with the given exception.
  /// </summary>
  /// <typeparam name="TResult">The type argument used for the IObservable&lt;T&gt; of the resulting sequence.</typeparam>
  /// <param name="exception">The exception object with which the sequence terminates.</param>
  /// <param name="witness">An object whose only purpose is to let the compiler infer <typeparamref name="TResult"/>. Typically supplied when the sequence's elements are of an anonymous type.</param>
  /// <returns>An observable sequence that terminates exceptionally with <paramref name="exception"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
  public static IObservable<TResult> Throw<TResult>(Exception exception, TResult witness)
  {
      if (exception == null)
      {
          throw new ArgumentNullException(nameof(exception));
      }

      // The witness exists for type inference only and is deliberately not forwarded:
      // there is no specialized target method that accepts it.
      return s_impl.Throw<TResult>(exception);
  }
  /// <summary>
  /// Creates an observable sequence that ends with the given exception; the single OnError message is sent out using the supplied scheduler.
  /// </summary>
  /// <typeparam name="TResult">The type argument used for the IObservable&lt;T&gt; of the resulting sequence.</typeparam>
  /// <param name="exception">The exception object with which the sequence terminates.</param>
  /// <param name="scheduler">The scheduler on which the exceptional termination call is sent.</param>
  /// <returns>An observable sequence that terminates exceptionally with <paramref name="exception"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="exception"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler)
  {
      // Arguments are validated in declaration order, so a null exception is reported first.
      if (exception == null)
      {
          throw new ArgumentNullException(nameof(exception));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return s_impl.Throw<TResult>(exception, scheduler);
  }
  /// <summary>
  /// Creates an observable sequence that ends with the given exception; the single OnError message is sent out using the supplied scheduler.
  /// </summary>
  /// <typeparam name="TResult">The type argument used for the IObservable&lt;T&gt; of the resulting sequence.</typeparam>
  /// <param name="exception">The exception object with which the sequence terminates.</param>
  /// <param name="scheduler">The scheduler on which the exceptional termination call is sent.</param>
  /// <param name="witness">An object whose only purpose is to let the compiler infer <typeparamref name="TResult"/>. Typically supplied when the sequence's elements are of an anonymous type.</param>
  /// <returns>An observable sequence that terminates exceptionally with <paramref name="exception"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="exception"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler, TResult witness)
  {
      // Arguments are validated in declaration order, so a null exception is reported first.
      if (exception == null)
      {
          throw new ArgumentNullException(nameof(exception));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      // The witness exists for type inference only and is deliberately not forwarded:
      // there is no specialized target method that accepts it.
      return s_impl.Throw<TResult>(exception, scheduler);
  }
  490. #endregion
  491. #region + Using +
  /// <summary>
  /// Builds an observable sequence that depends on a resource object; the resource's lifetime is tied to the lifetime of the resulting observable sequence.
  /// </summary>
  /// <typeparam name="TResult">The element type of the produced sequence.</typeparam>
  /// <typeparam name="TResource">The type of the resource used while generating the resulting sequence. Must implement <see cref="System.IDisposable"/>.</typeparam>
  /// <param name="resourceFactory">Factory function that obtains a resource object.</param>
  /// <param name="observableFactory">Factory function that obtains an observable sequence depending on the obtained resource.</param>
  /// <returns>An observable sequence whose lifetime controls the lifetime of the dependent resource object.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="resourceFactory"/> or <paramref name="observableFactory"/> is null.</exception>
  public static IObservable<TResult> Using<TResult, TResource>(Func<TResource> resourceFactory, Func<TResource, IObservable<TResult>> observableFactory)
      where TResource : IDisposable
  {
      // Arguments are validated in declaration order, so a null resource factory is reported first.
      if (resourceFactory == null)
      {
          throw new ArgumentNullException(nameof(resourceFactory));
      }

      if (observableFactory == null)
      {
          throw new ArgumentNullException(nameof(observableFactory));
      }

      return s_impl.Using<TResult, TResource>(resourceFactory, observableFactory);
  }
  509. #endregion
  510. #region + UsingAsync +
  511. #if !NO_TPL
  /// <summary>
  /// Builds an observable sequence that depends on a resource object; the resource's lifetime is tied to the lifetime of the resulting observable sequence. Both the resource and the sequence are obtained through asynchronous methods.
  /// The CancellationToken handed to the asynchronous methods is tied to the returned disposable subscription, which allows best-effort cancellation at any stage of resource acquisition or usage.
  /// </summary>
  /// <typeparam name="TResult">The element type of the produced sequence.</typeparam>
  /// <typeparam name="TResource">The type of the resource used while generating the resulting sequence. Must implement <see cref="System.IDisposable"/>.</typeparam>
  /// <param name="resourceFactoryAsync">Asynchronous factory function that obtains a resource object.</param>
  /// <param name="observableFactoryAsync">Asynchronous factory function that obtains an observable sequence depending on the obtained resource.</param>
  /// <returns>An observable sequence whose lifetime controls the lifetime of the dependent resource object.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="resourceFactoryAsync"/> or <paramref name="observableFactoryAsync"/> is null.</exception>
  /// <remarks>
  /// <para>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</para>
  /// <para>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous resource factory and observable factory functions will be signaled.</para>
  /// </remarks>
  public static IObservable<TResult> Using<TResult, TResource>(Func<CancellationToken, Task<TResource>> resourceFactoryAsync, Func<TResource, CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
      where TResource : IDisposable
  {
      // Arguments are validated in declaration order, so a null resource factory is reported first.
      if (resourceFactoryAsync == null)
      {
          throw new ArgumentNullException(nameof(resourceFactoryAsync));
      }

      if (observableFactoryAsync == null)
      {
          throw new ArgumentNullException(nameof(observableFactoryAsync));
      }

      return s_impl.Using<TResult, TResource>(resourceFactoryAsync, observableFactoryAsync);
  }
  532. #endif
  533. #endregion
  534. }
  535. }