Observable.Blocking.cs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. namespace System.Reactive.Linq
  6. {
  7. public static partial class Observable
  8. {
  9. #region + Chunkify +
  10. /// <summary>
  11. /// Produces an enumerable sequence of consecutive (possibly empty) chunks of the source sequence.
  12. /// </summary>
  13. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  14. /// <param name="source">Source observable sequence.</param>
  15. /// <returns>The enumerable sequence that returns consecutive (possibly empty) chunks upon each iteration.</returns>
  16. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  17. public static IEnumerable<IList<TSource>> Chunkify<TSource>(this IObservable<TSource> source)
  18. {
  19. if (source == null)
  20. throw new ArgumentNullException(nameof(source));
  21. return s_impl.Chunkify<TSource>(source);
  22. }
  23. #endregion
  24. #region + Collect +
  25. /// <summary>
  26. /// Produces an enumerable sequence that returns elements collected/aggregated from the source sequence between consecutive iterations.
  27. /// </summary>
  28. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  29. /// <typeparam name="TResult">The type of the elements produced by the merge operation during collection.</typeparam>
  30. /// <param name="source">Source observable sequence.</param>
  31. /// <param name="newCollector">Factory to create a new collector object.</param>
  32. /// <param name="merge">Merges a sequence element with the current collector.</param>
  33. /// <returns>The enumerable sequence that returns collected/aggregated elements from the source sequence upon each iteration.</returns>
  34. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="newCollector"/> or <paramref name="merge"/> is null.</exception>
  35. public static IEnumerable<TResult> Collect<TSource, TResult>(this IObservable<TSource> source, Func<TResult> newCollector, Func<TResult, TSource, TResult> merge)
  36. {
  37. if (source == null)
  38. throw new ArgumentNullException(nameof(source));
  39. if (newCollector == null)
  40. throw new ArgumentNullException(nameof(newCollector));
  41. if (merge == null)
  42. throw new ArgumentNullException(nameof(merge));
  43. return s_impl.Collect<TSource, TResult>(source, newCollector, merge);
  44. }
  45. /// <summary>
  46. /// Produces an enumerable sequence that returns elements collected/aggregated from the source sequence between consecutive iterations.
  47. /// </summary>
  48. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  49. /// <typeparam name="TResult">The type of the elements produced by the merge operation during collection.</typeparam>
  50. /// <param name="source">Source observable sequence.</param>
  51. /// <param name="getInitialCollector">Factory to create the initial collector object.</param>
  52. /// <param name="merge">Merges a sequence element with the current collector.</param>
  53. /// <param name="getNewCollector">Factory to replace the current collector by a new collector.</param>
  54. /// <returns>The enumerable sequence that returns collected/aggregated elements from the source sequence upon each iteration.</returns>
  55. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="getInitialCollector"/> or <paramref name="merge"/> or <paramref name="getNewCollector"/> is null.</exception>
  56. public static IEnumerable<TResult> Collect<TSource, TResult>(this IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
  57. {
  58. if (source == null)
  59. throw new ArgumentNullException(nameof(source));
  60. if (getInitialCollector == null)
  61. throw new ArgumentNullException(nameof(getInitialCollector));
  62. if (merge == null)
  63. throw new ArgumentNullException(nameof(merge));
  64. if (getNewCollector == null)
  65. throw new ArgumentNullException(nameof(getNewCollector));
  66. return s_impl.Collect<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
  67. }
  68. #endregion
  69. #region + First +
  70. /// <summary>
  71. /// Returns the first element of an observable sequence.
  72. /// </summary>
  73. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  74. /// <param name="source">Source observable sequence.</param>
  75. /// <returns>The first element in the observable sequence.</returns>
  76. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  77. /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  78. /// <seealso cref="Observable.FirstAsync{TSource}(IObservable{TSource})"/>
  79. #if PREFER_ASYNC
  80. [Obsolete(Constants_Linq.USE_ASYNC)]
  81. #endif
  82. public static TSource First<TSource>(this IObservable<TSource> source)
  83. {
  84. if (source == null)
  85. throw new ArgumentNullException(nameof(source));
  86. return s_impl.First<TSource>(source);
  87. }
  88. /// <summary>
  89. /// Returns the first element of an observable sequence that satisfies the condition in the predicate.
  90. /// </summary>
  91. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  92. /// <param name="source">Source observable sequence.</param>
  93. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  94. /// <returns>The first element in the observable sequence that satisfies the condition in the predicate.</returns>
  95. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  96. /// <exception cref="InvalidOperationException">No element satisfies the condition in the predicate. -or- The source sequence is empty.</exception>
  97. /// <seealso cref="Observable.FirstAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  98. #if PREFER_ASYNC
  99. [Obsolete(Constants_Linq.USE_ASYNC)]
  100. #endif
  101. public static TSource First<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  102. {
  103. if (source == null)
  104. throw new ArgumentNullException(nameof(source));
  105. if (predicate == null)
  106. throw new ArgumentNullException(nameof(predicate));
  107. return s_impl.First<TSource>(source, predicate);
  108. }
  109. #endregion
  110. #region + FirstOrDefault +
  111. /// <summary>
  112. /// Returns the first element of an observable sequence, or a default value if no such element exists.
  113. /// </summary>
  114. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  115. /// <param name="source">Source observable sequence.</param>
  116. /// <returns>The first element in the observable sequence, or a default value if no such element exists.</returns>
  117. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  118. /// <seealso cref="Observable.FirstOrDefaultAsync{TSource}(IObservable{TSource})"/>
  119. #if PREFER_ASYNC
  120. [Obsolete(Constants_Linq.USE_ASYNC)]
  121. #endif
  122. public static TSource FirstOrDefault<TSource>(this IObservable<TSource> source)
  123. {
  124. if (source == null)
  125. throw new ArgumentNullException(nameof(source));
  126. return s_impl.FirstOrDefault<TSource>(source);
  127. }
  128. /// <summary>
  129. /// Returns the first element of an observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.
  130. /// </summary>
  131. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  132. /// <param name="source">Source observable sequence.</param>
  133. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  134. /// <returns>The first element in the observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.</returns>
  135. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  136. /// <seealso cref="Observable.FirstOrDefaultAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  137. #if PREFER_ASYNC
  138. [Obsolete(Constants_Linq.USE_ASYNC)]
  139. #endif
  140. public static TSource FirstOrDefault<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  141. {
  142. if (source == null)
  143. throw new ArgumentNullException(nameof(source));
  144. if (predicate == null)
  145. throw new ArgumentNullException(nameof(predicate));
  146. return s_impl.FirstOrDefault<TSource>(source, predicate);
  147. }
  148. #endregion
  149. #region + ForEach +
  150. /// <summary>
  151. /// Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
  152. /// </summary>
  153. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  154. /// <param name="source">Source sequence.</param>
  155. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  156. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  157. /// <remarks>Because of its blocking nature, this operator is mainly used for testing.</remarks>
  158. #if PREFER_ASYNC
  159. [Obsolete(Constants_Linq.USE_ASYNC)]
  160. #endif
  161. public static void ForEach<TSource>(this IObservable<TSource> source, Action<TSource> onNext)
  162. {
  163. if (source == null)
  164. throw new ArgumentNullException(nameof(source));
  165. if (onNext == null)
  166. throw new ArgumentNullException(nameof(onNext));
  167. s_impl.ForEach<TSource>(source, onNext);
  168. }
  169. /// <summary>
  170. /// Invokes an action for each element in the observable sequence, incorporating the element's index, and blocks until the sequence is terminated.
  171. /// </summary>
  172. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  173. /// <param name="source">Source sequence.</param>
  174. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  175. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  176. /// <remarks>Because of its blocking nature, this operator is mainly used for testing.</remarks>
  177. #if PREFER_ASYNC
  178. [Obsolete(Constants_Linq.USE_ASYNC)]
  179. #endif
  180. public static void ForEach<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext)
  181. {
  182. if (source == null)
  183. throw new ArgumentNullException(nameof(source));
  184. if (onNext == null)
  185. throw new ArgumentNullException(nameof(onNext));
  186. s_impl.ForEach<TSource>(source, onNext);
  187. }
  188. #endregion
  189. #region + GetEnumerator +
  190. /// <summary>
  191. /// Returns an enumerator that enumerates all values of the observable sequence.
  192. /// </summary>
  193. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  194. /// <param name="source">An observable sequence to get an enumerator for.</param>
  195. /// <returns>The enumerator that can be used to enumerate over the elements in the observable sequence.</returns>
  196. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  197. public static IEnumerator<TSource> GetEnumerator<TSource>(this IObservable<TSource> source)
  198. {
  199. if (source == null)
  200. throw new ArgumentNullException(nameof(source));
  201. return s_impl.GetEnumerator<TSource>(source);
  202. }
  203. #endregion
  204. #region + Last +
  205. /// <summary>
  206. /// Returns the last element of an observable sequence.
  207. /// </summary>
  208. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  209. /// <param name="source">Source observable sequence.</param>
  210. /// <returns>The last element in the observable sequence.</returns>
  211. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  212. /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  213. /// <seealso cref="Observable.LastAsync{TSource}(IObservable{TSource})"/>
  214. #if PREFER_ASYNC
  215. [Obsolete(Constants_Linq.USE_ASYNC)]
  216. #endif
  217. public static TSource Last<TSource>(this IObservable<TSource> source)
  218. {
  219. if (source == null)
  220. throw new ArgumentNullException(nameof(source));
  221. return s_impl.Last<TSource>(source);
  222. }
  223. /// <summary>
  224. /// Returns the last element of an observable sequence that satisfies the condition in the predicate.
  225. /// </summary>
  226. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  227. /// <param name="source">Source observable sequence.</param>
  228. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  229. /// <returns>The last element in the observable sequence that satisfies the condition in the predicate.</returns>
  230. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  231. /// <exception cref="InvalidOperationException">No element satisfies the condition in the predicate. -or- The source sequence is empty.</exception>
  232. /// <seealso cref="Observable.LastAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  233. #if PREFER_ASYNC
  234. [Obsolete(Constants_Linq.USE_ASYNC)]
  235. #endif
  236. public static TSource Last<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  237. {
  238. if (source == null)
  239. throw new ArgumentNullException(nameof(source));
  240. if (predicate == null)
  241. throw new ArgumentNullException(nameof(predicate));
  242. return s_impl.Last<TSource>(source, predicate);
  243. }
  244. #endregion
  245. #region + LastOrDefault +
  246. /// <summary>
  247. /// Returns the last element of an observable sequence, or a default value if no such element exists.
  248. /// </summary>
  249. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  250. /// <param name="source">Source observable sequence.</param>
  251. /// <returns>The last element in the observable sequence, or a default value if no such element exists.</returns>
  252. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  253. /// <seealso cref="Observable.LastOrDefaultAsync{TSource}(IObservable{TSource})"/>
  254. #if PREFER_ASYNC
  255. [Obsolete(Constants_Linq.USE_ASYNC)]
  256. #endif
  257. public static TSource LastOrDefault<TSource>(this IObservable<TSource> source)
  258. {
  259. if (source == null)
  260. throw new ArgumentNullException(nameof(source));
  261. return s_impl.LastOrDefault<TSource>(source);
  262. }
  263. /// <summary>
  264. /// Returns the last element of an observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.
  265. /// </summary>
  266. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  267. /// <param name="source">Source observable sequence.</param>
  268. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  269. /// <returns>The last element in the observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.</returns>
  270. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  271. /// <seealso cref="Observable.LastOrDefaultAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  272. #if PREFER_ASYNC
  273. [Obsolete(Constants_Linq.USE_ASYNC)]
  274. #endif
  275. public static TSource LastOrDefault<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  276. {
  277. if (source == null)
  278. throw new ArgumentNullException(nameof(source));
  279. if (predicate == null)
  280. throw new ArgumentNullException(nameof(predicate));
  281. return s_impl.LastOrDefault<TSource>(source, predicate);
  282. }
  283. #endregion
  284. #region + Latest +
  285. /// <summary>
  286. /// Returns an enumerable sequence whose enumeration returns the latest observed element in the source observable sequence.
  287. /// Enumerators on the resulting sequence will never produce the same element repeatedly, and will block until the next element becomes available.
  288. /// </summary>
  289. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  290. /// <param name="source">Source observable sequence.</param>
  291. /// <returns>The enumerable sequence that returns the last sampled element upon each iteration and subsequently blocks until the next element in the observable source sequence becomes available.</returns>
  292. public static IEnumerable<TSource> Latest<TSource>(this IObservable<TSource> source)
  293. {
  294. if (source == null)
  295. throw new ArgumentNullException(nameof(source));
  296. return s_impl.Latest<TSource>(source);
  297. }
  298. #endregion
  299. #region + MostRecent +
  300. /// <summary>
  301. /// Returns an enumerable sequence whose enumeration returns the most recently observed element in the source observable sequence, using the specified initial value in case no element has been sampled yet.
  302. /// Enumerators on the resulting sequence never block and can produce the same element repeatedly.
  303. /// </summary>
  304. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  305. /// <param name="source">Source observable sequence.</param>
  306. /// <param name="initialValue">Initial value that will be yielded by the enumerable sequence if no element has been sampled yet.</param>
  307. /// <returns>The enumerable sequence that returns the last sampled element upon each iteration.</returns>
  308. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  309. public static IEnumerable<TSource> MostRecent<TSource>(this IObservable<TSource> source, TSource initialValue)
  310. {
  311. if (source == null)
  312. throw new ArgumentNullException(nameof(source));
  313. return s_impl.MostRecent<TSource>(source, initialValue);
  314. }
  315. #endregion
  316. #region + Next +
  317. /// <summary>
  318. /// Returns an enumerable sequence whose enumeration blocks until the next element in the source observable sequence becomes available.
  319. /// Enumerators on the resulting sequence will block until the next element becomes available.
  320. /// </summary>
  321. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  322. /// <param name="source">Source observable sequence.</param>
  323. /// <returns>The enumerable sequence that blocks upon each iteration until the next element in the observable source sequence becomes available.</returns>
  324. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  325. public static IEnumerable<TSource> Next<TSource>(this IObservable<TSource> source)
  326. {
  327. if (source == null)
  328. throw new ArgumentNullException(nameof(source));
  329. return s_impl.Next<TSource>(source);
  330. }
  331. #endregion
  332. #region + Single +
  333. /// <summary>
  334. /// Returns the only element of an observable sequence, and throws an exception if there is not exactly one element in the observable sequence.
  335. /// </summary>
  336. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  337. /// <param name="source">Source observable sequence.</param>
  338. /// <returns>The single element in the observable sequence.</returns>
  339. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  340. /// <exception cref="InvalidOperationException">The source sequence contains more than one element. -or- The source sequence is empty.</exception>
  341. /// <seealso cref="Observable.SingleAsync{TSource}(IObservable{TSource})"/>
  342. #if PREFER_ASYNC
  343. [Obsolete(Constants_Linq.USE_ASYNC)]
  344. #endif
  345. public static TSource Single<TSource>(this IObservable<TSource> source)
  346. {
  347. if (source == null)
  348. throw new ArgumentNullException(nameof(source));
  349. return s_impl.Single<TSource>(source);
  350. }
  351. /// <summary>
  352. /// Returns the only element of an observable sequence that satisfies the condition in the predicate, and throws an exception if there is not exactly one element matching the predicate in the observable sequence.
  353. /// </summary>
  354. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  355. /// <param name="source">Source observable sequence.</param>
  356. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  357. /// <returns>The single element in the observable sequence that satisfies the condition in the predicate.</returns>
  358. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  359. /// <exception cref="InvalidOperationException">No element satisfies the condition in the predicate. -or- More than one element satisfies the condition in the predicate. -or- The source sequence is empty.</exception>
  360. /// <seealso cref="Observable.SingleAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  361. #if PREFER_ASYNC
  362. [Obsolete(Constants_Linq.USE_ASYNC)]
  363. #endif
  364. public static TSource Single<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  365. {
  366. if (source == null)
  367. throw new ArgumentNullException(nameof(source));
  368. if (predicate == null)
  369. throw new ArgumentNullException(nameof(predicate));
  370. return s_impl.Single<TSource>(source, predicate);
  371. }
  372. #endregion
  373. #region + SingleOrDefault +
  374. /// <summary>
  375. /// Returns the only element of an observable sequence, or a default value if the observable sequence is empty; this method throws an exception if there is more than one element in the observable sequence.
  376. /// </summary>
  377. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  378. /// <param name="source">Source observable sequence.</param>
  379. /// <returns>The single element in the observable sequence, or a default value if no such element exists.</returns>
  380. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  381. /// <exception cref="InvalidOperationException">The source sequence contains more than one element.</exception>
  382. /// <seealso cref="Observable.SingleOrDefaultAsync{TSource}(IObservable{TSource})"/>
  383. #if PREFER_ASYNC
  384. [Obsolete(Constants_Linq.USE_ASYNC)]
  385. #endif
  386. public static TSource SingleOrDefault<TSource>(this IObservable<TSource> source)
  387. {
  388. if (source == null)
  389. throw new ArgumentNullException(nameof(source));
  390. return s_impl.SingleOrDefault<TSource>(source);
  391. }
  392. /// <summary>
  393. /// Returns the only element of an observable sequence that satisfies the condition in the predicate, or a default value if no such element exists; this method throws an exception if there is more than one element matching the predicate in the observable sequence.
  394. /// </summary>
  395. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  396. /// <param name="source">Source observable sequence.</param>
  397. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  398. /// <returns>The single element in the observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.</returns>
  399. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  400. /// <exception cref="InvalidOperationException">The sequence contains more than one element that satisfies the condition in the predicate.</exception>
  401. /// <seealso cref="Observable.SingleOrDefaultAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  402. #if PREFER_ASYNC
  403. [Obsolete(Constants_Linq.USE_ASYNC)]
  404. #endif
  405. public static TSource SingleOrDefault<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  406. {
  407. if (source == null)
  408. throw new ArgumentNullException(nameof(source));
  409. if (predicate == null)
  410. throw new ArgumentNullException(nameof(predicate));
  411. return s_impl.SingleOrDefault<TSource>(source, predicate);
  412. }
  413. #endregion
  414. #region + Wait +
  415. /// <summary>
  416. /// Waits for the observable sequence to complete and returns the last element of the sequence.
  417. /// If the sequence terminates with an OnError notification, the exception is thrown.
  418. /// </summary>
  419. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  420. /// <param name="source">Source observable sequence.</param>
  421. /// <returns>The last element in the observable sequence.</returns>
  422. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  423. /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  424. public static TSource Wait<TSource>(this IObservable<TSource> source)
  425. {
  426. if (source == null)
  427. throw new ArgumentNullException(nameof(source));
  428. return s_impl.Wait<TSource>(source);
  429. }
  430. #endregion
  431. }
  432. }