Observable.StandardSequenceOperators.cs 132 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. #if !NO_TPL
  5. using System.Reactive.Threading.Tasks; // needed for doc comments
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. #endif
  9. namespace System.Reactive.Linq
  10. {
  11. public static partial class Observable
  12. {
  13. #region + Cast +
  /// <summary>
  /// Projects every element of an observable sequence of objects to the requested type.
  /// </summary>
  /// <typeparam name="TResult">The target type that the elements of the source sequence are converted to.</typeparam>
  /// <param name="source">The observable sequence whose elements are to be converted.</param>
  /// <returns>An observable sequence holding each element of the source sequence converted to <typeparamref name="TResult"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TResult> Cast<TResult>(this IObservable<object> source)
  {
      // Validate eagerly so that a null source fails at the call site.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      return s_impl.Cast<TResult>(source);
  }
  27. #endregion
  28. #region + DefaultIfEmpty +
  /// <summary>
  /// Yields the elements of the given sequence, or a singleton sequence holding the default value of the type parameter when the sequence is empty.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence; its default value is used when the sequence is empty.</typeparam>
  /// <param name="source">The sequence for which a default value is produced if it turns out to be empty.</param>
  /// <returns>An observable sequence containing the default value of <typeparamref name="TSource"/> if the source is empty; otherwise, the elements of the source itself.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TSource> DefaultIfEmpty<TSource>(this IObservable<TSource> source)
  {
      // Validate eagerly so that a null source fails at the call site.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      return s_impl.DefaultIfEmpty<TSource>(source);
  }
  /// <summary>
  /// Yields the elements of the given sequence, or a singleton sequence holding the supplied value when the sequence is empty.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence, which is also the type of the supplied fallback value.</typeparam>
  /// <param name="source">The sequence for which the supplied value is produced if it turns out to be empty.</param>
  /// <param name="defaultValue">The value to produce when the sequence is empty.</param>
  /// <returns>An observable sequence containing <paramref name="defaultValue"/> if the source is empty; otherwise, the elements of the source itself.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TSource> DefaultIfEmpty<TSource>(this IObservable<TSource> source, TSource defaultValue)
  {
      // Only the source is validated; defaultValue is passed through unchecked.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      return s_impl.DefaultIfEmpty<TSource>(source, defaultValue);
  }
  56. #endregion
  57. #region + Distinct +
  /// <summary>
  /// Produces an observable sequence in which every element occurs only once.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence from which distinct elements are retained.</param>
  /// <returns>An observable sequence consisting solely of the distinct elements of the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <remarks>Use this operator with care: it maintains an internal lookup structure which can grow large.</remarks>
  public static IObservable<TSource> Distinct<TSource>(this IObservable<TSource> source)
  {
      // Validate eagerly so that a null source fails at the call site.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      return s_impl.Distinct<TSource>(source);
  }
  /// <summary>
  /// Produces an observable sequence in which every element occurs only once, as determined by the given comparer.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence from which distinct elements are retained.</param>
  /// <param name="comparer">The equality comparer applied to source elements.</param>
  /// <returns>An observable sequence consisting solely of the distinct elements of the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="comparer"/> is null.</exception>
  /// <remarks>Use this operator with care: it maintains an internal lookup structure which can grow large.</remarks>
  public static IObservable<TSource> Distinct<TSource>(this IObservable<TSource> source, IEqualityComparer<TSource> comparer)
  {
      // Arguments are checked in declaration order; the first null one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.Distinct<TSource>(source, comparer);
  }
  /// <summary>
  /// Produces an observable sequence in which every element occurs only once, as determined by the key computed by the keySelector.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence from which distinct elements are retained.</param>
  /// <param name="keySelector">A function that computes the comparison key of each element.</param>
  /// <returns>An observable sequence consisting solely of the elements of the source sequence that are distinct by their computed key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <remarks>Use this operator with care: it maintains an internal lookup structure which can grow large.</remarks>
  public static IObservable<TSource> Distinct<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      // Arguments are checked in declaration order; the first null one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      return s_impl.Distinct<TSource, TKey>(source, keySelector);
  }
  /// <summary>
  /// Produces an observable sequence in which every element occurs only once, as determined by the keySelector and the comparer.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence from which distinct elements are retained.</param>
  /// <param name="keySelector">A function that computes the comparison key of each element.</param>
  /// <param name="comparer">The equality comparer applied to the computed keys.</param>
  /// <returns>An observable sequence consisting solely of the elements of the source sequence that are distinct by their computed key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <remarks>Use this operator with care: it maintains an internal lookup structure which can grow large.</remarks>
  public static IObservable<TSource> Distinct<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      // Arguments are checked in declaration order; the first null one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.Distinct<TSource, TKey>(source, keySelector, comparer);
  }
  128. #endregion
  129. #region + GroupBy +
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements sharing that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      // Arguments are checked in declaration order; the first null one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      return s_impl.GroupBy<TSource, TKey>(source, keySelector);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <param name="comparer">The equality comparer with which keys are compared.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements sharing that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      // Arguments are checked in declaration order; the first null one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.GroupBy<TSource, TKey>(source, keySelector, comparer);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups and maps each element into its group using the given function.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">The type of the elements inside the groups, computed for each source element.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element of an observable group.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements sharing that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  {
      // Arguments are checked in declaration order; the first null one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (elementSelector == null)
      {
          throw new ArgumentNullException("elementSelector");
      }

      return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer, and maps each element into its group using the given function.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">The type of the elements inside the groups, computed for each source element.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element of an observable group.</param>
  /// <param name="comparer">The equality comparer with which keys are compared.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements sharing that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="comparer"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  {
      // Arguments are checked in declaration order; the first null one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (elementSelector == null)
      {
          throw new ArgumentNullException("elementSelector");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function, starting from the given initial capacity.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <param name="capacity">The initial number of elements the underlying dictionary can contain.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements sharing that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
  {
      // Arguments are checked in declaration order; the first invalid one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      // Zero is accepted; only negative capacities are rejected.
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException("capacity");
      }

      return s_impl.GroupBy<TSource, TKey>(source, keySelector, capacity);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer, starting from the given initial capacity.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <param name="capacity">The initial number of elements the underlying dictionary can contain.</param>
  /// <param name="comparer">The equality comparer with which keys are compared.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements sharing that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      // Arguments are checked in declaration order; the first invalid one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      // Zero is accepted; only negative capacities are rejected.
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException("capacity");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.GroupBy<TSource, TKey>(source, keySelector, capacity, comparer);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups, starting from the given initial capacity, and maps each element into its group using the given function.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">The type of the elements inside the groups, computed for each source element.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element of an observable group.</param>
  /// <param name="capacity">The initial number of elements the underlying dictionary can contain.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements sharing that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
  {
      // Arguments are checked in declaration order; the first invalid one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (elementSelector == null)
      {
          throw new ArgumentNullException("elementSelector");
      }

      // Zero is accepted; only negative capacities are rejected.
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException("capacity");
      }

      return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer, starting from the given initial capacity, and maps each element into its group using the given function.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">The type of the elements inside the groups, computed for each source element.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element of an observable group.</param>
  /// <param name="capacity">The initial number of elements the underlying dictionary can contain.</param>
  /// <param name="comparer">The equality comparer with which keys are compared.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements sharing that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      // Arguments are checked in declaration order; the first invalid one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (elementSelector == null)
      {
          throw new ArgumentNullException("elementSelector");
      }

      // Zero is accepted; only negative capacities are rejected.
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException("capacity");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
  }
  310. #endregion
  311. #region + GroupByUntil +
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer, and maps each element into its group using the given function.
  /// The lifetime of groups is governed by a duration selector function. A group that expires receives an OnCompleted notification. When a later element carries the same
  /// key value as a reclaimed group, the group is reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">The type of the elements inside the groups, computed for each source element.</typeparam>
  /// <typeparam name="TDuration">The element type of the duration sequences obtained for each group to denote its lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element of an observable group.</param>
  /// <param name="durationSelector">A function that signals the expiration of a group.</param>
  /// <param name="comparer">The equality comparer with which keys are compared.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing all elements sharing that key value.
  /// Once a group's lifetime has expired, a new group with the same key value can be created when an element with such a key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      // Arguments are checked in declaration order; the first null one is reported.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (elementSelector == null)
      {
          throw new ArgumentNullException("elementSelector");
      }

      if (durationSelector == null)
      {
          throw new ArgumentNullException("durationSelector");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
  }
  /// <summary>
  /// Partitions an observable sequence into groups by key, projecting each source element into its group through the given element selector.
  /// The lifetime of every group is governed by a duration selector function: an expired group receives an OnCompleted notification, and
  /// a later element carrying the key of a reclaimed group causes that group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">Type of the elements placed in the groups, computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences obtained for each group to denote its lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Function that extracts the key of each element.</param>
  /// <param name="elementSelector">Function that maps each source element to an element in an observable group.</param>
  /// <param name="durationSelector">Function that signals the expiration of a group.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing all elements that share that key value.
  /// After a group's lifetime has expired, a new group with the same key value can be created once an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="elementSelector"/> or <paramref name="durationSelector"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
  {
      // Reject null arguments up front, in parameter order, before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (elementSelector == null)
      {
          throw new ArgumentNullException("elementSelector");
      }

      if (durationSelector == null)
      {
          throw new ArgumentNullException("durationSelector");
      }

      return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector);
  }
  /// <summary>
  /// Partitions an observable sequence into groups by key, comparing keys with the supplied equality comparer.
  /// The lifetime of every group is governed by a duration selector function: an expired group receives an OnCompleted notification, and
  /// a later element carrying the key of a reclaimed group causes that group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences obtained for each group to denote its lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Function that extracts the key of each element.</param>
  /// <param name="durationSelector">Function that signals the expiration of a group.</param>
  /// <param name="comparer">Equality comparer used to compare keys.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing all elements that share that key value.
  /// After a group's lifetime has expired, a new group with the same key value can be created once an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector,
      IEqualityComparer<TKey> comparer)
  {
      // Reject null arguments up front, in parameter order, before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (durationSelector == null)
      {
          throw new ArgumentNullException("durationSelector");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector, comparer);
  }
  /// <summary>
  /// Partitions an observable sequence into groups by the key produced by the given key selector function.
  /// The lifetime of every group is governed by a duration selector function: an expired group receives an OnCompleted notification, and
  /// a later element carrying the key of a reclaimed group causes that group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences obtained for each group to denote its lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Function that extracts the key of each element.</param>
  /// <param name="durationSelector">Function that signals the expiration of a group.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing all elements that share that key value.
  /// After a group's lifetime has expired, a new group with the same key value can be created once an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="durationSelector"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
  {
      // Reject null arguments up front, in parameter order, before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (durationSelector == null)
      {
          throw new ArgumentNullException("durationSelector");
      }

      return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector);
  }
  /// <summary>
  /// Partitions an observable sequence into groups by key, using the given initial capacity and equality comparer, and projects each
  /// source element into its group through the given element selector.
  /// The lifetime of every group is governed by a duration selector function: an expired group receives an OnCompleted notification, and
  /// a later element carrying the key of a reclaimed group causes that group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">Type of the elements placed in the groups, computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences obtained for each group to denote its lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Function that extracts the key of each element.</param>
  /// <param name="elementSelector">Function that maps each source element to an element in an observable group.</param>
  /// <param name="durationSelector">Function that signals the expiration of a group.</param>
  /// <param name="capacity">Initial number of elements that the underlying dictionary can contain.</param>
  /// <param name="comparer">Equality comparer used to compare keys.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing all elements that share that key value.
  /// After a group's lifetime has expired, a new group with the same key value can be created once an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="elementSelector"/>, <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
  {
      // Validate every argument, in parameter order, before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (elementSelector == null)
      {
          throw new ArgumentNullException("elementSelector");
      }

      if (durationSelector == null)
      {
          throw new ArgumentNullException("durationSelector");
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException("capacity");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Partitions an observable sequence into groups by key, using the given initial capacity, and projects each source element into its
  /// group through the given element selector.
  /// The lifetime of every group is governed by a duration selector function: an expired group receives an OnCompleted notification, and
  /// a later element carrying the key of a reclaimed group causes that group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">Type of the elements placed in the groups, computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences obtained for each group to denote its lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Function that extracts the key of each element.</param>
  /// <param name="elementSelector">Function that maps each source element to an element in an observable group.</param>
  /// <param name="durationSelector">Function that signals the expiration of a group.</param>
  /// <param name="capacity">Initial number of elements that the underlying dictionary can contain.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing all elements that share that key value.
  /// After a group's lifetime has expired, a new group with the same key value can be created once an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="elementSelector"/> or <paramref name="durationSelector"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector,
      int capacity)
  {
      // Validate every argument, in parameter order, before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (elementSelector == null)
      {
          throw new ArgumentNullException("elementSelector");
      }

      if (durationSelector == null)
      {
          throw new ArgumentNullException("durationSelector");
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException("capacity");
      }

      return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity);
  }
  /// <summary>
  /// Partitions an observable sequence into groups by key, using the given initial capacity and comparing keys with the supplied equality comparer.
  /// The lifetime of every group is governed by a duration selector function: an expired group receives an OnCompleted notification, and
  /// a later element carrying the key of a reclaimed group causes that group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences obtained for each group to denote its lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Function that extracts the key of each element.</param>
  /// <param name="durationSelector">Function that signals the expiration of a group.</param>
  /// <param name="capacity">Initial number of elements that the underlying dictionary can contain.</param>
  /// <param name="comparer">Equality comparer used to compare keys.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing all elements that share that key value.
  /// After a group's lifetime has expired, a new group with the same key value can be created once an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
  {
      // Validate every argument, in parameter order, before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (durationSelector == null)
      {
          throw new ArgumentNullException("durationSelector");
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException("capacity");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Partitions an observable sequence into groups by the key produced by the given key selector function, using the given initial capacity.
  /// The lifetime of every group is governed by a duration selector function: an expired group receives an OnCompleted notification, and
  /// a later element carrying the key of a reclaimed group causes that group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences obtained for each group to denote its lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Function that extracts the key of each element.</param>
  /// <param name="durationSelector">Function that signals the expiration of a group.</param>
  /// <param name="capacity">Initial number of elements that the underlying dictionary can contain.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing all elements that share that key value.
  /// After a group's lifetime has expired, a new group with the same key value can be created once an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="durationSelector"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector,
      int capacity)
  {
      // Validate every argument, in parameter order, before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (durationSelector == null)
      {
          throw new ArgumentNullException("durationSelector");
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException("capacity");
      }

      return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector, capacity);
  }
  564. #endregion
  565. #region + GroupJoin +
  /// <summary>
  /// Correlates the elements of two sequences whose durations overlap and groups the results.
  /// </summary>
  /// <typeparam name="TLeft">Element type of the left source sequence.</typeparam>
  /// <typeparam name="TRight">Element type of the right source sequence.</typeparam>
  /// <typeparam name="TLeftDuration">Element type of the duration sequence that denotes the computed duration of each left element.</typeparam>
  /// <typeparam name="TRightDuration">Element type of the duration sequence that denotes the computed duration of each right element.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence, produced by invoking the result selector for source elements with overlapping duration.</typeparam>
  /// <param name="left">The left observable sequence to join elements for.</param>
  /// <param name="right">The right observable sequence to join elements for.</param>
  /// <param name="leftDurationSelector">Function that selects the duration of each left element; used to determine overlap.</param>
  /// <param name="rightDurationSelector">Function that selects the duration of each right element; used to determine overlap.</param>
  /// <param name="resultSelector">Function invoked to compute a result element for any left element together with the overlapping elements from the right observable sequence.</param>
  /// <returns>An observable sequence containing the result elements computed from source elements that have an overlapping duration.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="left"/>, <paramref name="right"/>, <paramref name="leftDurationSelector"/>, <paramref name="rightDurationSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
      this IObservable<TLeft> left,
      IObservable<TRight> right,
      Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
      Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
      Func<TLeft, IObservable<TRight>, TResult> resultSelector)
  {
      // Reject null arguments up front, in parameter order, before delegating to s_impl.
      if (left == null)
      {
          throw new ArgumentNullException("left");
      }

      if (right == null)
      {
          throw new ArgumentNullException("right");
      }

      if (leftDurationSelector == null)
      {
          throw new ArgumentNullException("leftDurationSelector");
      }

      if (rightDurationSelector == null)
      {
          throw new ArgumentNullException("rightDurationSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      return s_impl.GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
  }
  595. #endregion
  596. #region + Join +
  /// <summary>
  /// Correlates the elements of two sequences whose durations overlap.
  /// </summary>
  /// <typeparam name="TLeft">Element type of the left source sequence.</typeparam>
  /// <typeparam name="TRight">Element type of the right source sequence.</typeparam>
  /// <typeparam name="TLeftDuration">Element type of the duration sequence that denotes the computed duration of each left element.</typeparam>
  /// <typeparam name="TRightDuration">Element type of the duration sequence that denotes the computed duration of each right element.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence, produced by invoking the result selector for source elements with overlapping duration.</typeparam>
  /// <param name="left">The left observable sequence to join elements for.</param>
  /// <param name="right">The right observable sequence to join elements for.</param>
  /// <param name="leftDurationSelector">Function that selects the duration of each left element; used to determine overlap.</param>
  /// <param name="rightDurationSelector">Function that selects the duration of each right element; used to determine overlap.</param>
  /// <param name="resultSelector">Function invoked to compute a result element for any two overlapping elements of the left and right observable sequences.</param>
  /// <returns>An observable sequence containing the result elements computed from source elements that have an overlapping duration.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="left"/>, <paramref name="right"/>, <paramref name="leftDurationSelector"/>, <paramref name="rightDurationSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
      this IObservable<TLeft> left,
      IObservable<TRight> right,
      Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
      Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
      Func<TLeft, TRight, TResult> resultSelector)
  {
      // Reject null arguments up front, in parameter order, before delegating to s_impl.
      if (left == null)
      {
          throw new ArgumentNullException("left");
      }

      if (right == null)
      {
          throw new ArgumentNullException("right");
      }

      if (leftDurationSelector == null)
      {
          throw new ArgumentNullException("leftDurationSelector");
      }

      if (rightDurationSelector == null)
      {
          throw new ArgumentNullException("rightDurationSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      return s_impl.Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
  }
  626. #endregion
  627. #region + OfType +
  /// <summary>
  /// Filters an observable sequence down to the elements of the specified type.
  /// </summary>
  /// <typeparam name="TResult">The type on which the elements of the source sequence are filtered.</typeparam>
  /// <param name="source">The observable sequence containing the elements to filter.</param>
  /// <returns>An observable sequence containing the elements of the input sequence that are of type TResult.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TResult> OfType<TResult>(this IObservable<object> source)
  {
      // Reject a null source before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      return s_impl.OfType<TResult>(source);
  }
  641. #endregion
  642. #region + OrderBy +
  /// <summary>
  /// Orders the elements of a sequence ascending by a key.
  /// </summary>
  /// <typeparam name="TSource">Element type of the <paramref name="source"/> sequence.</typeparam>
  /// <typeparam name="TKey">Type of the sorting key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence of values to order.</param>
  /// <param name="keySelector">Function that extracts the key of each element.</param>
  /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to a key.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
  /// <remarks>
  /// <para>
  /// Ordering cannot complete until every element of the <paramref name="source"/> sequence has been observed, so all of the
  /// elements must be buffered. In that respect ordering an observable sequence resembles an aggregation such as
  /// <see cref="ToList"/>: time information is lost, and notifications are generated only when the <paramref name="source"/>
  /// sequence calls OnCompleted.
  /// </para>
  /// <alert type="warn">
  /// Do not attempt to order an observable sequence that never calls OnCompleted. No notifications will be generated, and it
  /// may result in an <see cref="OutOfMemoryException"/> being thrown.
  /// </alert>
  /// </remarks>
  public static IOrderedObservable<TSource> OrderBy<TSource, TKey>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector)
  {
      // Reject null arguments up front, in parameter order, before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      return s_impl.OrderBy<TSource, TKey>(source, keySelector);
  }
  /// <summary>
  /// Orders the elements of a sequence ascending, comparing keys with the specified comparer.
  /// </summary>
  /// <typeparam name="TSource">Element type of the <paramref name="source"/> sequence.</typeparam>
  /// <typeparam name="TKey">Type of the sorting key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence of values to order.</param>
  /// <param name="keySelector">Function that extracts the key of each element.</param>
  /// <param name="comparer">An <see cref="IComparer{TKey}"/> used to compare keys.</param>
  /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to a key.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is <see langword="null"/>.</exception>
  /// <remarks>
  /// <para>
  /// Ordering cannot complete until every element of the <paramref name="source"/> sequence has been observed, so all of the
  /// elements must be buffered. In that respect ordering an observable sequence resembles an aggregation such as
  /// <see cref="ToList"/>: timing is lost, and notifications are generated only when the <paramref name="source"/>
  /// sequence calls OnCompleted.
  /// </para>
  /// <alert type="warn">
  /// Do not attempt to order an observable sequence that never calls OnCompleted. No notifications will be generated, and it
  /// may result in an <see cref="OutOfMemoryException"/> being thrown.
  /// </alert>
  /// </remarks>
  public static IOrderedObservable<TSource> OrderBy<TSource, TKey>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      IComparer<TKey> comparer)
  {
      // Reject null arguments up front, in parameter order, before delegating to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.OrderBy<TSource, TKey>(source, keySelector, comparer);
  }
  704. /// <summary>
  705. /// Sorts the elements of a sequence in ascending order according to the times at which corresponding observable sequences produce their first notification or complete.
  706. /// </summary>
  707. /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
  708. /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector"/>.</typeparam>
  709. /// <param name="source">An observable sequence of values to order.</param>
  710. /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source"/> sequence indicating the time at which that element should appear in the ordering.</param>
  711. /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
  712. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="timeSelector"/> is <see langword="null"/>.</exception>
  713. /// <remarks>
  714. /// <para>
  715. /// This overload of OrderBy is similar to a particular usage of SelectMany where the elements in the <paramref name="source"/> sequence are returned based on the times
  716. /// at which their corresponding inner sequences produce their first notification or complete. Any elements in the inner sequences are discarded.
  717. /// </para>
  718. /// <para>
  /// The primary benefit of this overload of OrderBy is that it relates to ordering specifically, thus it's more semantic than SelectMany and it allows an author to avoid
  720. /// defining an unused query variable when applying SelectMany merely as an ordering operator. It also returns <see cref="IOrderedObservable{TSource}"/> so that it may
  721. /// be used in combination with any overloads of ThenBy and ThenByDescending to define queries that order by time and key.
  722. /// </para>
  723. /// <alert type="info">
  724. /// Unlike the other overload of OrderBy, this overload does not buffer any elements and it does not wait for the <paramref name="source"/> sequence to complete before it
  725. /// pushes notifications. This overload is entirely reactive.
  726. /// </alert>
  727. /// <para>
  728. /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as the key.
  729. /// </para>
  730. /// <example>
  731. /// <para>
  732. /// The following example shows how to use this overload of OrderBy with the orderby LINQ query comprehension syntax in C#.
  733. /// </para>
  734. /// <para>
  735. /// The result of this query is a sequence of integers from 5 to 1, at 1 second intervals.
  736. /// </para>
  737. /// <code>
  738. /// <![CDATA[IObservable<int> xs =
  739. /// from x in Observable.Range(1, 5)
  740. /// orderby Observable.Timer(TimeSpan.FromSeconds(5 - x))
  741. /// select x;]]>
  742. /// </code>
  743. /// </example>
  744. /// </remarks>
  public static IOrderedObservable<TSource> OrderBy<TSource, TOther>(this IObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (timeSelector == null)
      {
          throw new ArgumentNullException("timeSelector");
      }

      // All real work is done by the s_impl implementation object.
      IOrderedObservable<TSource> orderedByTime = s_impl.OrderBy<TSource, TOther>(source, timeSelector);
      return orderedByTime;
  }
  753. #endregion
  754. #region + OrderByDescending +
  755. /// <summary>
  756. /// Sorts the elements of a sequence in descending order according to a key.
  757. /// </summary>
  758. /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
  759. /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
  760. /// <param name="source">An observable sequence of values to order.</param>
  761. /// <param name="keySelector">A function to extract the key for each element.</param>
  762. /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to a key.</returns>
  763. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
  764. /// <remarks>
  765. /// <para>
  766. /// Ordering requires all of the elements in the <paramref name="source"/> sequence to be observed before it can complete,
  767. /// which means that all of the elements must be buffered. Ordering an observable sequence is similar to performing an aggregation
  768. /// such as <see cref="ToList"/> because timing is lost and notifications are only generated when the <paramref name="source"/>
  769. /// sequence calls OnCompleted.
  770. /// </para>
  771. /// <alert type="warn">
  772. /// Do not attempt to order an observable sequence that never calls OnCompleted. There will be no notifications generated, and it
  773. /// may result in an <see cref="OutOfMemoryException"/> being thrown.
  774. /// </alert>
  775. /// </remarks>
  public static IOrderedObservable<TSource> OrderByDescending<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      // All real work is done by the s_impl implementation object.
      IOrderedObservable<TSource> ordered = s_impl.OrderByDescending<TSource, TKey>(source, keySelector);
      return ordered;
  }
  784. /// <summary>
  785. /// Sorts the elements of a sequence in descending order by using a specified comparer.
  786. /// </summary>
  787. /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
  788. /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
  789. /// <param name="source">An observable sequence of values to order.</param>
  790. /// <param name="keySelector">A function to extract the key for each element.</param>
  791. /// <param name="comparer">An <see cref="IComparer{TKey}"/> to compare keys.</param>
  792. /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to a key.</returns>
  793. /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is <see langword="null"/>.</exception>
  794. /// <remarks>
  795. /// <para>
  796. /// Ordering requires all of the elements in the <paramref name="source"/> sequence to be observed before it can complete,
  797. /// which means that all of the elements must be buffered. Ordering an observable sequence is similar to performing an aggregation
  798. /// such as <see cref="ToList"/> because timing is lost and notifications are only generated when the <paramref name="source"/>
  799. /// sequence calls OnCompleted.
  800. /// </para>
  801. /// <alert type="warn">
  802. /// Do not attempt to order an observable sequence that never calls OnCompleted. There will be no notifications generated, and it
  803. /// may result in an <see cref="OutOfMemoryException"/> being thrown.
  804. /// </alert>
  805. /// </remarks>
  public static IOrderedObservable<TSource> OrderByDescending<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      // All real work is done by the s_impl implementation object.
      IOrderedObservable<TSource> ordered = s_impl.OrderByDescending<TSource, TKey>(source, keySelector, comparer);
      return ordered;
  }
  816. /// <summary>
  817. /// Sorts the elements of a sequence in descending order according to the times at which corresponding observable sequences produce their first notification or complete.
  818. /// </summary>
  819. /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
  820. /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector"/>.</typeparam>
  821. /// <param name="source">An observable sequence of values to order.</param>
  822. /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source"/> sequence indicating the time at which that element should appear in the ordering.</param>
  823. /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
  824. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="timeSelector"/> is <see langword="null"/>.</exception>
  825. /// <remarks>
  826. /// <para>
  827. /// Ordering descending by time requires all of the elements in the <paramref name="source"/> sequence to be observed before it can complete,
  828. /// which means that all of the elements must be buffered. Ordering an observable sequence descending by time is similar to performing an aggregation
  829. /// such as <see cref="ToList"/> because timing from the <paramref name="source"/> sequence is lost and notifications are only generated when
  830. /// the <paramref name="source"/> sequence calls OnCompleted.
  831. /// </para>
  832. /// <alert type="warn">
  833. /// Do not attempt to order an observable sequence that never calls OnCompleted. There will be no notifications generated, and it
  834. /// may result in an <see cref="OutOfMemoryException"/> being thrown.
  835. /// </alert>
  836. /// <para>
  837. /// This overload of OrderByDescending is similar to a particular usage of SelectMany where the elements in the <paramref name="source"/> sequence are returned based on the times
  838. /// at which their corresponding inner sequences produce their first notification or complete. Any elements in the inner sequences are discarded.
  839. /// </para>
  840. /// <para>
  /// The primary benefit of this overload of OrderByDescending is that it relates to ordering specifically, thus it's more semantic than SelectMany and it allows an author to avoid
  842. /// defining an unused query variable when applying SelectMany merely as an ordering operator. It also returns <see cref="IOrderedObservable{TSource}"/> so that it may
  843. /// be used in combination with any overloads of ThenBy and ThenByDescending to define queries that order by time and key.
  844. /// </para>
  845. /// <para>
  846. /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as the key.
  847. /// </para>
  848. /// <example>
  849. /// <para>
  850. /// The following example shows how to use this overload of OrderByDescending with the orderby LINQ query comprehension syntax in C#.
  851. /// </para>
  852. /// <para>
  853. /// The result of this query is a sequence of integers from 1 to 5, all of them arriving at 5 seconds from the time of subscription.
  854. /// </para>
  855. /// <code>
  856. /// <![CDATA[IObservable<int> xs =
  857. /// from x in Observable.Range(1, 5)
  858. /// orderby Observable.Timer(TimeSpan.FromSeconds(5 - x)) descending
  859. /// select x;]]>
  860. /// </code>
  861. /// </example>
  862. /// </remarks>
  public static IOrderedObservable<TSource> OrderByDescending<TSource, TOther>(this IObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (timeSelector == null)
      {
          throw new ArgumentNullException("timeSelector");
      }

      // All real work is done by the s_impl implementation object.
      IOrderedObservable<TSource> orderedByTime = s_impl.OrderByDescending<TSource, TOther>(source, timeSelector);
      return orderedByTime;
  }
  871. #endregion
  872. #region + Select +
  873. /// <summary>
  874. /// Projects each element of an observable sequence into a new form.
  875. /// </summary>
  876. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  877. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by running the selector function for each element in the source sequence.</typeparam>
  878. /// <param name="source">A sequence of elements to invoke a transform function on.</param>
  879. /// <param name="selector">A transform function to apply to each source element.</param>
  880. /// <returns>An observable sequence whose elements are the result of invoking the transform function on each element of source.</returns>
  881. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      // All real work is done by the s_impl implementation object.
      IObservable<TResult> projected = s_impl.Select<TSource, TResult>(source, selector);
      return projected;
  }
  890. /// <summary>
  891. /// Projects each element of an observable sequence into a new form by incorporating the element's index.
  892. /// </summary>
  893. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  894. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by running the selector function for each element in the source sequence.</typeparam>
  895. /// <param name="source">A sequence of elements to invoke a transform function on.</param>
  896. /// <param name="selector">A transform function to apply to each source element; the second parameter of the function represents the index of the source element.</param>
  897. /// <returns>An observable sequence whose elements are the result of invoking the transform function on each element of source.</returns>
  898. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, TResult> selector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      // The (element, index) selector is passed straight through to s_impl.
      IObservable<TResult> projected = s_impl.Select<TSource, TResult>(source, selector);
      return projected;
  }
  907. #endregion
  908. #region + SelectMany +
  909. /// <summary>
  910. /// Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.
  911. /// </summary>
  912. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  913. /// <typeparam name="TOther">The type of the elements in the other sequence and the elements in the result sequence.</typeparam>
  914. /// <param name="source">An observable sequence of elements to project.</param>
  915. /// <param name="other">An observable sequence to project each element from the source sequence onto.</param>
  916. /// <returns>An observable sequence whose elements are the result of projecting each source element onto the other sequence and merging all the resulting sequences together.</returns>
  917. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is null.</exception>
  public static IObservable<TOther> SelectMany<TSource, TOther>(this IObservable<TSource> source, IObservable<TOther> other)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (other == null)
      {
          throw new ArgumentNullException("other");
      }

      // All real work is done by the s_impl implementation object.
      IObservable<TOther> merged = s_impl.SelectMany<TSource, TOther>(source, other);
      return merged;
  }
  926. /// <summary>
  927. /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
  928. /// </summary>
  929. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  930. /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
  931. /// <param name="source">An observable sequence of elements to project.</param>
  932. /// <param name="selector">A transform function to apply to each element.</param>
  933. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
  934. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      // All real work is done by the s_impl implementation object.
      IObservable<TResult> merged = s_impl.SelectMany<TSource, TResult>(source, selector);
      return merged;
  }
  943. /// <summary>
  944. /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.
  945. /// </summary>
  946. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  947. /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
  948. /// <param name="source">An observable sequence of elements to project.</param>
  949. /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  950. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
  951. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      // The (element, index) selector is passed straight through to s_impl.
      IObservable<TResult> merged = s_impl.SelectMany<TSource, TResult>(source, selector);
      return merged;
  }
  960. #if !NO_TPL
  961. /// <summary>
  962. /// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence.
  963. /// </summary>
  964. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  965. /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
  966. /// <param name="source">An observable sequence of elements to project.</param>
  967. /// <param name="selector">A transform function to apply to each element.</param>
  968. /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
  969. /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
  970. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      // The task-returning selector is passed straight through to s_impl.
      IObservable<TResult> taskResults = s_impl.SelectMany<TSource, TResult>(source, selector);
      return taskResults;
  }
  979. /// <summary>
  980. /// Projects each element of an observable sequence to a task by incorporating the element's index and merges all of the task results into one observable sequence.
  981. /// </summary>
  982. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  983. /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
  984. /// <param name="source">An observable sequence of elements to project.</param>
  985. /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  986. /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
  987. /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
  988. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      // The (element, index) task-returning selector is passed straight through to s_impl.
      IObservable<TResult> taskResults = s_impl.SelectMany<TSource, TResult>(source, selector);
      return taskResults;
  }
  997. /// <summary>
  998. /// Projects each element of an observable sequence to a task with cancellation support and merges all of the task results into one observable sequence.
  999. /// </summary>
  1000. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1001. /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
  1002. /// <param name="source">An observable sequence of elements to project.</param>
  1003. /// <param name="selector">A transform function to apply to each element.</param>
  1004. /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
  1005. /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
  1006. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      // The (element, cancellation token) selector is passed straight through to s_impl.
      IObservable<TResult> taskResults = s_impl.SelectMany<TSource, TResult>(source, selector);
      return taskResults;
  }
  1015. /// <summary>
  1016. /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support and merges all of the task results into one observable sequence.
  1017. /// </summary>
  1018. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1019. /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
  1020. /// <param name="source">An observable sequence of elements to project.</param>
  1021. /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  1022. /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
  1023. /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
  1024. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      // The (element, index, cancellation token) selector is passed straight through to s_impl.
      IObservable<TResult> taskResults = s_impl.SelectMany<TSource, TResult>(source, selector);
      return taskResults;
  }
  1033. #endif
  1034. /// <summary>
  1035. /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
  1036. /// </summary>
  1037. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1038. /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
  1039. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
  1040. /// <param name="source">An observable sequence of elements to project.</param>
  1041. /// <param name="collectionSelector">A transform function to apply to each element.</param>
  1042. /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
  1043. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
  1044. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (collectionSelector == null)
      {
          throw new ArgumentNullException("collectionSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      // Both selectors are passed straight through to s_impl.
      IObservable<TResult> merged = s_impl.SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
      return merged;
  }
  1055. /// <summary>
  1056. /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
  1057. /// </summary>
  1058. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1059. /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
  1060. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
  1061. /// <param name="source">An observable sequence of elements to project.</param>
  1062. /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  1063. /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element and the fourth parameter represents the index of the intermediate element.</param>
  1064. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
  1065. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (collectionSelector == null)
      {
          throw new ArgumentNullException("collectionSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      // Both index-aware selectors are passed straight through to s_impl.
      IObservable<TResult> merged = s_impl.SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
      return merged;
  }
  1076. #if !NO_TPL
  1077. /// <summary>
  1078. /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
  1079. /// </summary>
  1080. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1081. /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
  1082. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
  1083. /// <param name="source">An observable sequence of elements to project.</param>
  1084. /// <param name="taskSelector">A transform function to apply to each element.</param>
  1085. /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
  1086. /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
  1087. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  1088. /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
  {
      // Reject null arguments at call time, in parameter order, before delegating.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (taskSelector == null)
      {
          throw new ArgumentNullException("taskSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      // Both selectors are passed straight through to s_impl.
      IObservable<TResult> combined = s_impl.SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
      return combined;
  }
  /// <summary>
  /// Maps every element of an observable sequence, together with its index, to a task; passes each source element and its task result to the result selector; and merges the outcomes into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TTaskResult">The result type of the intermediate tasks produced by the projection.</typeparam>
  /// <typeparam name="TResult">The element type of the result sequence, produced by the selector that combines source elements with their intermediate task results.</typeparam>
  /// <param name="source">The observable sequence whose elements are projected.</param>
  /// <param name="taskSelector">A transform function applied to each element; its second parameter is the index of the source element.</param>
  /// <param name="resultSelector">A transform function applied to each element of the intermediate sequence; its second parameter is the index of the source element.</param>
  /// <returns>An observable sequence whose elements result from obtaining a task for each element of the input sequence and then mapping the task's result, along with its source element, to a result element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  /// <remarks>This overload lets LINQ query comprehension syntax in C# and Visual Basic compose observable sequences with tasks, so the tasks need not be converted manually to observable sequences via <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
  {
      // Guard clauses run in declaration order; the first null argument determines the exception.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (taskSelector == null)
      {
          throw new ArgumentNullException("taskSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      // Forward the validated arguments to the query language implementation.
      return s_impl.SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
  }
  /// <summary>
  /// Maps every element of an observable sequence to a task with cancellation support; passes each source element and its task result to the result selector; and merges the outcomes into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TTaskResult">The result type of the intermediate tasks produced by the projection.</typeparam>
  /// <typeparam name="TResult">The element type of the result sequence, produced by the selector that combines source elements with their intermediate task results.</typeparam>
  /// <param name="source">The observable sequence whose elements are projected.</param>
  /// <param name="taskSelector">A transform function applied to each element; its second parameter is a <see cref="CancellationToken"/>.</param>
  /// <param name="resultSelector">A transform function applied to each element of the intermediate sequence.</param>
  /// <returns>An observable sequence whose elements result from obtaining a task for each element of the input sequence and then mapping the task's result, along with its source element, to a result element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  /// <remarks>This overload lets LINQ query comprehension syntax in C# and Visual Basic compose observable sequences with tasks, so the tasks need not be converted manually to observable sequences via <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
  {
      // Reject null arguments before delegating; checks follow parameter order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (taskSelector == null)
      {
          throw new ArgumentNullException("taskSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      IObservable<TResult> projected = s_impl.SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
      return projected;
  }
  /// <summary>
  /// Maps every element of an observable sequence, together with its index, to a task with cancellation support; passes each source element and its task result to the result selector; and merges the outcomes into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TTaskResult">The result type of the intermediate tasks produced by the projection.</typeparam>
  /// <typeparam name="TResult">The element type of the result sequence, produced by the selector that combines source elements with their intermediate task results.</typeparam>
  /// <param name="source">The observable sequence whose elements are projected.</param>
  /// <param name="taskSelector">A transform function applied to each element; its second parameter is the index of the source element and its third parameter is a <see cref="CancellationToken"/>.</param>
  /// <param name="resultSelector">A transform function applied to each element of the intermediate sequence; its second parameter is the index of the source element.</param>
  /// <returns>An observable sequence whose elements result from obtaining a task for each element of the input sequence and then mapping the task's result, along with its source element, to a result element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  /// <remarks>This overload lets LINQ query comprehension syntax in C# and Visual Basic compose observable sequences with tasks, so the tasks need not be converted manually to observable sequences via <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
  {
      // Guard clauses run in declaration order; the first null argument determines the exception.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (taskSelector == null)
      {
          throw new ArgumentNullException("taskSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      // Forward the validated arguments to the query language implementation.
      return s_impl.SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
  }
  1165. #endif
  /// <summary>
  /// Maps each notification of an observable sequence to an observable sequence and merges those observable sequences into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">The element type of the projected inner sequences and of the merged result sequence.</typeparam>
  /// <param name="source">The observable sequence whose notifications are projected.</param>
  /// <param name="onNext">A transform function applied to each element.</param>
  /// <param name="onError">A transform function applied when the source sequence produces an error.</param>
  /// <param name="onCompleted">A transform function applied when the source sequence reaches its end.</param>
  /// <returns>An observable sequence whose elements result from invoking the one-to-many transform function that corresponds to each notification in the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
  {
      // All four arguments are mandatory; they are checked in declaration order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onError == null)
      {
          throw new ArgumentNullException("onError");
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException("onCompleted");
      }

      IObservable<TResult> merged = s_impl.SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
      return merged;
  }
  /// <summary>
  /// Maps each notification of an observable sequence to an observable sequence, taking the element's index into account, and merges those observable sequences into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">The element type of the projected inner sequences and of the merged result sequence.</typeparam>
  /// <param name="source">The observable sequence whose notifications are projected.</param>
  /// <param name="onNext">A transform function applied to each element; its second parameter is the index of the source element.</param>
  /// <param name="onError">A transform function applied when the source sequence produces an error.</param>
  /// <param name="onCompleted">A transform function applied when the source sequence reaches its end.</param>
  /// <returns>An observable sequence whose elements result from invoking the one-to-many transform function that corresponds to each notification in the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
  {
      // All four arguments are mandatory; they are checked in declaration order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (onNext == null)
      {
          throw new ArgumentNullException("onNext");
      }

      if (onError == null)
      {
          throw new ArgumentNullException("onError");
      }

      if (onCompleted == null)
      {
          throw new ArgumentNullException("onCompleted");
      }

      // Forward the validated arguments to the query language implementation.
      return s_impl.SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
  }
  /// <summary>
  /// Maps every element of an observable sequence to an enumerable sequence and concatenates those enumerable sequences into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">The element type of the projected inner enumerable sequences and of the merged result sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are projected.</param>
  /// <param name="selector">A transform function applied to each element.</param>
  /// <returns>An observable sequence whose elements result from invoking the one-to-many transform function on each element of the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  /// <remarks>The projected sequences are enumerated synchronously within the OnNext call of the source sequence. To perform a concurrent, non-blocking merge instead, have the selector return an observable sequence obtained through the <see cref="Observable.ToObservable&lt;TSource&gt;(IEnumerable&lt;TSource&gt;)"/> conversion.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
  {
      // Reject null arguments before delegating; checks follow parameter order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      IObservable<TResult> flattened = s_impl.SelectMany<TSource, TResult>(source, selector);
      return flattened;
  }
  /// <summary>
  /// Maps every element of an observable sequence, together with its index, to an enumerable sequence and concatenates those enumerable sequences into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">The element type of the projected inner enumerable sequences and of the merged result sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are projected.</param>
  /// <param name="selector">A transform function applied to each element; its second parameter is the index of the source element.</param>
  /// <returns>An observable sequence whose elements result from invoking the one-to-many transform function on each element of the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  /// <remarks>The projected sequences are enumerated synchronously within the OnNext call of the source sequence. To perform a concurrent, non-blocking merge instead, have the selector return an observable sequence obtained through the <see cref="Observable.ToObservable&lt;TSource&gt;(IEnumerable&lt;TSource&gt;)"/> conversion.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
  {
      // Reject null arguments before delegating; checks follow parameter order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (selector == null)
      {
          throw new ArgumentNullException("selector");
      }

      // Forward the validated arguments to the query language implementation.
      return s_impl.SelectMany<TSource, TResult>(source, selector);
  }
  /// <summary>
  /// Maps every element of an observable sequence to an enumerable sequence; passes each source element and each element of its inner sequence to the result selector; and merges the outcomes into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">The element type of the projected intermediate enumerable sequences.</typeparam>
  /// <typeparam name="TResult">The element type of the result sequence, produced by the selector that combines source elements with their intermediate sequence elements.</typeparam>
  /// <param name="source">The observable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">A transform function applied to each element.</param>
  /// <param name="resultSelector">A transform function applied to each element of the intermediate sequence.</param>
  /// <returns>An observable sequence whose elements result from invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements, along with its source element, to a result element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  /// <remarks>The projected sequences are enumerated synchronously within the OnNext call of the source sequence. To perform a concurrent, non-blocking merge instead, have the selector return an observable sequence obtained through the <see cref="Observable.ToObservable&lt;TSource&gt;(IEnumerable&lt;TSource&gt;)"/> conversion.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      // Guard clauses run in declaration order; the first null argument determines the exception.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (collectionSelector == null)
      {
          throw new ArgumentNullException("collectionSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      IObservable<TResult> flattened = s_impl.SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
      return flattened;
  }
  /// <summary>
  /// Maps every element of an observable sequence, together with its index, to an enumerable sequence; passes each source element and each element of its inner sequence to the result selector; and merges the outcomes into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">The element type of the projected intermediate enumerable sequences.</typeparam>
  /// <typeparam name="TResult">The element type of the result sequence, produced by the selector that combines source elements with their intermediate sequence elements.</typeparam>
  /// <param name="source">The observable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">A transform function applied to each element; its second parameter is the index of the source element.</param>
  /// <param name="resultSelector">A transform function applied to each element of the intermediate sequence; its second parameter is the index of the source element and its fourth parameter is the index of the intermediate element.</param>
  /// <returns>An observable sequence whose elements result from invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements, along with its source element, to a result element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  /// <remarks>The projected sequences are enumerated synchronously within the OnNext call of the source sequence. To perform a concurrent, non-blocking merge instead, have the selector return an observable sequence obtained through the <see cref="Observable.ToObservable&lt;TSource&gt;(IEnumerable&lt;TSource&gt;)"/> conversion.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  {
      // Guard clauses run in declaration order; the first null argument determines the exception.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (collectionSelector == null)
      {
          throw new ArgumentNullException("collectionSelector");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      // Forward the validated arguments to the query language implementation.
      return s_impl.SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  }
  1292. #endregion
  1293. #region + Skip +
  /// <summary>
  /// Skips a given number of elements at the start of an observable sequence and returns the elements that remain.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The sequence to take elements from.</param>
  /// <param name="count">How many elements to skip before the remaining elements are returned.</param>
  /// <returns>An observable sequence containing the elements that occur after the specified index in the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  public static IObservable<TSource> Skip<TSource>(this IObservable<TSource> source, int count)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      // A negative count is rejected; zero is accepted and passed through.
      if (count < 0)
      {
          throw new ArgumentOutOfRangeException("count");
      }

      IObservable<TSource> remainder = s_impl.Skip<TSource>(source, count);
      return remainder;
  }
  1311. #endregion
  1312. #region + SkipWhile +
  /// <summary>
  /// Skips elements of an observable sequence for as long as a given condition holds and returns the elements that remain.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to return elements from.</param>
  /// <param name="predicate">A function that tests each element for a condition.</param>
  /// <returns>An observable sequence containing the elements of the input sequence, beginning with the first element in the linear series that fails the test specified by predicate.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> SkipWhile<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      // Reject null arguments before delegating; checks follow parameter order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (predicate == null)
      {
          throw new ArgumentNullException("predicate");
      }

      return s_impl.SkipWhile<TSource>(source, predicate);
  }
  /// <summary>
  /// Skips elements of an observable sequence for as long as a given condition holds and returns the elements that remain.
  /// The predicate function also receives the element's index.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to return elements from.</param>
  /// <param name="predicate">A function that tests each element for a condition; its second parameter is the index of the source element.</param>
  /// <returns>An observable sequence containing the elements of the input sequence, beginning with the first element in the linear series that fails the test specified by predicate.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> SkipWhile<TSource>(this IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
      // Reject null arguments before delegating; checks follow parameter order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (predicate == null)
      {
          throw new ArgumentNullException("predicate");
      }

      IObservable<TSource> remainder = s_impl.SkipWhile<TSource>(source, predicate);
      return remainder;
  }
  1346. #endregion
  1347. #region + Take +
  /// <summary>
  /// Returns a given number of contiguous elements from the start of an observable sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The sequence to take elements from.</param>
  /// <param name="count">How many elements to return.</param>
  /// <returns>An observable sequence containing the specified number of elements from the start of the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  public static IObservable<TSource> Take<TSource>(this IObservable<TSource> source, int count)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      // A negative count is rejected; zero is accepted and passed through.
      if (count < 0)
      {
          throw new ArgumentOutOfRangeException("count");
      }

      return s_impl.Take<TSource>(source, count);
  }
  /// <summary>
  /// Returns a given number of contiguous elements from the start of an observable sequence, using the supplied scheduler for the edge case of Take(0).
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The sequence to take elements from.</param>
  /// <param name="count">How many elements to return.</param>
  /// <param name="scheduler">The scheduler used to produce an OnCompleted message when <paramref name="count"/> is set to 0.</param>
  /// <returns>An observable sequence containing the specified number of elements from the start of the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  public static IObservable<TSource> Take<TSource>(this IObservable<TSource> source, int count, IScheduler scheduler)
  {
      // Checks follow parameter order, so an invalid count is reported before a null scheduler.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (count < 0)
      {
          throw new ArgumentOutOfRangeException("count");
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException("scheduler");
      }

      IObservable<TSource> prefix = s_impl.Take<TSource>(source, count, scheduler);
      return prefix;
  }
  1385. #endregion
  1386. #region + TakeWhile +
  /// <summary>
  /// Returns elements of an observable sequence for as long as a given condition holds.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The sequence to return elements from.</param>
  /// <param name="predicate">A function that tests each element for a condition.</param>
  /// <returns>An observable sequence containing the elements of the input sequence that occur before the element at which the test no longer passes.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> TakeWhile<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      // Reject null arguments before delegating; checks follow parameter order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (predicate == null)
      {
          throw new ArgumentNullException("predicate");
      }

      IObservable<TSource> prefix = s_impl.TakeWhile<TSource>(source, predicate);
      return prefix;
  }
  /// <summary>
  /// Returns elements of an observable sequence for as long as a given condition holds.
  /// The predicate function also receives the element's index.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The sequence to return elements from.</param>
  /// <param name="predicate">A function that tests each element for a condition; its second parameter is the index of the source element.</param>
  /// <returns>An observable sequence containing the elements of the input sequence that occur before the element at which the test no longer passes.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> TakeWhile<TSource>(this IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
      // Reject null arguments before delegating; checks follow parameter order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (predicate == null)
      {
          throw new ArgumentNullException("predicate");
      }

      return s_impl.TakeWhile<TSource>(source, predicate);
  }
  1420. #endregion
  1421. #region + ThenBy +
  /// <summary>
  /// Applies a subsequent ascending ordering, based on a key, to the elements of a sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the <paramref name="source"/> sequence.</typeparam>
  /// <typeparam name="TKey">The type of the sorting key computed for each element of the source sequence.</typeparam>
  /// <param name="source">The observable sequence of values to order.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to a key.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
  public static IOrderedObservable<TSource> ThenBy<TSource, TKey>(this IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      // Reject null arguments before delegating; checks follow parameter order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      IOrderedObservable<TSource> ordered = s_impl.ThenBy<TSource, TKey>(source, keySelector);
      return ordered;
  }
  /// <summary>
  /// Applies a subsequent ascending ordering to the elements of a sequence, comparing keys with the supplied comparer.
  /// </summary>
  /// <typeparam name="TSource">The element type of the <paramref name="source"/> sequence.</typeparam>
  /// <typeparam name="TKey">The type of the sorting key computed for each element of the source sequence.</typeparam>
  /// <param name="source">The observable sequence of values to order.</param>
  /// <param name="keySelector">A function that extracts the key of each element.</param>
  /// <param name="comparer">The <see cref="IComparer{TKey}"/> used to compare keys.</param>
  /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to a key.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is <see langword="null"/>.</exception>
  public static IOrderedObservable<TSource> ThenBy<TSource, TKey>(this IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
  {
      // Guard clauses run in declaration order; the first null argument determines the exception.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      // Forward the validated arguments to the query language implementation.
      return s_impl.ThenBy<TSource, TKey>(source, keySelector, comparer);
  }
  /// <summary>
  /// Applies a subsequent ascending ordering to the elements of a sequence, based on the times at which the corresponding observable sequences produce their first notification or complete.
  /// </summary>
  /// <typeparam name="TSource">The element type of the <paramref name="source"/> sequence.</typeparam>
  /// <typeparam name="TOther">The element type of the observable returned by <paramref name="timeSelector"/>.</typeparam>
  /// <param name="source">The observable sequence of values to order.</param>
  /// <param name="timeSelector">A function that, for an element of the <paramref name="source"/> sequence, returns an observable indicating the time at which that element should appear in the ordering.</param>
  /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to the times at which the corresponding observable sequences produce their first notification or complete.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="timeSelector"/> is <see langword="null"/>.</exception>
  /// <remarks>
  /// <para>
  /// This overload of ThenBy resembles a particular usage of SelectMany in which the elements of the <paramref name="source"/> sequence are returned based on the times
  /// at which their corresponding inner sequences produce their first notification or complete. Any elements in the inner sequences are discarded.
  /// </para>
  /// <alert type="info">
  /// Unlike the key-based overloads of ThenBy, this overload does not buffer any elements and does not wait for the <paramref name="source"/> sequence to complete before it
  /// pushes notifications. This overload is entirely reactive.
  /// </alert>
  /// <para>
  /// This overload supports the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as a subsequent key.
  /// </para>
  /// </remarks>
  public static IOrderedObservable<TSource> ThenBy<TSource, TOther>(this IOrderedObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
  {
      // Reject null arguments before delegating; checks follow parameter order.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (timeSelector == null)
      {
          throw new ArgumentNullException("timeSelector");
      }

      IOrderedObservable<TSource> ordered = s_impl.ThenBy<TSource, TOther>(source, timeSelector);
      return ordered;
  }
  1489. #endregion
  1490. #region + ThenByDescending +
  /// <summary>
  /// Applies a subsequent descending ordering, by key, to the elements of a sequence.
  /// </summary>
  /// <typeparam name="TSource">The element type of the <paramref name="source"/> sequence.</typeparam>
  /// <typeparam name="TKey">The type of the sorting key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence of values to order.</param>
  /// <param name="keySelector">A function that extracts the sorting key from an element.</param>
  /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to a key.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
  public static IOrderedObservable<TSource> ThenByDescending<TSource, TKey>(
      this IOrderedObservable<TSource> source,
      Func<TSource, TKey> keySelector)
  {
      // Null arguments are rejected before anything is handed to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      return s_impl.ThenByDescending<TSource, TKey>(source, keySelector);
  }
  /// <summary>
  /// Applies a subsequent descending ordering to the elements of a sequence, comparing keys with the specified comparer.
  /// </summary>
  /// <typeparam name="TSource">The element type of the <paramref name="source"/> sequence.</typeparam>
  /// <typeparam name="TKey">The type of the sorting key computed for each source element.</typeparam>
  /// <param name="source">The observable sequence of values to order.</param>
  /// <param name="keySelector">A function that extracts the sorting key from an element.</param>
  /// <param name="comparer">The <see cref="IComparer{TKey}"/> used to compare keys.</param>
  /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to a key.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is <see langword="null"/>.</exception>
  public static IOrderedObservable<TSource> ThenByDescending<TSource, TKey>(
      this IOrderedObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      IComparer<TKey> comparer)
  {
      // Arguments are checked one at a time, in declaration order, so the
      // exception always names the first offending parameter.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (keySelector == null)
      {
          throw new ArgumentNullException("keySelector");
      }

      if (comparer == null)
      {
          throw new ArgumentNullException("comparer");
      }

      return s_impl.ThenByDescending<TSource, TKey>(source, keySelector, comparer);
  }
  /// <summary>
  /// Applies a subsequent descending ordering to the elements of a sequence, where each element is positioned by the time at which its
  /// corresponding observable sequence produces its first notification or completes.
  /// </summary>
  /// <typeparam name="TSource">The element type of the <paramref name="source"/> sequence.</typeparam>
  /// <typeparam name="TOther">The element type of the observable returned by <paramref name="timeSelector"/>.</typeparam>
  /// <param name="source">The observable sequence of values to order.</param>
  /// <param name="timeSelector">Given an element of the <paramref name="source"/> sequence, returns an observable that indicates the time at which that element should appear in the ordering.</param>
  /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order by the times at which their corresponding observable sequences produce their first notification or complete.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="timeSelector"/> is <see langword="null"/>.</exception>
  /// <remarks>
  /// <para>
  /// A descending ordering by time cannot complete until every element of the <paramref name="source"/> sequence has been observed,
  /// so all of the elements must be buffered. In that respect it is similar to an aggregation
  /// such as <see cref="ToList"/>: timing from the <paramref name="source"/> sequence is lost, and notifications are generated only when
  /// the <paramref name="source"/> sequence calls OnCompleted.
  /// </para>
  /// <alert type="warn">
  /// Do not attempt to order an observable sequence that never calls OnCompleted. No notifications will be generated, and it
  /// may result in an <see cref="OutOfMemoryException"/> being thrown.
  /// </alert>
  /// <para>
  /// This overload of ThenByDescending resembles a particular usage of SelectMany: elements of the <paramref name="source"/> sequence are returned according to the times
  /// at which their corresponding inner sequences produce their first notification or complete. The elements of the inner sequences themselves are discarded.
  /// </para>
  /// <para>
  /// Passing an observable sequence as a subsequent key enables the orderby LINQ query comprehension syntax in C# and Visual Basic.
  /// </para>
  /// </remarks>
  public static IOrderedObservable<TSource> ThenByDescending<TSource, TOther>(
      this IOrderedObservable<TSource> source,
      Func<TSource, IObservable<TOther>> timeSelector)
  {
      // Null arguments are rejected before anything is handed to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (timeSelector == null)
      {
          throw new ArgumentNullException("timeSelector");
      }

      return s_impl.ThenByDescending<TSource, TOther>(source, timeSelector);
  }
  1564. #endregion
  1565. #region + Where +
  /// <summary>
  /// Filters an observable sequence, keeping only the elements for which a predicate holds.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are filtered.</param>
  /// <param name="predicate">A function that tests each source element for a condition.</param>
  /// <returns>An observable sequence containing the elements of the input sequence that satisfy the condition.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is <see langword="null"/>.</exception>
  public static IObservable<TSource> Where<TSource>(
      this IObservable<TSource> source,
      Func<TSource, bool> predicate)
  {
      // Null arguments are rejected before anything is handed to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (predicate == null)
      {
          throw new ArgumentNullException("predicate");
      }

      return s_impl.Where<TSource>(source, predicate);
  }
  /// <summary>
  /// Filters an observable sequence, keeping only the elements for which a predicate holds; the predicate also receives the element's index.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are filtered.</param>
  /// <param name="predicate">A function that tests each source element for a condition; the second parameter of the function represents the index of the source element.</param>
  /// <returns>An observable sequence containing the elements of the input sequence that satisfy the condition.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is <see langword="null"/>.</exception>
  public static IObservable<TSource> Where<TSource>(
      this IObservable<TSource> source,
      Func<TSource, int, bool> predicate)
  {
      // Null arguments are rejected before anything is handed to s_impl.
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (predicate == null)
      {
          throw new ArgumentNullException("predicate");
      }

      return s_impl.Where<TSource>(source, predicate);
  }
  1598. #endregion
  1599. }
  1600. }