1
0

Observable.StandardSequenceOperators.cs 109 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Threading.Tasks; // needed for doc comments
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace System.Reactive.Linq
  10. {
  11. public static partial class Observable
  12. {
  13. #region + Cast +
  /// <summary>
  /// Converts every element of an observable sequence of objects to the requested type.
  /// </summary>
  /// <typeparam name="TResult">The target type that the elements of the source sequence are converted to.</typeparam>
  /// <param name="source">The observable sequence whose elements are to be converted.</param>
  /// <returns>An observable sequence holding each element of the source sequence converted to <typeparamref name="TResult"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TResult> Cast<TResult>(this IObservable<object> source)
  {
      // Reject a null source here, before handing off to s_impl.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      return s_impl.Cast<TResult>(source);
  }
  29. #endregion
  30. #region + DefaultIfEmpty +
  /// <summary>
  /// Yields the elements of the given sequence, or a single element holding the default value of the type parameter when the sequence is empty.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence; its default value is used if the sequence is empty.</typeparam>
  /// <param name="source">The sequence for which a default value is produced if it is empty.</param>
  /// <returns>An observable sequence containing the default value of <typeparamref name="TSource"/> if the source is empty; otherwise, the elements of the source itself.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TSource> DefaultIfEmpty<TSource>(this IObservable<TSource> source)
  {
      // Reject a null source here, before handing off to s_impl.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      return s_impl.DefaultIfEmpty(source);
  }
  /// <summary>
  /// Yields the elements of the given sequence, or a single element holding the supplied value when the sequence is empty.
  /// </summary>
  /// <typeparam name="TSource">The element type of the source sequence and of the fallback value used if the sequence is empty.</typeparam>
  /// <param name="source">The sequence for which the supplied value is produced if it is empty.</param>
  /// <param name="defaultValue">The value to produce if the sequence is empty.</param>
  /// <returns>An observable sequence containing <paramref name="defaultValue"/> if the source is empty; otherwise, the elements of the source itself.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TSource> DefaultIfEmpty<TSource>(this IObservable<TSource> source, TSource defaultValue)
  {
      // Only the source is validated; defaultValue is forwarded unchecked.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      return s_impl.DefaultIfEmpty(source, defaultValue);
  }
  62. #endregion
  63. #region + Distinct +
  /// <summary>
  /// Produces an observable sequence that holds only the distinct elements of the source.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose distinct elements are retained.</param>
  /// <returns>An observable sequence containing only the distinct elements from the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <remarks>Consider the use of this operator carefully: it maintains an internal lookup structure which can grow large.</remarks>
  public static IObservable<TSource> Distinct<TSource>(this IObservable<TSource> source)
  {
      // Reject a null source here, before handing off to s_impl.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      return s_impl.Distinct(source);
  }
  /// <summary>
  /// Produces an observable sequence that holds only the distinct elements of the source, as determined by the given comparer.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose distinct elements are retained.</param>
  /// <param name="comparer">The equality comparer applied to source elements.</param>
  /// <returns>An observable sequence containing only the distinct elements from the source sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="comparer"/> is null.</exception>
  /// <remarks>Consider the use of this operator carefully: it maintains an internal lookup structure which can grow large.</remarks>
  public static IObservable<TSource> Distinct<TSource>(this IObservable<TSource> source, IEqualityComparer<TSource> comparer)
  {
      // Arguments are validated in declaration order, so the first null one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.Distinct(source, comparer);
  }
  /// <summary>
  /// Produces an observable sequence that holds only the distinct elements of the source, as determined by the key computed for each element.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose distinct elements are retained.</param>
  /// <param name="keySelector">A function that computes the comparison key for each element.</param>
  /// <returns>An observable sequence containing only the elements of the source sequence that are distinct with respect to their computed key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <remarks>Consider the use of this operator carefully: it maintains an internal lookup structure which can grow large.</remarks>
  public static IObservable<TSource> Distinct<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      // Arguments are validated in declaration order, so the first null one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      return s_impl.Distinct(source, keySelector);
  }
  /// <summary>
  /// Produces an observable sequence that holds only the distinct elements of the source, as determined by the key computed for each element and the given key comparer.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the discriminator key computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose distinct elements are retained.</param>
  /// <param name="keySelector">A function that computes the comparison key for each element.</param>
  /// <param name="comparer">The equality comparer applied to the computed keys.</param>
  /// <returns>An observable sequence containing only the elements of the source sequence that are distinct with respect to their computed key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <remarks>Consider the use of this operator carefully: it maintains an internal lookup structure which can grow large.</remarks>
  public static IObservable<TSource> Distinct<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      // Arguments are validated in declaration order, so the first null one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.Distinct(source, keySelector, comparer);
  }
  150. #endregion
  151. #region + GroupBy +
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key for each element.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements that share that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      // Arguments are validated in declaration order, so the first null one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      return s_impl.GroupBy(source, keySelector);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key for each element.</param>
  /// <param name="comparer">The equality comparer used to compare keys.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements that share that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      // Arguments are validated in declaration order, so the first null one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.GroupBy(source, keySelector, comparer);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups and maps each element into its group using the given function.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
  /// <typeparam name="TElement">The type of the elements within the groups, computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key for each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element in an observable group.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements that share that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  {
      // Arguments are validated in declaration order, so the first null one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      return s_impl.GroupBy(source, keySelector, elementSelector);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer, and maps each element into its group using the given function.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
  /// <typeparam name="TElement">The type of the elements within the groups, computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key for each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element in an observable group.</param>
  /// <param name="comparer">The equality comparer used to compare keys.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements that share that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="comparer"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  {
      // Arguments are validated in declaration order, so the first null one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.GroupBy(source, keySelector, elementSelector, comparer);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function, starting from the given initial capacity.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key for each element.</param>
  /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements that share that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
  {
      // Arguments are validated in declaration order, so the first invalid one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      // A negative capacity is rejected; zero is accepted.
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return s_impl.GroupBy(source, keySelector, capacity);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer, starting from the given initial capacity.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key for each element.</param>
  /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
  /// <param name="comparer">The equality comparer used to compare keys.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements that share that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      // Arguments are validated in declaration order, so the first invalid one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      // A negative capacity is rejected; zero is accepted.
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.GroupBy(source, keySelector, capacity, comparer);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups, starting from the given initial capacity, and maps each element into its group using the given function.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
  /// <typeparam name="TElement">The type of the elements within the groups, computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key for each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element in an observable group.</param>
  /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements that share that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
  {
      // Arguments are validated in declaration order, so the first invalid one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      // A negative capacity is rejected; zero is accepted.
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return s_impl.GroupBy(source, keySelector, elementSelector, capacity);
  }
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer, starting from the given initial capacity, and maps each element into its group using the given function.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
  /// <typeparam name="TElement">The type of the elements within the groups, computed for each element in the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key for each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element in an observable group.</param>
  /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
  /// <param name="comparer">The equality comparer used to compare keys.</param>
  /// <returns>A sequence of observable groups, one per unique key value, each containing all elements that share that key value.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      // Arguments are validated in declaration order, so the first invalid one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      // A negative capacity is rejected; zero is accepted.
      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.GroupBy(source, keySelector, elementSelector, capacity, comparer);
  }
  388. #endregion
  389. #region + GroupByUntil +
  /// <summary>
  /// Partitions the elements of an observable sequence into groups using the given key selector function and key comparer, and maps each element into its group using the given function.
  /// The lifetime of each group is governed by a duration selector function. A group that expires receives an OnCompleted notification. If an element later arrives
  /// whose key value matches that of a reclaimed group, the group is reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
  /// <typeparam name="TElement">The type of the elements within the groups, computed for each element in the source sequence.</typeparam>
  /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">A function that extracts the key for each element.</param>
  /// <param name="elementSelector">A function that maps each source element to an element in an observable group.</param>
  /// <param name="durationSelector">A function that signals the expiration of a group.</param>
  /// <param name="comparer">The equality comparer used to compare keys.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing all elements that share that key value.
  /// Once a group's lifetime has expired, a new group with the same key value can be created when an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      // Arguments are validated in declaration order, so the first null one is the one reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.GroupByUntil(source, keySelector, elementSelector, durationSelector, comparer);
  }
  /// <summary>
  /// Partitions an observable sequence into groups keyed by the value produced by <paramref name="keySelector"/> and projects each element
  /// through <paramref name="elementSelector"/>.
  /// The lifetime of every group is controlled by <paramref name="durationSelector"/>: an expired group receives an OnCompleted notification,
  /// and a later element with the same key value causes the group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">Type of the elements placed in the groups.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences that denote each group's lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Extracts the key of each element.</param>
  /// <param name="elementSelector">Maps each source element to the element stored in its observable group.</param>
  /// <param name="durationSelector">Signals the expiration of a group.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing the elements that share that key value.
  /// After a group's lifetime expires, a new group with the same key value can be created when an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="elementSelector"/> or <paramref name="durationSelector"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
  {
      // Arguments are validated in declaration order so the first offending one is reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return s_impl.GroupByUntil(source, keySelector, elementSelector, durationSelector);
  }
  /// <summary>
  /// Partitions an observable sequence into groups keyed by the value produced by <paramref name="keySelector"/>, comparing keys with
  /// <paramref name="comparer"/>.
  /// The lifetime of every group is controlled by <paramref name="durationSelector"/>: an expired group receives an OnCompleted notification,
  /// and a later element with the same key value causes the group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences that denote each group's lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Extracts the key of each element.</param>
  /// <param name="durationSelector">Signals the expiration of a group.</param>
  /// <param name="comparer">Equality comparer used to compare keys.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing the elements that share that key value.
  /// After a group's lifetime expires, a new group with the same key value can be created when an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector,
      IEqualityComparer<TKey> comparer)
  {
      // Arguments are validated in declaration order so the first offending one is reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.GroupByUntil(source, keySelector, durationSelector, comparer);
  }
  /// <summary>
  /// Partitions an observable sequence into groups keyed by the value produced by <paramref name="keySelector"/>.
  /// The lifetime of every group is controlled by <paramref name="durationSelector"/>: an expired group receives an OnCompleted notification,
  /// and a later element with the same key value causes the group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences that denote each group's lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Extracts the key of each element.</param>
  /// <param name="durationSelector">Signals the expiration of a group.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing the elements that share that key value.
  /// After a group's lifetime expires, a new group with the same key value can be created when an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="durationSelector"/> is null.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
  {
      // Arguments are validated in declaration order so the first offending one is reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      return s_impl.GroupByUntil(source, keySelector, durationSelector);
  }
  /// <summary>
  /// Partitions an observable sequence, using the specified initial capacity, into groups keyed by the value produced by
  /// <paramref name="keySelector"/>, comparing keys with <paramref name="comparer"/> and projecting each element through
  /// <paramref name="elementSelector"/>.
  /// The lifetime of every group is controlled by <paramref name="durationSelector"/>: an expired group receives an OnCompleted notification,
  /// and a later element with the same key value causes the group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">Type of the elements placed in the groups.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences that denote each group's lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Extracts the key of each element.</param>
  /// <param name="elementSelector">Maps each source element to the element stored in its observable group.</param>
  /// <param name="durationSelector">Signals the expiration of a group.</param>
  /// <param name="capacity">Initial number of elements the underlying dictionary can contain.</param>
  /// <param name="comparer">Equality comparer used to compare keys.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing the elements that share that key value.
  /// After a group's lifetime expires, a new group with the same key value can be created when an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="elementSelector"/>, <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
  {
      // Arguments are validated in declaration order, so capacity is checked before comparer.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.GroupByUntil(source, keySelector, elementSelector, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Partitions an observable sequence, using the specified initial capacity, into groups keyed by the value produced by
  /// <paramref name="keySelector"/> and projects each element through <paramref name="elementSelector"/>.
  /// The lifetime of every group is controlled by <paramref name="durationSelector"/>: an expired group receives an OnCompleted notification,
  /// and a later element with the same key value causes the group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TElement">Type of the elements placed in the groups.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences that denote each group's lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Extracts the key of each element.</param>
  /// <param name="elementSelector">Maps each source element to the element stored in its observable group.</param>
  /// <param name="durationSelector">Signals the expiration of a group.</param>
  /// <param name="capacity">Initial number of elements the underlying dictionary can contain.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing the elements that share that key value.
  /// After a group's lifetime expires, a new group with the same key value can be created when an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="elementSelector"/> or <paramref name="durationSelector"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector,
      int capacity)
  {
      // Arguments are validated in declaration order so the first offending one is reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (elementSelector is null)
      {
          throw new ArgumentNullException(nameof(elementSelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return s_impl.GroupByUntil(source, keySelector, elementSelector, durationSelector, capacity);
  }
  /// <summary>
  /// Partitions an observable sequence, using the specified initial capacity, into groups keyed by the value produced by
  /// <paramref name="keySelector"/>, comparing keys with <paramref name="comparer"/>.
  /// The lifetime of every group is controlled by <paramref name="durationSelector"/>: an expired group receives an OnCompleted notification,
  /// and a later element with the same key value causes the group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences that denote each group's lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Extracts the key of each element.</param>
  /// <param name="durationSelector">Signals the expiration of a group.</param>
  /// <param name="capacity">Initial number of elements the underlying dictionary can contain.</param>
  /// <param name="comparer">Equality comparer used to compare keys.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing the elements that share that key value.
  /// After a group's lifetime expires, a new group with the same key value can be created when an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/>, <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
  {
      // Arguments are validated in declaration order, so capacity is checked before comparer.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      if (comparer is null)
      {
          throw new ArgumentNullException(nameof(comparer));
      }

      return s_impl.GroupByUntil(source, keySelector, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// Partitions an observable sequence, using the specified initial capacity, into groups keyed by the value produced by
  /// <paramref name="keySelector"/>.
  /// The lifetime of every group is controlled by <paramref name="durationSelector"/>: an expired group receives an OnCompleted notification,
  /// and a later element with the same key value causes the group to be reborn with a new lifetime request.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TKey">Type of the grouping key computed for each source element.</typeparam>
  /// <typeparam name="TDuration">Element type of the duration sequences that denote each group's lifetime.</typeparam>
  /// <param name="source">The observable sequence whose elements are grouped.</param>
  /// <param name="keySelector">Extracts the key of each element.</param>
  /// <param name="durationSelector">Signals the expiration of a group.</param>
  /// <param name="capacity">Initial number of elements the underlying dictionary can contain.</param>
  /// <returns>
  /// A sequence of observable groups, one per unique key value, each containing the elements that share that key value.
  /// After a group's lifetime expires, a new group with the same key value can be created when an element with that key value is encountered.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="durationSelector"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
  public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      this IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector,
      int capacity)
  {
      // Arguments are validated in declaration order so the first offending one is reported.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (keySelector is null)
      {
          throw new ArgumentNullException(nameof(keySelector));
      }

      if (durationSelector is null)
      {
          throw new ArgumentNullException(nameof(durationSelector));
      }

      if (capacity < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(capacity));
      }

      return s_impl.GroupByUntil(source, keySelector, durationSelector, capacity);
  }
  714. #endregion
  715. #region + GroupJoin +
  /// <summary>
  /// Correlates elements of two sequences whose durations overlap and groups the results.
  /// </summary>
  /// <typeparam name="TLeft">Element type of the left source sequence.</typeparam>
  /// <typeparam name="TRight">Element type of the right source sequence.</typeparam>
  /// <typeparam name="TLeftDuration">Element type of the duration sequence denoting the computed duration of each left element.</typeparam>
  /// <typeparam name="TRightDuration">Element type of the duration sequence denoting the computed duration of each right element.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence, produced by invoking the result selector for source elements with overlapping duration.</typeparam>
  /// <param name="left">The left observable sequence to join elements for.</param>
  /// <param name="right">The right observable sequence to join elements for.</param>
  /// <param name="leftDurationSelector">Selects the duration of each left element; used to determine overlap.</param>
  /// <param name="rightDurationSelector">Selects the duration of each right element; used to determine overlap.</param>
  /// <param name="resultSelector">Computes a result element for any left element together with the overlapping elements of the right sequence.</param>
  /// <returns>An observable sequence of result elements computed from source elements that have an overlapping duration.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="left"/>, <paramref name="right"/>, <paramref name="leftDurationSelector"/>, <paramref name="rightDurationSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
      this IObservable<TLeft> left,
      IObservable<TRight> right,
      Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
      Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
      Func<TLeft, IObservable<TRight>, TResult> resultSelector)
  {
      // Arguments are validated in declaration order so the first offending one is reported.
      if (left is null)
      {
          throw new ArgumentNullException(nameof(left));
      }

      if (right is null)
      {
          throw new ArgumentNullException(nameof(right));
      }

      if (leftDurationSelector is null)
      {
          throw new ArgumentNullException(nameof(leftDurationSelector));
      }

      if (rightDurationSelector is null)
      {
          throw new ArgumentNullException(nameof(rightDurationSelector));
      }

      if (resultSelector is null)
      {
          throw new ArgumentNullException(nameof(resultSelector));
      }

      return s_impl.GroupJoin(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
  }
  755. #endregion
  756. #region + Join +
  /// <summary>
  /// Correlates elements of two sequences whose durations overlap.
  /// </summary>
  /// <typeparam name="TLeft">Element type of the left source sequence.</typeparam>
  /// <typeparam name="TRight">Element type of the right source sequence.</typeparam>
  /// <typeparam name="TLeftDuration">Element type of the duration sequence denoting the computed duration of each left element.</typeparam>
  /// <typeparam name="TRightDuration">Element type of the duration sequence denoting the computed duration of each right element.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence, produced by invoking the result selector for source elements with overlapping duration.</typeparam>
  /// <param name="left">The left observable sequence to join elements for.</param>
  /// <param name="right">The right observable sequence to join elements for.</param>
  /// <param name="leftDurationSelector">Selects the duration of each left element; used to determine overlap.</param>
  /// <param name="rightDurationSelector">Selects the duration of each right element; used to determine overlap.</param>
  /// <param name="resultSelector">Computes a result element for any two overlapping elements of the left and right sequences.</param>
  /// <returns>An observable sequence of result elements computed from source elements that have an overlapping duration.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="left"/>, <paramref name="right"/>, <paramref name="leftDurationSelector"/>, <paramref name="rightDurationSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
      this IObservable<TLeft> left,
      IObservable<TRight> right,
      Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
      Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
      Func<TLeft, TRight, TResult> resultSelector)
  {
      // Arguments are validated in declaration order so the first offending one is reported.
      if (left is null)
      {
          throw new ArgumentNullException(nameof(left));
      }

      if (right is null)
      {
          throw new ArgumentNullException(nameof(right));
      }

      if (leftDurationSelector is null)
      {
          throw new ArgumentNullException(nameof(leftDurationSelector));
      }

      if (rightDurationSelector is null)
      {
          throw new ArgumentNullException(nameof(rightDurationSelector));
      }

      if (resultSelector is null)
      {
          throw new ArgumentNullException(nameof(resultSelector));
      }

      return s_impl.Join(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
  }
  796. #endregion
  797. #region + OfType +
  /// <summary>
  /// Filters an observable sequence down to the elements of the specified type.
  /// </summary>
  /// <typeparam name="TResult">The type on which the elements of the source sequence are filtered.</typeparam>
  /// <param name="source">The observable sequence containing the elements to filter.</param>
  /// <returns>An observable sequence containing the elements of the input sequence that are of type TResult.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IObservable<TResult> OfType<TResult>(this IObservable<object> source)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      return s_impl.OfType<TResult>(source);
  }
  813. #endregion
  814. #region + Select +
  /// <summary>
  /// Transforms every element of an observable sequence into a new form.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence, obtained by running the selector function on each source element.</typeparam>
  /// <param name="source">The sequence of elements on which the transform function is invoked.</param>
  /// <param name="selector">The transform function applied to each source element.</param>
  /// <returns>An observable sequence whose elements are the results of invoking the transform function on each element of the source.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> Select<TSource, TResult>(
      this IObservable<TSource> source,
      Func<TSource, TResult> selector)
  {
      // The source is validated before the selector.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (selector is null)
      {
          throw new ArgumentNullException(nameof(selector));
      }

      return s_impl.Select(source, selector);
  }
  /// <summary>
  /// Transforms every element of an observable sequence into a new form, taking the element's index into account.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence, obtained by running the selector function on each source element.</typeparam>
  /// <param name="source">The sequence of elements on which the transform function is invoked.</param>
  /// <param name="selector">The transform function applied to each source element; its second parameter represents the index of the source element.</param>
  /// <returns>An observable sequence whose elements are the results of invoking the transform function on each element of the source.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> Select<TSource, TResult>(
      this IObservable<TSource> source,
      Func<TSource, int, TResult> selector)
  {
      // The source is validated before the selector.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (selector is null)
      {
          throw new ArgumentNullException(nameof(selector));
      }

      return s_impl.Select(source, selector);
  }
  857. #endregion
  858. #region + SelectMany +
  859. /// <summary>
  860. /// Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.
  861. /// </summary>
  862. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  863. /// <typeparam name="TOther">The type of the elements in the other sequence and the elements in the result sequence.</typeparam>
  864. /// <param name="source">An observable sequence of elements to project.</param>
  865. /// <param name="other">An observable sequence to project each element from the source sequence onto.</param>
  866. /// <returns>An observable sequence whose elements are the result of projecting each source element onto the other sequence and merging all the resulting sequences together.</returns>
  867. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is null.</exception>
  public static IObservable<TOther> SelectMany<TSource, TOther>(this IObservable<TSource> source, IObservable<TOther> other)
  {
      // Both sequences are mandatory; source is checked before other.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = other ?? throw new ArgumentNullException(nameof(other));

      // Forward to the s_impl overload that takes the other sequence directly.
      return s_impl.SelectMany(source, other);
  }
  880. /// <summary>
  881. /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
  882. /// </summary>
  883. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  884. /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
  885. /// <param name="source">An observable sequence of elements to project.</param>
  886. /// <param name="selector">A transform function to apply to each element.</param>
  887. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
  888. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
  {
      // Fail fast on null arguments, checking source first and then selector.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = selector ?? throw new ArgumentNullException(nameof(selector));

      // The observable-returning selector is passed through unchanged to s_impl.
      return s_impl.SelectMany(source, selector);
  }
  901. /// <summary>
  902. /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.
  903. /// </summary>
  904. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  905. /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
  906. /// <param name="source">An observable sequence of elements to project.</param>
  907. /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  908. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
  909. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
  {
      // Fail fast on null arguments, checking source first and then selector.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = selector ?? throw new ArgumentNullException(nameof(selector));

      // The selector's int parameter selects the index-aware s_impl overload.
      return s_impl.SelectMany(source, selector);
  }
  922. /// <summary>
  923. /// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence.
  924. /// </summary>
  925. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  926. /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
  927. /// <param name="source">An observable sequence of elements to project.</param>
  928. /// <param name="selector">A transform function to apply to each element.</param>
  929. /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
  930. /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
  931. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
  {
      // Fail fast on null arguments, checking source first and then selector.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = selector ?? throw new ArgumentNullException(nameof(selector));

      // The task-returning selector is handed to s_impl as-is; no task is created here.
      return s_impl.SelectMany(source, selector);
  }
  944. /// <summary>
  945. /// Projects each element of an observable sequence to a task by incorporating the element's index and merges all of the task results into one observable sequence.
  946. /// </summary>
  947. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  948. /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
  949. /// <param name="source">An observable sequence of elements to project.</param>
  950. /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  951. /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
  952. /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
  953. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
  {
      // Fail fast on null arguments, checking source first and then selector.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = selector ?? throw new ArgumentNullException(nameof(selector));

      // Index-aware, task-returning selector: forwarded untouched to s_impl.
      return s_impl.SelectMany(source, selector);
  }
  966. /// <summary>
  967. /// Projects each element of an observable sequence to a task with cancellation support and merges all of the task results into one observable sequence.
  968. /// </summary>
  969. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  970. /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
  971. /// <param name="source">An observable sequence of elements to project.</param>
  972. /// <param name="selector">A transform function to apply to each element.</param>
  973. /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
  974. /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
  975. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
  {
      // Fail fast on null arguments, checking source first and then selector.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = selector ?? throw new ArgumentNullException(nameof(selector));

      // No CancellationToken is created at this level; the selector is simply forwarded to s_impl.
      return s_impl.SelectMany(source, selector);
  }
  988. /// <summary>
  989. /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support and merges all of the task results into one observable sequence.
  990. /// </summary>
  991. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  992. /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
  993. /// <param name="source">An observable sequence of elements to project.</param>
  994. /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  995. /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
  996. /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
  997. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
  {
      // Fail fast on null arguments, checking source first and then selector.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = selector ?? throw new ArgumentNullException(nameof(selector));

      // Index-aware, cancellable task selector: forwarded untouched to s_impl.
      return s_impl.SelectMany(source, selector);
  }
  1010. /// <summary>
  1011. /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
  1012. /// </summary>
  1013. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1014. /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
  1015. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
  1016. /// <param name="source">An observable sequence of elements to project.</param>
  1017. /// <param name="collectionSelector">A transform function to apply to each element.</param>
  1018. /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
  1019. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
  1020. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      // All three arguments are required; they are validated in declaration order.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = collectionSelector ?? throw new ArgumentNullException(nameof(collectionSelector));
      _ = resultSelector ?? throw new ArgumentNullException(nameof(resultSelector));

      // Both selectors are forwarded unchanged to s_impl.
      return s_impl.SelectMany(source, collectionSelector, resultSelector);
  }
  1037. /// <summary>
  1038. /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
  1039. /// </summary>
  1040. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1041. /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
  1042. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
  1043. /// <param name="source">An observable sequence of elements to project.</param>
  1044. /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  1045. /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element and the fourth parameter represents the index of the intermediate element.</param>
  1046. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
  1047. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  {
      // All three arguments are required; they are validated in declaration order.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = collectionSelector ?? throw new ArgumentNullException(nameof(collectionSelector));
      _ = resultSelector ?? throw new ArgumentNullException(nameof(resultSelector));

      // Index-aware variant: both selectors carry int index parameters and are forwarded unchanged to s_impl.
      return s_impl.SelectMany(source, collectionSelector, resultSelector);
  }
  1064. /// <summary>
  1065. /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
  1066. /// </summary>
  1067. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1068. /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
  1069. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
  1070. /// <param name="source">An observable sequence of elements to project.</param>
  1071. /// <param name="taskSelector">A transform function to apply to each element.</param>
  /// <param name="resultSelector">A transform function to apply to each source element and the result of its corresponding task.</param>
  1073. /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
  1074. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  1075. /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
  {
      // All three arguments are required; they are validated in declaration order.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = taskSelector ?? throw new ArgumentNullException(nameof(taskSelector));
      _ = resultSelector ?? throw new ArgumentNullException(nameof(resultSelector));

      // Task selector and result selector are forwarded unchanged to s_impl.
      return s_impl.SelectMany(source, taskSelector, resultSelector);
  }
  1092. /// <summary>
  1093. /// Projects each element of an observable sequence to a task by incorporating the element's index, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
  1094. /// </summary>
  1095. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1096. /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
  1097. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
  1098. /// <param name="source">An observable sequence of elements to project.</param>
  1099. /// <param name="taskSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  /// <param name="resultSelector">A transform function to apply to each source element and the result of its corresponding task; the second parameter of the function represents the index of the source element.</param>
  1101. /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
  1102. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  1103. /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
  {
      // All three arguments are required; they are validated in declaration order.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = taskSelector ?? throw new ArgumentNullException(nameof(taskSelector));
      _ = resultSelector ?? throw new ArgumentNullException(nameof(resultSelector));

      // Index-aware variant: both selectors take the source index and are forwarded unchanged to s_impl.
      return s_impl.SelectMany(source, taskSelector, resultSelector);
  }
  1120. /// <summary>
  1121. /// Projects each element of an observable sequence to a task with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
  1122. /// </summary>
  1123. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1124. /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
  1125. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
  1126. /// <param name="source">An observable sequence of elements to project.</param>
  1127. /// <param name="taskSelector">A transform function to apply to each element.</param>
  /// <param name="resultSelector">A transform function to apply to each source element and the result of its corresponding task.</param>
  1129. /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
  1130. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  1131. /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
  {
      // All three arguments are required; they are validated in declaration order.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = taskSelector ?? throw new ArgumentNullException(nameof(taskSelector));
      _ = resultSelector ?? throw new ArgumentNullException(nameof(resultSelector));

      // No CancellationToken is created at this level; the cancellable task selector is forwarded to s_impl.
      return s_impl.SelectMany(source, taskSelector, resultSelector);
  }
  1148. /// <summary>
  1149. /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
  1150. /// </summary>
  1151. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1152. /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
  1153. /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate task results.</typeparam>
  1154. /// <param name="source">An observable sequence of elements to project.</param>
  1155. /// <param name="taskSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  /// <param name="resultSelector">A transform function to apply to each source element and the result of its corresponding task; the second parameter of the function represents the index of the source element.</param>
  1157. /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
  1158. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  1159. /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
  {
      // All three arguments are required; they are validated in declaration order.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = taskSelector ?? throw new ArgumentNullException(nameof(taskSelector));
      _ = resultSelector ?? throw new ArgumentNullException(nameof(resultSelector));

      // Index-aware, cancellable variant: both selectors are forwarded unchanged to s_impl.
      return s_impl.SelectMany(source, taskSelector, resultSelector);
  }
  1176. /// <summary>
  1177. /// Projects each notification of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
  1178. /// </summary>
  1179. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1180. /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
  1181. /// <param name="source">An observable sequence of notifications to project.</param>
  1182. /// <param name="onNext">A transform function to apply to each element.</param>
  1183. /// <param name="onError">A transform function to apply when an error occurs in the source sequence.</param>
  1184. /// <param name="onCompleted">A transform function to apply when the end of the source sequence is reached.</param>
  1185. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function corresponding to each notification in the input sequence.</returns>
  1186. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
  {
      // One projection per notification kind; every one of them is mandatory.
      // Validation follows declaration order: source, onNext, onError, onCompleted.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = onNext ?? throw new ArgumentNullException(nameof(onNext));
      _ = onError ?? throw new ArgumentNullException(nameof(onError));
      _ = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted));

      // The three projections are forwarded unchanged to s_impl.
      return s_impl.SelectMany(source, onNext, onError, onCompleted);
  }
  1207. /// <summary>
  1208. /// Projects each notification of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.
  1209. /// </summary>
  1210. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1211. /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
  1212. /// <param name="source">An observable sequence of notifications to project.</param>
  1213. /// <param name="onNext">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  1214. /// <param name="onError">A transform function to apply when an error occurs in the source sequence.</param>
  1215. /// <param name="onCompleted">A transform function to apply when the end of the source sequence is reached.</param>
  1216. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function corresponding to each notification in the input sequence.</returns>
  1217. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
  {
      // One projection per notification kind; every one of them is mandatory.
      // Validation follows declaration order: source, onNext, onError, onCompleted.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = onNext ?? throw new ArgumentNullException(nameof(onNext));
      _ = onError ?? throw new ArgumentNullException(nameof(onError));
      _ = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted));

      // Only onNext takes the element index; all three projections are forwarded unchanged to s_impl.
      return s_impl.SelectMany(source, onNext, onError, onCompleted);
  }
  1238. /// <summary>
  1239. /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence.
  1240. /// </summary>
  1241. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1242. /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
  1243. /// <param name="source">An observable sequence of elements to project.</param>
  1244. /// <param name="selector">A transform function to apply to each element.</param>
  1245. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
  1246. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  1247. /// <remarks>The projected sequences are enumerated synchronously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="Observable.ToObservable{TSource}(IEnumerable{TSource})"/> conversion.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
  {
      // Fail fast on null arguments, checking source first and then selector.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = selector ?? throw new ArgumentNullException(nameof(selector));

      // The enumerable-returning selector is forwarded to s_impl; nothing is enumerated here.
      return s_impl.SelectMany(source, selector);
  }
  1260. /// <summary>
  1261. /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index and concatenates the resulting enumerable sequences into one observable sequence.
  1262. /// </summary>
  1263. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  1264. /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
  1265. /// <param name="source">An observable sequence of elements to project.</param>
  1266. /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
  1267. /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
  1268. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
  1269. /// <remarks>The projected sequences are enumerated synchronously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="Observable.ToObservable{TSource}(IEnumerable{TSource})"/> conversion.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
  {
      // Fail fast on null arguments, checking source first and then selector.
      _ = source ?? throw new ArgumentNullException(nameof(source));
      _ = selector ?? throw new ArgumentNullException(nameof(selector));

      // Index-aware, enumerable-returning selector: forwarded to s_impl; nothing is enumerated here.
      return s_impl.SelectMany(source, selector);
  }
  /// <summary>
  /// Maps every element of an observable sequence to an enumerable sequence, applies the result selector to the source element paired with each element of its inner sequence, and merges the outcomes into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the projected intermediate enumerable sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence, produced by combining a source element with one of its intermediate sequence elements.</typeparam>
  /// <param name="source">The observable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">The one-to-many transform applied to each source element.</param>
  /// <param name="resultSelector">The transform applied to each element of the intermediate sequence.</param>
  /// <returns>An observable sequence obtained by applying <paramref name="collectionSelector"/> to each element of the input sequence and then mapping every intermediate element, along with its source element, to a result element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  /// <remarks>Each projected sequence is enumerated synchronously inside the OnNext call of the source sequence. For a concurrent, non-blocking merge, have the selector return an observable sequence instead, obtained through the <see cref="Observable.ToObservable{TSource}(IEnumerable{TSource})"/> conversion.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      // Arguments are validated in declaration order, so the exception names the first null parameter.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw new ArgumentNullException(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw new ArgumentNullException(nameof(resultSelector));
      }

      return s_impl.SelectMany(source, collectionSelector, resultSelector);
  }
  /// <summary>
  /// Maps every element of an observable sequence, together with its index, to an enumerable sequence, applies the result selector to the source element paired with each element of its inner sequence, and merges the outcomes into a single observable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TCollection">Element type of the projected intermediate enumerable sequences.</typeparam>
  /// <typeparam name="TResult">Element type of the result sequence, produced by combining a source element with one of its intermediate sequence elements.</typeparam>
  /// <param name="source">The observable sequence whose elements are projected.</param>
  /// <param name="collectionSelector">The one-to-many transform applied to each source element; its second argument is the index of the source element.</param>
  /// <param name="resultSelector">The transform applied to each element of the intermediate sequence; its second argument is the index of the source element and its fourth argument is the index of the intermediate element.</param>
  /// <returns>An observable sequence obtained by applying <paramref name="collectionSelector"/> to each element of the input sequence and then mapping every intermediate element, along with its source element, to a result element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="collectionSelector"/> or <paramref name="resultSelector"/> is null.</exception>
  /// <remarks>Each projected sequence is enumerated synchronously inside the OnNext call of the source sequence. For a concurrent, non-blocking merge, have the selector return an observable sequence instead, obtained through the <see cref="Observable.ToObservable{TSource}(IEnumerable{TSource})"/> conversion.</remarks>
  public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  {
      // Arguments are validated in declaration order, so the exception names the first null parameter.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (collectionSelector is null)
      {
          throw new ArgumentNullException(nameof(collectionSelector));
      }

      if (resultSelector is null)
      {
          throw new ArgumentNullException(nameof(resultSelector));
      }

      return s_impl.SelectMany(source, collectionSelector, resultSelector);
  }
  1338. #endregion
  1339. #region + Skip +
  /// <summary>
  /// Drops the given number of leading elements from an observable sequence and returns the elements that follow.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose leading elements are bypassed.</param>
  /// <param name="count">How many elements to skip before the remaining elements are returned.</param>
  /// <returns>An observable sequence holding the elements that occur after the specified index in the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  public static IObservable<TSource> Skip<TSource>(this IObservable<TSource> source, int count)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // Zero is accepted; only a negative count is rejected.
      if (count < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      return s_impl.Skip(source, count);
  }
  1361. #endregion
  1362. #region + SkipWhile +
  /// <summary>
  /// Drops elements from an observable sequence for as long as the given condition holds, then returns the elements that remain.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to return elements from.</param>
  /// <param name="predicate">The condition each element is tested against.</param>
  /// <returns>An observable sequence holding the elements of the input sequence, starting at the first element in the linear series that fails the test given by <paramref name="predicate"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> SkipWhile<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      // Arguments are validated in declaration order, so the exception names the first null parameter.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (predicate is null)
      {
          throw new ArgumentNullException(nameof(predicate));
      }

      return s_impl.SkipWhile(source, predicate);
  }
  /// <summary>
  /// Drops elements from an observable sequence for as long as the given condition holds, then returns the elements that remain.
  /// The predicate also receives the index of the element being tested.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to return elements from.</param>
  /// <param name="predicate">The condition each element is tested against; its second argument is the index of the source element.</param>
  /// <returns>An observable sequence holding the elements of the input sequence, starting at the first element in the linear series that fails the test given by <paramref name="predicate"/>.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> SkipWhile<TSource>(this IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
      // Arguments are validated in declaration order, so the exception names the first null parameter.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (predicate is null)
      {
          throw new ArgumentNullException(nameof(predicate));
      }

      return s_impl.SkipWhile(source, predicate);
  }
  1404. #endregion
  1405. #region + Take +
  /// <summary>
  /// Returns the given number of contiguous elements taken from the start of an observable sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to take elements from.</param>
  /// <param name="count">How many elements to return.</param>
  /// <returns>An observable sequence holding the specified number of elements from the start of the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  public static IObservable<TSource> Take<TSource>(this IObservable<TSource> source, int count)
  {
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // Zero is accepted; only a negative count is rejected.
      if (count < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      return s_impl.Take(source, count);
  }
  /// <summary>
  /// Returns the given number of contiguous elements taken from the start of an observable sequence, using the supplied scheduler for the Take(0) edge case.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence to take elements from.</param>
  /// <param name="count">How many elements to return.</param>
  /// <param name="scheduler">The scheduler used to produce an OnCompleted message when <paramref name="count"/> is 0.</param>
  /// <returns>An observable sequence holding the specified number of elements from the start of the input sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than zero.</exception>
  public static IObservable<TSource> Take<TSource>(this IObservable<TSource> source, int count, IScheduler scheduler)
  {
      // Checks follow parameter order: source, then count, then scheduler.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      // Zero is accepted; only a negative count is rejected.
      if (count < 0)
      {
          throw new ArgumentOutOfRangeException(nameof(count));
      }

      if (scheduler is null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      return s_impl.Take(source, count, scheduler);
  }
  1453. #endregion
  1454. #region + TakeWhile +
  /// <summary>
  /// Returns elements from an observable sequence for as long as the given condition holds.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The sequence to return elements from.</param>
  /// <param name="predicate">The condition each element is tested against.</param>
  /// <returns>An observable sequence holding the elements of the input sequence that occur before the element at which the test no longer passes.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> TakeWhile<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      // Arguments are validated in declaration order, so the exception names the first null parameter.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (predicate is null)
      {
          throw new ArgumentNullException(nameof(predicate));
      }

      return s_impl.TakeWhile(source, predicate);
  }
  /// <summary>
  /// Returns elements from an observable sequence for as long as the given condition holds.
  /// The predicate also receives the index of the element being tested.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The sequence to return elements from.</param>
  /// <param name="predicate">The condition each element is tested against; its second argument is the index of the source element.</param>
  /// <returns>An observable sequence holding the elements of the input sequence that occur before the element at which the test no longer passes.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> TakeWhile<TSource>(this IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
      // Arguments are validated in declaration order, so the exception names the first null parameter.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (predicate is null)
      {
          throw new ArgumentNullException(nameof(predicate));
      }

      return s_impl.TakeWhile(source, predicate);
  }
  1496. #endregion
  1497. #region + Where +
  /// <summary>
  /// Keeps only those elements of an observable sequence that satisfy a predicate.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are filtered.</param>
  /// <param name="predicate">The condition each source element is tested against.</param>
  /// <returns>An observable sequence holding the elements of the input sequence that satisfy the condition.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      // Arguments are validated in declaration order, so the exception names the first null parameter.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (predicate is null)
      {
          throw new ArgumentNullException(nameof(predicate));
      }

      return s_impl.Where(source, predicate);
  }
  /// <summary>
  /// Keeps only those elements of an observable sequence that satisfy a predicate which also receives the element's index.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">The observable sequence whose elements are filtered.</param>
  /// <param name="predicate">The condition each source element is tested against; its second argument is the index of the source element.</param>
  /// <returns>An observable sequence holding the elements of the input sequence that satisfy the condition.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
      // Arguments are validated in declaration order, so the exception names the first null parameter.
      if (source is null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (predicate is null)
      {
          throw new ArgumentNullException(nameof(predicate));
      }

      return s_impl.Where(source, predicate);
  }
  1538. #endregion
  1539. }
  1540. }