QueryLanguage.StandardSequenceOperators.cs 22 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Concurrency;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Reactive.Linq
  9. {
  10. using ObservableImpl;
  11. internal partial class QueryLanguage
  12. {
  #region + Cast +

  /// <summary>
  /// Wraps <paramref name="source"/> in the <c>Cast</c> operator implementation, using
  /// <c>object</c> as the input element type and <typeparamref name="TResult"/> as the output type.
  /// </summary>
  /// <param name="source">Sequence of <c>object</c> elements handed to the operator.</param>
  /// <returns>The newly constructed <c>Cast</c> operator instance.</returns>
  public virtual IObservable<TResult> Cast<TResult>(IObservable<object> source)
      => new Cast<object, TResult>(source);

  #endregion
  #region + DefaultIfEmpty +

  /// <summary>
  /// Constructs the <c>DefaultIfEmpty</c> operator over <paramref name="source"/>, passing the
  /// <c>default</c> literal as the fallback value.
  /// </summary>
  /// <param name="source">Sequence handed to the operator.</param>
  /// <returns>The newly constructed <c>DefaultIfEmpty</c> operator instance.</returns>
  public virtual IObservable<TSource> DefaultIfEmpty<TSource>(IObservable<TSource> source)
      => new DefaultIfEmpty<TSource>(source, default);

  /// <summary>
  /// Constructs the <c>DefaultIfEmpty</c> operator over <paramref name="source"/> with a
  /// caller-supplied fallback value.
  /// </summary>
  /// <param name="source">Sequence handed to the operator.</param>
  /// <param name="defaultValue">Value forwarded to the operator as its second constructor argument.</param>
  /// <returns>The newly constructed <c>DefaultIfEmpty</c> operator instance.</returns>
  public virtual IObservable<TSource> DefaultIfEmpty<TSource>(IObservable<TSource> source, TSource defaultValue)
      => new DefaultIfEmpty<TSource>(source, defaultValue);

  #endregion
  #region + Distinct +

  /// <summary>
  /// Constructs the <c>Distinct</c> operator using each element as its own key and the
  /// default equality comparer for <typeparamref name="TSource"/>.
  /// </summary>
  public virtual IObservable<TSource> Distinct<TSource>(IObservable<TSource> source)
      => new Distinct<TSource, TSource>(source, element => element, EqualityComparer<TSource>.Default);

  /// <summary>
  /// Constructs the <c>Distinct</c> operator using each element as its own key and the
  /// supplied <paramref name="comparer"/>.
  /// </summary>
  public virtual IObservable<TSource> Distinct<TSource>(
      IObservable<TSource> source,
      IEqualityComparer<TSource> comparer)
      => new Distinct<TSource, TSource>(source, element => element, comparer);

  /// <summary>
  /// Constructs the <c>Distinct</c> operator with a caller-supplied key selector and the
  /// default equality comparer for <typeparamref name="TKey"/>.
  /// </summary>
  public virtual IObservable<TSource> Distinct<TSource, TKey>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector)
      => new Distinct<TSource, TKey>(source, keySelector, EqualityComparer<TKey>.Default);

  /// <summary>
  /// Constructs the <c>Distinct</c> operator with a caller-supplied key selector and key comparer.
  /// </summary>
  public virtual IObservable<TSource> Distinct<TSource, TKey>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      IEqualityComparer<TKey> comparer)
      => new Distinct<TSource, TKey>(source, keySelector, comparer);

  #endregion
  #region + GroupBy +

  // Every public overload below funnels into GroupBy_; they differ only in which of
  // (element selector, capacity, comparer) the caller supplies. Missing pieces are filled
  // in with an identity selector, a null capacity, or EqualityComparer<TKey>.Default.

  /// <summary>
  /// Key selector and element selector; null capacity, default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector)
      => GroupBy_(source, keySelector, elementSelector, null, EqualityComparer<TKey>.Default);

  /// <summary>
  /// Key selector and key comparer; identity element selector, null capacity.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      IEqualityComparer<TKey> comparer)
      => GroupBy_(source, keySelector, element => element, null, comparer);

  /// <summary>
  /// Key selector only; identity element selector, null capacity, default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector)
      => GroupBy_(source, keySelector, element => element, null, EqualityComparer<TKey>.Default);

  /// <summary>
  /// Key selector, element selector and key comparer; null capacity.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      IEqualityComparer<TKey> comparer)
      => GroupBy_(source, keySelector, elementSelector, null, comparer);

  /// <summary>
  /// Key selector, element selector and capacity; default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      int capacity)
      => GroupBy_(source, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);

  /// <summary>
  /// Key selector, capacity and key comparer; identity element selector.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
      => GroupBy_(source, keySelector, element => element, capacity, comparer);

  /// <summary>
  /// Key selector and capacity; identity element selector, default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      int capacity)
      => GroupBy_(source, keySelector, element => element, capacity, EqualityComparer<TKey>.Default);

  /// <summary>
  /// Fully specified overload: key selector, element selector, capacity and key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
      => GroupBy_(source, keySelector, elementSelector, capacity, comparer);

  /// <summary>
  /// Shared construction point for all <c>GroupBy</c> overloads: builds the <c>GroupBy</c>
  /// operator from the given arguments. <paramref name="capacity"/> is nullable so that the
  /// overloads without a capacity parameter can pass <c>null</c>.
  /// </summary>
  private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      int? capacity,
      IEqualityComparer<TKey> comparer)
      => new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);

  #endregion
  #region + GroupByUntil +

  // Every public overload below funnels into GroupByUntil_; they differ only in which of
  // (element selector, capacity, comparer) the caller supplies. Missing pieces are filled
  // in with an identity selector, a null capacity, or EqualityComparer<TKey>.Default.

  /// <summary>
  /// Key selector, element selector, duration selector and key comparer; null capacity.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector,
      IEqualityComparer<TKey> comparer)
      => GroupByUntil_(source, keySelector, elementSelector, durationSelector, null, comparer);

  /// <summary>
  /// Key selector, element selector and duration selector; null capacity, default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
      => GroupByUntil_(source, keySelector, elementSelector, durationSelector, null, EqualityComparer<TKey>.Default);

  /// <summary>
  /// Key selector, duration selector and key comparer; identity element selector, null capacity.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector,
      IEqualityComparer<TKey> comparer)
      => GroupByUntil_(source, keySelector, element => element, durationSelector, null, comparer);

  /// <summary>
  /// Key selector and duration selector; identity element selector, null capacity,
  /// default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
      => GroupByUntil_(source, keySelector, element => element, durationSelector, null, EqualityComparer<TKey>.Default);

  /// <summary>
  /// Fully specified overload: key selector, element selector, duration selector,
  /// capacity and key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
      => GroupByUntil_(source, keySelector, elementSelector, durationSelector, capacity, comparer);

  /// <summary>
  /// Key selector, element selector, duration selector and capacity; default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector,
      int capacity)
      => GroupByUntil_(source, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default);

  /// <summary>
  /// Key selector, duration selector, capacity and key comparer; identity element selector.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector,
      int capacity,
      IEqualityComparer<TKey> comparer)
      => GroupByUntil_(source, keySelector, element => element, durationSelector, capacity, comparer);

  /// <summary>
  /// Key selector, duration selector and capacity; identity element selector,
  /// default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector,
      int capacity)
      => GroupByUntil_(source, keySelector, element => element, durationSelector, capacity, EqualityComparer<TKey>.Default);

  /// <summary>
  /// Shared construction point for all <c>GroupByUntil</c> overloads: builds the
  /// <c>GroupByUntil</c> operator from the given arguments. <paramref name="capacity"/> is
  /// nullable so that the overloads without a capacity parameter can pass <c>null</c>.
  /// </summary>
  private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(
      IObservable<TSource> source,
      Func<TSource, TKey> keySelector,
      Func<TSource, TElement> elementSelector,
      Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector,
      int? capacity,
      IEqualityComparer<TKey> comparer)
      => new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);

  #endregion
  #region + GroupJoin +

  /// <summary>
  /// Public entry point for <c>GroupJoin</c>; forwards all arguments unchanged to
  /// <c>GroupJoin_</c>.
  /// </summary>
  public virtual IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
      IObservable<TLeft> left,
      IObservable<TRight> right,
      Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
      Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
      Func<TLeft, IObservable<TRight>, TResult> resultSelector)
      => GroupJoin_(left, right, leftDurationSelector, rightDurationSelector, resultSelector);

  /// <summary>
  /// Builds the <c>GroupJoin</c> operator from the two sources, the two duration selectors
  /// and the result selector.
  /// </summary>
  private static IObservable<TResult> GroupJoin_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
      IObservable<TLeft> left,
      IObservable<TRight> right,
      Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
      Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
      Func<TLeft, IObservable<TRight>, TResult> resultSelector)
      => new GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);

  #endregion
  #region + Join +

  /// <summary>
  /// Public entry point for <c>Join</c>; forwards all arguments unchanged to <c>Join_</c>.
  /// </summary>
  public virtual IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
      IObservable<TLeft> left,
      IObservable<TRight> right,
      Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
      Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
      Func<TLeft, TRight, TResult> resultSelector)
      => Join_(left, right, leftDurationSelector, rightDurationSelector, resultSelector);

  /// <summary>
  /// Builds the <c>Join</c> operator from the two sources, the two duration selectors
  /// and the result selector.
  /// </summary>
  private static IObservable<TResult> Join_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
      IObservable<TLeft> left,
      IObservable<TRight> right,
      Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
      Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
      Func<TLeft, TRight, TResult> resultSelector)
      => new Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);

  #endregion
  #region + OfType +

  /// <summary>
  /// Wraps <paramref name="source"/> in the <c>OfType</c> operator implementation, using
  /// <c>object</c> as the input element type and <typeparamref name="TResult"/> as the output type.
  /// </summary>
  /// <param name="source">Sequence of <c>object</c> elements handed to the operator.</param>
  /// <returns>The newly constructed <c>OfType</c> operator instance.</returns>
  public virtual IObservable<TResult> OfType<TResult>(IObservable<object> source)
      => new OfType<object, TResult>(source);

  #endregion
  #region + Select +

  /// <summary>
  /// Constructs the non-indexed <c>Select</c> operator (<c>Selector</c> variant) over
  /// <paramref name="source"/> with the given projection.
  /// </summary>
  // CONSIDER: Add fusion for Select/Select pairs.
  public virtual IObservable<TResult> Select<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, TResult> selector)
      => new Select<TSource, TResult>.Selector(source, selector);

  /// <summary>
  /// Constructs the indexed <c>Select</c> operator (<c>SelectorIndexed</c> variant) over
  /// <paramref name="source"/>; the projection additionally receives an <c>int</c> argument.
  /// </summary>
  public virtual IObservable<TResult> Select<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, TResult> selector)
      => new Select<TSource, TResult>.SelectorIndexed(source, selector);

  #endregion
  #region + SelectMany +

  // ---- Observable-returning selectors -------------------------------------------------

  /// <summary>
  /// Uses the same <paramref name="other"/> sequence for every source element by handing
  /// <c>SelectMany_</c> a selector that ignores its argument.
  /// </summary>
  public virtual IObservable<TOther> SelectMany<TSource, TOther>(
      IObservable<TSource> source,
      IObservable<TOther> other)
      => SelectMany_(source, _ => other);

  /// <summary>
  /// Forwards to the non-indexed observable-selector helper.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, IObservable<TResult>> selector)
      => SelectMany_(source, selector);

  /// <summary>
  /// Forwards to the indexed observable-selector helper.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, IObservable<TResult>> selector)
      => SelectMany_(source, selector);

  // ---- Task-returning selectors -------------------------------------------------------

  /// <summary>
  /// Constructs the <c>TaskSelector</c> variant. The token-less <paramref name="selector"/>
  /// is adapted with a lambda that accepts, and ignores, the cancellation token.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, Task<TResult>> selector)
      => new SelectMany<TSource, TResult>.TaskSelector(source, (value, cancellation) => selector(value));

  /// <summary>
  /// Constructs the <c>TaskSelectorIndexed</c> variant. The token-less
  /// <paramref name="selector"/> is adapted with a lambda that accepts, and ignores,
  /// the cancellation token.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, Task<TResult>> selector)
      => new SelectMany<TSource, TResult>.TaskSelectorIndexed(source, (value, index, cancellation) => selector(value, index));

  /// <summary>
  /// Constructs the <c>TaskSelector</c> variant with a cancellation-aware selector passed through as-is.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, CancellationToken, Task<TResult>> selector)
      => new SelectMany<TSource, TResult>.TaskSelector(source, selector);

  /// <summary>
  /// Constructs the <c>TaskSelectorIndexed</c> variant with a cancellation-aware selector
  /// passed through as-is.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, CancellationToken, Task<TResult>> selector)
      => new SelectMany<TSource, TResult>.TaskSelectorIndexed(source, selector);

  // ---- Observable collection selector + result selector -------------------------------

  /// <summary>
  /// Forwards to the non-indexed observable collection/result-selector helper.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(
      IObservable<TSource> source,
      Func<TSource, IObservable<TCollection>> collectionSelector,
      Func<TSource, TCollection, TResult> resultSelector)
      => SelectMany_(source, collectionSelector, resultSelector);

  /// <summary>
  /// Forwards to the indexed observable collection/result-selector helper.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, IObservable<TCollection>> collectionSelector,
      Func<TSource, int, TCollection, int, TResult> resultSelector)
      => SelectMany_(source, collectionSelector, resultSelector);

  // ---- Task selector + result selector ------------------------------------------------

  /// <summary>
  /// Constructs the three-type-argument <c>TaskSelector</c> variant. The token-less
  /// <paramref name="taskSelector"/> is adapted with a lambda that ignores the cancellation token.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(
      IObservable<TSource> source,
      Func<TSource, Task<TTaskResult>> taskSelector,
      Func<TSource, TTaskResult, TResult> resultSelector)
      => new SelectMany<TSource, TTaskResult, TResult>.TaskSelector(
          source,
          (value, cancellation) => taskSelector(value),
          resultSelector);

  /// <summary>
  /// Constructs the three-type-argument <c>TaskSelectorIndexed</c> variant. The token-less
  /// <paramref name="taskSelector"/> is adapted with a lambda that ignores the cancellation token.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, Task<TTaskResult>> taskSelector,
      Func<TSource, int, TTaskResult, TResult> resultSelector)
      => new SelectMany<TSource, TTaskResult, TResult>.TaskSelectorIndexed(
          source,
          (value, index, cancellation) => taskSelector(value, index),
          resultSelector);

  /// <summary>
  /// Constructs the three-type-argument <c>TaskSelector</c> variant with a cancellation-aware
  /// task selector passed through as-is.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(
      IObservable<TSource> source,
      Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector,
      Func<TSource, TTaskResult, TResult> resultSelector)
      => new SelectMany<TSource, TTaskResult, TResult>.TaskSelector(source, taskSelector, resultSelector);

  /// <summary>
  /// Constructs the three-type-argument <c>TaskSelectorIndexed</c> variant with a
  /// cancellation-aware task selector passed through as-is.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector,
      Func<TSource, int, TTaskResult, TResult> resultSelector)
      => new SelectMany<TSource, TTaskResult, TResult>.TaskSelectorIndexed(source, taskSelector, resultSelector);

  // ---- Private helpers for observable selectors ---------------------------------------

  /// <summary>
  /// Builds the <c>ObservableSelector</c> variant of the two-type-argument <c>SelectMany</c> operator.
  /// </summary>
  private static IObservable<TResult> SelectMany_<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, IObservable<TResult>> selector)
      => new SelectMany<TSource, TResult>.ObservableSelector(source, selector);

  /// <summary>
  /// Builds the <c>ObservableSelectorIndexed</c> variant of the two-type-argument
  /// <c>SelectMany</c> operator.
  /// </summary>
  private static IObservable<TResult> SelectMany_<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, IObservable<TResult>> selector)
      => new SelectMany<TSource, TResult>.ObservableSelectorIndexed(source, selector);

  /// <summary>
  /// Builds the <c>ObservableSelector</c> variant of the three-type-argument
  /// <c>SelectMany</c> operator.
  /// </summary>
  private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(
      IObservable<TSource> source,
      Func<TSource, IObservable<TCollection>> collectionSelector,
      Func<TSource, TCollection, TResult> resultSelector)
      => new SelectMany<TSource, TCollection, TResult>.ObservableSelector(source, collectionSelector, resultSelector);

  /// <summary>
  /// Builds the <c>ObservableSelectorIndexed</c> variant of the three-type-argument
  /// <c>SelectMany</c> operator.
  /// </summary>
  private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, IObservable<TCollection>> collectionSelector,
      Func<TSource, int, TCollection, int, TResult> resultSelector)
      => new SelectMany<TSource, TCollection, TResult>.ObservableSelectorIndexed(source, collectionSelector, resultSelector);

  // ---- Separate selectors for OnNext / OnError / OnCompleted --------------------------

  /// <summary>
  /// Constructs the <c>ObservableSelectors</c> variant, which takes one selector per
  /// notification kind (<paramref name="onNext"/>, <paramref name="onError"/>,
  /// <paramref name="onCompleted"/>).
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, IObservable<TResult>> onNext,
      Func<Exception, IObservable<TResult>> onError,
      Func<IObservable<TResult>> onCompleted)
      => new SelectMany<TSource, TResult>.ObservableSelectors(source, onNext, onError, onCompleted);

  /// <summary>
  /// Constructs the <c>ObservableSelectorsIndexed</c> variant; as above, but
  /// <paramref name="onNext"/> additionally receives an <c>int</c> argument.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, IObservable<TResult>> onNext,
      Func<Exception, IObservable<TResult>> onError,
      Func<IObservable<TResult>> onCompleted)
      => new SelectMany<TSource, TResult>.ObservableSelectorsIndexed(source, onNext, onError, onCompleted);

  // ---- Enumerable-returning selectors -------------------------------------------------

  /// <summary>
  /// Constructs the <c>EnumerableSelector</c> variant of the two-type-argument <c>SelectMany</c> operator.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, IEnumerable<TResult>> selector)
      => new SelectMany<TSource, TResult>.EnumerableSelector(source, selector);

  /// <summary>
  /// Constructs the <c>EnumerableSelectorIndexed</c> variant of the two-type-argument
  /// <c>SelectMany</c> operator.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, IEnumerable<TResult>> selector)
      => new SelectMany<TSource, TResult>.EnumerableSelectorIndexed(source, selector);

  /// <summary>
  /// Forwards to the non-indexed enumerable collection/result-selector helper.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(
      IObservable<TSource> source,
      Func<TSource, IEnumerable<TCollection>> collectionSelector,
      Func<TSource, TCollection, TResult> resultSelector)
      => SelectMany_(source, collectionSelector, resultSelector);

  /// <summary>
  /// Forwards to the indexed enumerable collection/result-selector helper.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, IEnumerable<TCollection>> collectionSelector,
      Func<TSource, int, TCollection, int, TResult> resultSelector)
      => SelectMany_(source, collectionSelector, resultSelector);

  /// <summary>
  /// Builds the <c>EnumerableSelector</c> variant of the three-type-argument
  /// <c>SelectMany</c> operator.
  /// </summary>
  private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(
      IObservable<TSource> source,
      Func<TSource, IEnumerable<TCollection>> collectionSelector,
      Func<TSource, TCollection, TResult> resultSelector)
      => new SelectMany<TSource, TCollection, TResult>.EnumerableSelector(source, collectionSelector, resultSelector);

  /// <summary>
  /// Builds the <c>EnumerableSelectorIndexed</c> variant of the three-type-argument
  /// <c>SelectMany</c> operator.
  /// </summary>
  private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(
      IObservable<TSource> source,
      Func<TSource, int, IEnumerable<TCollection>> collectionSelector,
      Func<TSource, int, TCollection, int, TResult> resultSelector)
      => new SelectMany<TSource, TCollection, TResult>.EnumerableSelectorIndexed(source, collectionSelector, resultSelector);

  #endregion
  #region + Skip +

  /// <summary>
  /// Returns a count-based <c>Skip</c> operator for <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to apply the operator to.</param>
  /// <param name="count">Count forwarded to the operator.</param>
  /// <returns>
  /// A new <c>Skip.Count</c> operator, or, when <paramref name="source"/> is itself a
  /// <c>Skip.Count</c> instance, whatever that instance's <c>Combine(count)</c> returns.
  /// </returns>
  public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, int count)
  {
      // Common case first: the source is not an existing Skip.Count, so wrap it.
      if (!(source is Skip<TSource>.Count existing))
      {
          return new Skip<TSource>.Count(source, count);
      }

      // Source is already a Skip.Count: let it fold the new count in rather than nesting.
      return existing.Combine(count);
  }

  #endregion
  #region + SkipWhile +

  /// <summary>
  /// Constructs the non-indexed <c>SkipWhile</c> operator (<c>Predicate</c> variant) over
  /// <paramref name="source"/>.
  /// </summary>
  public virtual IObservable<TSource> SkipWhile<TSource>(
      IObservable<TSource> source,
      Func<TSource, bool> predicate)
      => new SkipWhile<TSource>.Predicate(source, predicate);

  /// <summary>
  /// Constructs the indexed <c>SkipWhile</c> operator (<c>PredicateIndexed</c> variant) over
  /// <paramref name="source"/>; the predicate additionally receives an <c>int</c> argument.
  /// </summary>
  public virtual IObservable<TSource> SkipWhile<TSource>(
      IObservable<TSource> source,
      Func<TSource, int, bool> predicate)
      => new SkipWhile<TSource>.PredicateIndexed(source, predicate);

  #endregion
  #region + Take +

  /// <summary>
  /// Returns a count-based <c>Take</c> operator for <paramref name="source"/>, or the result of
  /// <c>Empty&lt;TSource&gt;()</c> when <paramref name="count"/> is zero.
  /// </summary>
  /// <param name="source">Sequence to apply the operator to; not used when <paramref name="count"/> is zero.</param>
  /// <param name="count">Count forwarded to the operator.</param>
  public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, int count)
  {
      if (count != 0)
      {
          return Take_(source, count);
      }

      // Zero count: the source is never wrapped.
      return Empty<TSource>();
  }

  /// <summary>
  /// Same as the two-argument overload, except that the zero-count case calls
  /// <c>Empty&lt;TSource&gt;(scheduler)</c>.
  /// </summary>
  /// <param name="source">Sequence to apply the operator to; not used when <paramref name="count"/> is zero.</param>
  /// <param name="count">Count forwarded to the operator.</param>
  /// <param name="scheduler">Only consulted when <paramref name="count"/> is zero; ignored otherwise.</param>
  public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
  {
      if (count != 0)
      {
          // The scheduler plays no part on this path.
          return Take_(source, count);
      }

      return Empty<TSource>(scheduler);
  }

  /// <summary>
  /// Shared non-zero path for both <c>Take</c> overloads: creates a new <c>Take.Count</c>
  /// operator, or, when <paramref name="source"/> is itself a <c>Take.Count</c> instance,
  /// returns that instance's <c>Combine(count)</c>.
  /// </summary>
  private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, int count)
  {
      switch (source)
      {
          case Take<TSource>.Count existing:
              return existing.Combine(count);

          default:
              return new Take<TSource>.Count(source, count);
      }
  }

  #endregion
  #region + TakeWhile +

  /// <summary>
  /// Constructs the non-indexed <c>TakeWhile</c> operator (<c>Predicate</c> variant) over
  /// <paramref name="source"/>.
  /// </summary>
  public virtual IObservable<TSource> TakeWhile<TSource>(
      IObservable<TSource> source,
      Func<TSource, bool> predicate)
      => new TakeWhile<TSource>.Predicate(source, predicate);

  /// <summary>
  /// Constructs the indexed <c>TakeWhile</c> operator (<c>PredicateIndexed</c> variant) over
  /// <paramref name="source"/>; the predicate additionally receives an <c>int</c> argument.
  /// </summary>
  public virtual IObservable<TSource> TakeWhile<TSource>(
      IObservable<TSource> source,
      Func<TSource, int, bool> predicate)
      => new TakeWhile<TSource>.PredicateIndexed(source, predicate);

  #endregion
  #region + Where +

  /// <summary>
  /// Returns a non-indexed <c>Where</c> operator for <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to apply the operator to.</param>
  /// <param name="predicate">Predicate forwarded to the operator.</param>
  /// <returns>
  /// A new <c>Where.Predicate</c> operator, or, when <paramref name="source"/> is itself a
  /// <c>Where.Predicate</c> instance, whatever that instance's <c>Combine(predicate)</c> returns.
  /// </returns>
  public virtual IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      switch (source)
      {
          case Where<TSource>.Predicate existing:
              // Source is already a Where.Predicate: let it absorb the extra predicate.
              return existing.Combine(predicate);

          default:
              return new Where<TSource>.Predicate(source, predicate);
      }
  }

  /// <summary>
  /// Constructs the indexed <c>Where</c> operator (<c>PredicateIndexed</c> variant) over
  /// <paramref name="source"/>. Unlike the non-indexed overload, no combining with an
  /// existing operator is attempted here.
  /// </summary>
  public virtual IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
      => new Where<TSource>.PredicateIndexed(source, predicate);

  #endregion
  332. }
  333. }