Observable.Imperative.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Threading;
  7. #if !NO_TPL
  8. using System.Threading.Tasks;
  9. #endif
  10. namespace System.Reactive.Linq
  11. {
  12. public static partial class Observable
  13. {
  14. #region + ForEachAsync +
  15. #if !NO_TPL
  /// <summary>
  /// Runs <paramref name="onNext"/> for every element of the observable sequence and hands back a Task object that gets signaled once the sequence terminates.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are processed.</param>
  /// <param name="onNext">Callback to run for each element of the observable sequence.</param>
  /// <returns>A Task signaling that the sequence has terminated.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  /// <remarks>Particularly handy together with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource> onNext)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      return s_impl.ForEachAsync<TSource>(source, onNext);
  }
  /// <summary>
  /// Runs <paramref name="onNext"/> for every element of the observable sequence and hands back a Task object that gets signaled once the sequence terminates.
  /// Setting the supplied cancellation token quits the loop prematurely.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are processed.</param>
  /// <param name="onNext">Callback to run for each element of the observable sequence.</param>
  /// <param name="cancellationToken">Token used to stop the loop.</param>
  /// <returns>A Task signaling that the sequence has terminated.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  /// <remarks>Particularly handy together with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource> onNext, CancellationToken cancellationToken)
  {
      // Arguments are validated in declaration order; the token is passed through to s_impl unchanged.
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      return s_impl.ForEachAsync<TSource>(source, onNext, cancellationToken);
  }
  /// <summary>
  /// Runs <paramref name="onNext"/> for every element of the observable sequence, supplying the element's index as well, and hands back a Task object that gets signaled once the sequence terminates.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are processed.</param>
  /// <param name="onNext">Callback to run for each element of the observable sequence.</param>
  /// <returns>A Task signaling that the sequence has terminated.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  /// <remarks>Particularly handy together with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      return s_impl.ForEachAsync<TSource>(source, onNext);
  }
  /// <summary>
  /// Runs <paramref name="onNext"/> for every element of the observable sequence, supplying the element's index as well, and hands back a Task object that gets signaled once the sequence terminates.
  /// Setting the supplied cancellation token quits the loop prematurely.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are processed.</param>
  /// <param name="onNext">Callback to run for each element of the observable sequence.</param>
  /// <param name="cancellationToken">Token used to stop the loop.</param>
  /// <returns>A Task signaling that the sequence has terminated.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  /// <remarks>Particularly handy together with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
  public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext, CancellationToken cancellationToken)
  {
      // Arguments are validated in declaration order; the token is passed through to s_impl unchanged.
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (onNext == null)
      {
          throw new ArgumentNullException(nameof(onNext));
      }

      return s_impl.ForEachAsync<TSource>(source, onNext, cancellationToken);
  }
  88. #endif
  89. #endregion
  90. #region + Case +
  /// <summary>
  /// Picks the source to return from <paramref name="sources"/> by means of <paramref name="selector"/>, falling back to <paramref name="defaultSource"/> when no match is found.
  /// </summary>
  /// <typeparam name="TValue">Type of the value produced by the selector function, which serves as the lookup key for the resulting source.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
  /// <param name="selector">Function invoked to determine which source to look up in the <paramref name="sources"/> dictionary.</param>
  /// <param name="sources">Dictionary of candidate sources, selected from based on the result of invoking <paramref name="selector"/>.</param>
  /// <param name="defaultSource">Source to select when <paramref name="sources"/> contains no matching source.</param>
  /// <returns>The observable sequence retrieved from the <paramref name="sources"/> dictionary based on the <paramref name="selector"/> invocation result, or <paramref name="defaultSource"/> if no match is found.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> or <paramref name="defaultSource"/> is null.</exception>
  public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IObservable<TResult> defaultSource)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (selector == null)
      {
          throw new ArgumentNullException(nameof(selector));
      }

      if (sources == null)
      {
          throw new ArgumentNullException(nameof(sources));
      }

      if (defaultSource == null)
      {
          throw new ArgumentNullException(nameof(defaultSource));
      }

      return s_impl.Case<TValue, TResult>(selector, sources, defaultSource);
  }
  /// <summary>
  /// Picks the source to return from <paramref name="sources"/> by means of <paramref name="selector"/>, falling back to an empty sequence on the specified scheduler when no match is found.
  /// </summary>
  /// <typeparam name="TValue">Type of the value produced by the selector function, which serves as the lookup key for the resulting source.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
  /// <param name="selector">Function invoked to determine which source to look up in the <paramref name="sources"/> dictionary.</param>
  /// <param name="sources">Dictionary of candidate sources, selected from based on the result of invoking <paramref name="selector"/>.</param>
  /// <param name="scheduler">Scheduler on which to generate an empty sequence when <paramref name="sources"/> contains no matching source.</param>
  /// <returns>The observable sequence retrieved from the <paramref name="sources"/> dictionary based on the <paramref name="selector"/> invocation result, or an empty sequence if no match is found.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IScheduler scheduler)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (selector == null)
      {
          throw new ArgumentNullException(nameof(selector));
      }

      if (sources == null)
      {
          throw new ArgumentNullException(nameof(sources));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return s_impl.Case<TValue, TResult>(selector, sources, scheduler);
  }
  /// <summary>
  /// Picks the source to return from <paramref name="sources"/> by means of <paramref name="selector"/>, falling back to an empty sequence when no match is found.
  /// </summary>
  /// <typeparam name="TValue">Type of the value produced by the selector function, which serves as the lookup key for the resulting source.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
  /// <param name="selector">Function invoked to determine which source to look up in the <paramref name="sources"/> dictionary.</param>
  /// <param name="sources">Dictionary of candidate sources, selected from based on the result of invoking <paramref name="selector"/>.</param>
  /// <returns>The observable sequence retrieved from the <paramref name="sources"/> dictionary based on the <paramref name="selector"/> invocation result, or an empty sequence if no match is found.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> is null.</exception>
  public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (selector == null)
      {
          throw new ArgumentNullException(nameof(selector));
      }

      if (sources == null)
      {
          throw new ArgumentNullException(nameof(sources));
      }

      return s_impl.Case<TValue, TResult>(selector, sources);
  }
  148. #endregion
  149. #region + DoWhile +
  /// <summary>
  /// Keeps repeating <paramref name="source"/> for as long as <paramref name="condition"/> holds; the <paramref name="condition"/> is evaluated after each repetition of <paramref name="source"/> has completed.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to repeat while the <paramref name="condition"/> function evaluates to true.</param>
  /// <param name="condition">Function evaluated upon completion of an iteration through the <paramref name="source"/>, deciding whether the source has to be repeated.</param>
  /// <returns>The observable sequence obtained by concatenating the <paramref name="source"/> sequence as long as the <paramref name="condition"/> holds.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="condition"/> is null.</exception>
  public static IObservable<TSource> DoWhile<TSource>(this IObservable<TSource> source, Func<bool> condition)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (condition == null)
      {
          throw new ArgumentNullException(nameof(condition));
      }

      return s_impl.DoWhile<TSource>(source, condition);
  }
  166. #endregion
  167. #region + For +
  /// <summary>
  /// Runs <paramref name="resultSelector"/> for each element of the enumerable <paramref name="source"/> and concatenates the observable sequences it yields.
  /// </summary>
  /// <typeparam name="TSource">Element type of the enumerable source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the observable result sequence.</typeparam>
  /// <param name="source">Enumerable whose elements are each mapped onto an observable source to be concatenated into the result sequence.</param>
  /// <param name="resultSelector">Function that selects an observable source for each element in the <paramref name="source"/>.</param>
  /// <returns>The observable sequence obtained by concatenating the sources returned by <paramref name="resultSelector"/> for each element in the <paramref name="source"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> For<TSource, TResult>(IEnumerable<TSource> source, Func<TSource, IObservable<TResult>> resultSelector)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException(nameof(resultSelector));
      }

      return s_impl.For<TSource, TResult>(source, resultSelector);
  }
  185. #endregion
  186. #region + If +
  /// <summary>
  /// Selects the <paramref name="thenSource"/> sequence when <paramref name="condition"/> evaluates true, and the <paramref name="elseSource"/> sequence otherwise.
  /// </summary>
  /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
  /// <param name="condition">Function evaluated to decide which sequence to return.</param>
  /// <param name="thenSource">Sequence returned when <paramref name="condition"/> evaluates true.</param>
  /// <param name="elseSource">Sequence returned when <paramref name="condition"/> evaluates false.</param>
  /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; <paramref name="elseSource"/> otherwise.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> or <paramref name="elseSource"/> is null.</exception>
  public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IObservable<TResult> elseSource)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (condition == null)
      {
          throw new ArgumentNullException(nameof(condition));
      }

      if (thenSource == null)
      {
          throw new ArgumentNullException(nameof(thenSource));
      }

      if (elseSource == null)
      {
          throw new ArgumentNullException(nameof(elseSource));
      }

      return s_impl.If<TResult>(condition, thenSource, elseSource);
  }
  /// <summary>
  /// Selects the <paramref name="thenSource"/> sequence when <paramref name="condition"/> evaluates true, and returns an empty sequence otherwise.
  /// </summary>
  /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
  /// <param name="condition">Function evaluated to decide which sequence to return.</param>
  /// <param name="thenSource">Sequence returned when <paramref name="condition"/> evaluates true.</param>
  /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; an empty sequence otherwise.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> is null.</exception>
  public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (condition == null)
      {
          throw new ArgumentNullException(nameof(condition));
      }

      if (thenSource == null)
      {
          throw new ArgumentNullException(nameof(thenSource));
      }

      return s_impl.If<TResult>(condition, thenSource);
  }
  /// <summary>
  /// Selects the <paramref name="thenSource"/> sequence when <paramref name="condition"/> evaluates true, and returns an empty sequence generated on the specified scheduler otherwise.
  /// </summary>
  /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
  /// <param name="condition">Function evaluated to decide which sequence to return.</param>
  /// <param name="thenSource">Sequence returned when <paramref name="condition"/> evaluates true.</param>
  /// <param name="scheduler">Scheduler on which to generate an empty sequence when <paramref name="condition"/> evaluates false.</param>
  /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; an empty sequence otherwise.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> or <paramref name="scheduler"/> is null.</exception>
  public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IScheduler scheduler)
  {
      // Arguments are validated in declaration order before delegating to s_impl.
      if (condition == null)
      {
          throw new ArgumentNullException(nameof(condition));
      }

      if (thenSource == null)
      {
          throw new ArgumentNullException(nameof(thenSource));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return s_impl.If<TResult>(condition, thenSource, scheduler);
  }
  241. #endregion
  242. #region + While +
  /// <summary>
  /// Keeps repeating <paramref name="source"/> for as long as <paramref name="condition"/> holds; the <paramref name="condition"/> is evaluated before each repetition of <paramref name="source"/> is subscribed to.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to repeat while the <paramref name="condition"/> function evaluates to true.</param>
  /// <param name="condition">Function evaluated before subscription to the <paramref name="source"/>, deciding whether the source has to be repeated.</param>
  /// <returns>The observable sequence obtained by concatenating the <paramref name="source"/> sequence as long as the <paramref name="condition"/> holds.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="source"/> is null.</exception>
  public static IObservable<TSource> While<TSource>(Func<bool> condition, IObservable<TSource> source)
  {
      // Arguments are validated in declaration order (condition first) before delegating to s_impl.
      if (condition == null)
      {
          throw new ArgumentNullException(nameof(condition));
      }

      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      return s_impl.While<TSource>(condition, source);
  }
  259. #endregion
  260. }
  261. }