Observable.Blocking.cs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. namespace System.Reactive.Linq
  4. {
  5. public static partial class Observable
  6. {
  7. #region + Chunkify +
  8. /// <summary>
  9. /// Produces an enumerable sequence of consecutive (possibly empty) chunks of the source sequence.
  10. /// </summary>
  11. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  12. /// <param name="source">Source observable sequence.</param>
  13. /// <returns>The enumerable sequence that returns consecutive (possibly empty) chunks upon each iteration.</returns>
  14. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  15. public static IEnumerable<IList<TSource>> Chunkify<TSource>(this IObservable<TSource> source)
  16. {
  17. if (source == null)
  18. throw new ArgumentNullException("source");
  19. return s_impl.Chunkify<TSource>(source);
  20. }
  21. #endregion
  22. #region + Collect +
  23. /// <summary>
  24. /// Produces an enumerable sequence that returns elements collected/aggregated from the source sequence between consecutive iterations.
  25. /// </summary>
  26. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  27. /// <typeparam name="TResult">The type of the elements produced by the merge operation during collection.</typeparam>
  28. /// <param name="source">Source observable sequence.</param>
  29. /// <param name="newCollector">Factory to create a new collector object.</param>
  30. /// <param name="merge">Merges a sequence element with the current collector.</param>
  31. /// <returns>The enumerable sequence that returns collected/aggregated elements from the source sequence upon each iteration.</returns>
  32. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="newCollector"/> or <paramref name="merge"/> is null.</exception>
  33. public static IEnumerable<TResult> Collect<TSource, TResult>(this IObservable<TSource> source, Func<TResult> newCollector, Func<TResult, TSource, TResult> merge)
  34. {
  35. if (source == null)
  36. throw new ArgumentNullException("source");
  37. if (newCollector == null)
  38. throw new ArgumentNullException("newCollector");
  39. if (merge == null)
  40. throw new ArgumentNullException("merge");
  41. return s_impl.Collect<TSource, TResult>(source, newCollector, merge);
  42. }
  43. /// <summary>
  44. /// Produces an enumerable sequence that returns elements collected/aggregated from the source sequence between consecutive iterations.
  45. /// </summary>
  46. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  47. /// <typeparam name="TResult">The type of the elements produced by the merge operation during collection.</typeparam>
  48. /// <param name="source">Source observable sequence.</param>
  49. /// <param name="getInitialCollector">Factory to create the initial collector object.</param>
  50. /// <param name="merge">Merges a sequence element with the current collector.</param>
  51. /// <param name="getNewCollector">Factory to replace the current collector by a new collector.</param>
  52. /// <returns>The enumerable sequence that returns collected/aggregated elements from the source sequence upon each iteration.</returns>
  53. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="getInitialCollector"/> or <paramref name="merge"/> or <paramref name="getNewCollector"/> is null.</exception>
  54. public static IEnumerable<TResult> Collect<TSource, TResult>(this IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
  55. {
  56. if (source == null)
  57. throw new ArgumentNullException("source");
  58. if (getInitialCollector == null)
  59. throw new ArgumentNullException("getInitialCollector");
  60. if (merge == null)
  61. throw new ArgumentNullException("merge");
  62. if (getNewCollector == null)
  63. throw new ArgumentNullException("getNewCollector");
  64. return s_impl.Collect<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
  65. }
  66. #endregion
  67. #region + First +
  68. /// <summary>
  69. /// Returns the first element of an observable sequence.
  70. /// </summary>
  71. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  72. /// <param name="source">Source observable sequence.</param>
  73. /// <returns>The first element in the observable sequence.</returns>
  74. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  75. /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  76. /// <seealso cref="Observable.FirstAsync{TSource}(IObservable{TSource})"/>
  77. #if PREFER_ASYNC
  78. [Obsolete(Constants_Linq.USE_ASYNC)]
  79. #endif
  80. public static TSource First<TSource>(this IObservable<TSource> source)
  81. {
  82. if (source == null)
  83. throw new ArgumentNullException("source");
  84. return s_impl.First<TSource>(source);
  85. }
  86. /// <summary>
  87. /// Returns the first element of an observable sequence that satisfies the condition in the predicate.
  88. /// </summary>
  89. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  90. /// <param name="source">Source observable sequence.</param>
  91. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  92. /// <returns>The first element in the observable sequence that satisfies the condition in the predicate.</returns>
  93. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  94. /// <exception cref="InvalidOperationException">No element satisfies the condition in the predicate. -or- The source sequence is empty.</exception>
  95. /// <seealso cref="Observable.FirstAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  96. #if PREFER_ASYNC
  97. [Obsolete(Constants_Linq.USE_ASYNC)]
  98. #endif
  99. public static TSource First<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  100. {
  101. if (source == null)
  102. throw new ArgumentNullException("source");
  103. if (predicate == null)
  104. throw new ArgumentNullException("predicate");
  105. return s_impl.First<TSource>(source, predicate);
  106. }
  107. #endregion
  108. #region + FirstOrDefault +
  109. /// <summary>
  110. /// Returns the first element of an observable sequence, or a default value if no such element exists.
  111. /// </summary>
  112. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  113. /// <param name="source">Source observable sequence.</param>
  114. /// <returns>The first element in the observable sequence, or a default value if no such element exists.</returns>
  115. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  116. /// <seealso cref="Observable.FirstOrDefaultAsync{TSource}(IObservable{TSource})"/>
  117. #if PREFER_ASYNC
  118. [Obsolete(Constants_Linq.USE_ASYNC)]
  119. #endif
  120. public static TSource FirstOrDefault<TSource>(this IObservable<TSource> source)
  121. {
  122. if (source == null)
  123. throw new ArgumentNullException("source");
  124. return s_impl.FirstOrDefault<TSource>(source);
  125. }
  126. /// <summary>
  127. /// Returns the first element of an observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.
  128. /// </summary>
  129. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  130. /// <param name="source">Source observable sequence.</param>
  131. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  132. /// <returns>The first element in the observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.</returns>
  133. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  134. /// <seealso cref="Observable.FirstOrDefaultAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  135. #if PREFER_ASYNC
  136. [Obsolete(Constants_Linq.USE_ASYNC)]
  137. #endif
  138. public static TSource FirstOrDefault<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  139. {
  140. if (source == null)
  141. throw new ArgumentNullException("source");
  142. if (predicate == null)
  143. throw new ArgumentNullException("predicate");
  144. return s_impl.FirstOrDefault<TSource>(source, predicate);
  145. }
  146. #endregion
  147. #region + ForEach +
  148. /// <summary>
  149. /// Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
  150. /// </summary>
  151. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  152. /// <param name="source">Source sequence.</param>
  153. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  154. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  155. /// <remarks>Because of its blocking nature, this operator is mainly used for testing.</remarks>
  156. #if PREFER_ASYNC
  157. [Obsolete(Constants_Linq.USE_ASYNC)]
  158. #endif
  159. public static void ForEach<TSource>(this IObservable<TSource> source, Action<TSource> onNext)
  160. {
  161. if (source == null)
  162. throw new ArgumentNullException("source");
  163. if (onNext == null)
  164. throw new ArgumentNullException("onNext");
  165. s_impl.ForEach<TSource>(source, onNext);
  166. }
  167. /// <summary>
  168. /// Invokes an action for each element in the observable sequence, incorporating the element's index, and blocks until the sequence is terminated.
  169. /// </summary>
  170. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  171. /// <param name="source">Source sequence.</param>
  172. /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
  173. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
  174. /// <remarks>Because of its blocking nature, this operator is mainly used for testing.</remarks>
  175. #if PREFER_ASYNC
  176. [Obsolete(Constants_Linq.USE_ASYNC)]
  177. #endif
  178. public static void ForEach<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext)
  179. {
  180. if (source == null)
  181. throw new ArgumentNullException("source");
  182. if (onNext == null)
  183. throw new ArgumentNullException("onNext");
  184. s_impl.ForEach<TSource>(source, onNext);
  185. }
  186. #endregion
  187. #region + GetEnumerator +
  188. /// <summary>
  189. /// Returns an enumerator that enumerates all values of the observable sequence.
  190. /// </summary>
  191. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  192. /// <param name="source">An observable sequence to get an enumerator for.</param>
  193. /// <returns>The enumerator that can be used to enumerate over the elements in the observable sequence.</returns>
  194. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  195. public static IEnumerator<TSource> GetEnumerator<TSource>(this IObservable<TSource> source)
  196. {
  197. if (source == null)
  198. throw new ArgumentNullException("source");
  199. return s_impl.GetEnumerator<TSource>(source);
  200. }
  201. #endregion
  202. #region + Last +
  203. /// <summary>
  204. /// Returns the last element of an observable sequence.
  205. /// </summary>
  206. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  207. /// <param name="source">Source observable sequence.</param>
  208. /// <returns>The last element in the observable sequence.</returns>
  209. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  210. /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  211. /// <seealso cref="Observable.LastAsync{TSource}(IObservable{TSource})"/>
  212. #if PREFER_ASYNC
  213. [Obsolete(Constants_Linq.USE_ASYNC)]
  214. #endif
  215. public static TSource Last<TSource>(this IObservable<TSource> source)
  216. {
  217. if (source == null)
  218. throw new ArgumentNullException("source");
  219. return s_impl.Last<TSource>(source);
  220. }
  221. /// <summary>
  222. /// Returns the last element of an observable sequence that satisfies the condition in the predicate.
  223. /// </summary>
  224. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  225. /// <param name="source">Source observable sequence.</param>
  226. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  227. /// <returns>The last element in the observable sequence that satisfies the condition in the predicate.</returns>
  228. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  229. /// <exception cref="InvalidOperationException">No element satisfies the condition in the predicate. -or- The source sequence is empty.</exception>
  230. /// <seealso cref="Observable.LastAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  231. #if PREFER_ASYNC
  232. [Obsolete(Constants_Linq.USE_ASYNC)]
  233. #endif
  234. public static TSource Last<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  235. {
  236. if (source == null)
  237. throw new ArgumentNullException("source");
  238. if (predicate == null)
  239. throw new ArgumentNullException("predicate");
  240. return s_impl.Last<TSource>(source, predicate);
  241. }
  242. #endregion
  243. #region + LastOrDefault +
  244. /// <summary>
  245. /// Returns the last element of an observable sequence, or a default value if no such element exists.
  246. /// </summary>
  247. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  248. /// <param name="source">Source observable sequence.</param>
  249. /// <returns>The last element in the observable sequence, or a default value if no such element exists.</returns>
  250. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  251. /// <seealso cref="Observable.LastOrDefaultAsync{TSource}(IObservable{TSource})"/>
  252. #if PREFER_ASYNC
  253. [Obsolete(Constants_Linq.USE_ASYNC)]
  254. #endif
  255. public static TSource LastOrDefault<TSource>(this IObservable<TSource> source)
  256. {
  257. if (source == null)
  258. throw new ArgumentNullException("source");
  259. return s_impl.LastOrDefault<TSource>(source);
  260. }
  261. /// <summary>
  262. /// Returns the last element of an observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.
  263. /// </summary>
  264. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  265. /// <param name="source">Source observable sequence.</param>
  266. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  267. /// <returns>The last element in the observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.</returns>
  268. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  269. /// <seealso cref="Observable.LastOrDefaultAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  270. #if PREFER_ASYNC
  271. [Obsolete(Constants_Linq.USE_ASYNC)]
  272. #endif
  273. public static TSource LastOrDefault<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  274. {
  275. if (source == null)
  276. throw new ArgumentNullException("source");
  277. if (predicate == null)
  278. throw new ArgumentNullException("predicate");
  279. return s_impl.LastOrDefault<TSource>(source, predicate);
  280. }
  281. #endregion
  282. #region + Latest +
  283. /// <summary>
  284. /// Returns an enumerable sequence whose enumeration returns the latest observed element in the source observable sequence.
  285. /// Enumerators on the resulting sequence will never produce the same element repeatedly, and will block until the next element becomes available.
  286. /// </summary>
  287. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  288. /// <param name="source">Source observable sequence.</param>
  289. /// <returns>The enumerable sequence that returns the last sampled element upon each iteration and subsequently blocks until the next element in the observable source sequence becomes available.</returns>
  290. public static IEnumerable<TSource> Latest<TSource>(this IObservable<TSource> source)
  291. {
  292. if (source == null)
  293. throw new ArgumentNullException("source");
  294. return s_impl.Latest<TSource>(source);
  295. }
  296. #endregion
  297. #region + MostRecent +
  298. /// <summary>
  299. /// Returns an enumerable sequence whose enumeration returns the most recently observed element in the source observable sequence, using the specified initial value in case no element has been sampled yet.
  300. /// Enumerators on the resulting sequence never block and can produce the same element repeatedly.
  301. /// </summary>
  302. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  303. /// <param name="source">Source observable sequence.</param>
  304. /// <param name="initialValue">Initial value that will be yielded by the enumerable sequence if no element has been sampled yet.</param>
  305. /// <returns>The enumerable sequence that returns the last sampled element upon each iteration.</returns>
  306. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  307. public static IEnumerable<TSource> MostRecent<TSource>(this IObservable<TSource> source, TSource initialValue)
  308. {
  309. if (source == null)
  310. throw new ArgumentNullException("source");
  311. return s_impl.MostRecent<TSource>(source, initialValue);
  312. }
  313. #endregion
  314. #region + Next +
  315. /// <summary>
  316. /// Returns an enumerable sequence whose enumeration blocks until the next element in the source observable sequence becomes available.
  317. /// Enumerators on the resulting sequence will block until the next element becomes available.
  318. /// </summary>
  319. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  320. /// <param name="source">Source observable sequence.</param>
  321. /// <returns>The enumerable sequence that blocks upon each iteration until the next element in the observable source sequence becomes available.</returns>
  322. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  323. public static IEnumerable<TSource> Next<TSource>(this IObservable<TSource> source)
  324. {
  325. if (source == null)
  326. throw new ArgumentNullException("source");
  327. return s_impl.Next<TSource>(source);
  328. }
  329. #endregion
  330. #region + Single +
  331. /// <summary>
  332. /// Returns the only element of an observable sequence, and throws an exception if there is not exactly one element in the observable sequence.
  333. /// </summary>
  334. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  335. /// <param name="source">Source observable sequence.</param>
  336. /// <returns>The single element in the observable sequence.</returns>
  337. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  338. /// <exception cref="InvalidOperationException">The source sequence contains more than one element. -or- The source sequence is empty.</exception>
  339. /// <seealso cref="Observable.SingleAsync{TSource}(IObservable{TSource})"/>
  340. #if PREFER_ASYNC
  341. [Obsolete(Constants_Linq.USE_ASYNC)]
  342. #endif
  343. public static TSource Single<TSource>(this IObservable<TSource> source)
  344. {
  345. if (source == null)
  346. throw new ArgumentNullException("source");
  347. return s_impl.Single<TSource>(source);
  348. }
  349. /// <summary>
  350. /// Returns the only element of an observable sequence that satisfies the condition in the predicate, and throws an exception if there is not exactly one element matching the predicate in the observable sequence.
  351. /// </summary>
  352. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  353. /// <param name="source">Source observable sequence.</param>
  354. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  355. /// <returns>The single element in the observable sequence that satisfies the condition in the predicate.</returns>
  356. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  357. /// <exception cref="InvalidOperationException">No element satisfies the condition in the predicate. -or- More than one element satisfies the condition in the predicate. -or- The source sequence is empty.</exception>
  358. /// <seealso cref="Observable.SingleAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  359. #if PREFER_ASYNC
  360. [Obsolete(Constants_Linq.USE_ASYNC)]
  361. #endif
  362. public static TSource Single<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  363. {
  364. if (source == null)
  365. throw new ArgumentNullException("source");
  366. if (predicate == null)
  367. throw new ArgumentNullException("predicate");
  368. return s_impl.Single<TSource>(source, predicate);
  369. }
  370. #endregion
  371. #region + SingleOrDefault +
  372. /// <summary>
  373. /// Returns the only element of an observable sequence, or a default value if the observable sequence is empty; this method throws an exception if there is more than one element in the observable sequence.
  374. /// </summary>
  375. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  376. /// <param name="source">Source observable sequence.</param>
  377. /// <returns>The single element in the observable sequence, or a default value if no such element exists.</returns>
  378. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  379. /// <exception cref="InvalidOperationException">The source sequence contains more than one element.</exception>
  380. /// <seealso cref="Observable.SingleOrDefaultAsync{TSource}(IObservable{TSource})"/>
  381. #if PREFER_ASYNC
  382. [Obsolete(Constants_Linq.USE_ASYNC)]
  383. #endif
  384. public static TSource SingleOrDefault<TSource>(this IObservable<TSource> source)
  385. {
  386. if (source == null)
  387. throw new ArgumentNullException("source");
  388. return s_impl.SingleOrDefault<TSource>(source);
  389. }
  390. /// <summary>
  391. /// Returns the only element of an observable sequence that satisfies the condition in the predicate, or a default value if no such element exists; this method throws an exception if there is more than one element matching the predicate in the observable sequence.
  392. /// </summary>
  393. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  394. /// <param name="source">Source observable sequence.</param>
  395. /// <param name="predicate">A predicate function to evaluate for elements in the source sequence.</param>
  396. /// <returns>The single element in the observable sequence that satisfies the condition in the predicate, or a default value if no such element exists.</returns>
  397. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
  398. /// <exception cref="InvalidOperationException">The sequence contains more than one element that satisfies the condition in the predicate.</exception>
  399. /// <seealso cref="Observable.SingleOrDefaultAsync{TSource}(IObservable{TSource}, Func{TSource, bool})"/>
  400. #if PREFER_ASYNC
  401. [Obsolete(Constants_Linq.USE_ASYNC)]
  402. #endif
  403. public static TSource SingleOrDefault<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
  404. {
  405. if (source == null)
  406. throw new ArgumentNullException("source");
  407. if (predicate == null)
  408. throw new ArgumentNullException("predicate");
  409. return s_impl.SingleOrDefault<TSource>(source, predicate);
  410. }
  411. #endregion
  412. #region + Wait +
  413. /// <summary>
  414. /// Waits for the observable sequence to complete and returns the last element of the sequence.
  415. /// If the sequence terminates with an OnError notification, the exception is throw.
  416. /// </summary>
  417. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  418. /// <param name="source">Source observable sequence.</param>
  419. /// <returns>The last element in the observable sequence.</returns>
  420. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  421. /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
  422. public static TSource Wait<TSource>(this IObservable<TSource> source)
  423. {
  424. if (source == null)
  425. throw new ArgumentNullException("source");
  426. return s_impl.Wait<TSource>(source);
  427. }
  428. #endregion
  429. }
  430. }