QueryLanguage.Blocking.cs 14 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. using System.Reactive.Disposables;
  6. #if NO_SEMAPHORE
  7. using System.Reactive.Threading;
  8. #endif
  9. namespace System.Reactive.Linq
  10. {
  11. #if !NO_PERF
  12. using Observαble;
  13. #endif
  14. internal partial class QueryLanguage
  15. {
  16. #region - Chunkify -
  /// <summary>
  /// Exposes <paramref name="source"/> as an enumerable of lists by delegating to the
  /// Collect operator with list-based collector functions.
  /// </summary>
  /// <param name="source">Observable sequence whose elements are gathered into lists.</param>
  /// <returns>The enumerable produced by Collect, yielding one list per collector hand-off.</returns>
  public virtual IEnumerable<IList<TSource>> Chunkify<TSource>(IObservable<TSource> source)
  {
      // Collector used before any element has been merged.
      Func<IList<TSource>> createChunk = () => new List<TSource>();

      // Appends an element to the current list and keeps using that same list.
      Func<IList<TSource>, TSource, IList<TSource>> append = (chunk, item) =>
      {
          chunk.Add(item);
          return chunk;
      };

      // The handed-out list is never reused; a brand new one takes its place.
      Func<IList<TSource>, IList<TSource>> replaceChunk = _ => new List<TSource>();

      return source.Collect<TSource, IList<TSource>>(createChunk, append, replaceChunk);
  }
  21. #endregion
  22. #region + Collect +
  /// <summary>
  /// Collect overload that uses the same factory for the initial collector and for every
  /// replacement collector.
  /// </summary>
  /// <param name="source">Observable sequence to collect from.</param>
  /// <param name="newCollector">Factory invoked for the initial collector and again for each replacement.</param>
  /// <param name="merge">Folds an element into the current collector and returns the collector to keep using.</param>
  /// <returns>The enumerable built by <c>Collect_</c>.</returns>
  public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> newCollector, Func<TResult, TSource, TResult> merge)
  {
      // The previous collector is ignored: every replacement comes straight from the factory.
      Func<TResult, TResult> replaceCollector = _ => newCollector();

      return Collect_<TSource, TResult>(source, newCollector, merge, replaceCollector);
  }

  /// <summary>
  /// Collect overload with separate functions for the initial collector and for deriving
  /// a replacement collector from the one being handed out.
  /// </summary>
  /// <param name="source">Observable sequence to collect from.</param>
  /// <param name="getInitialCollector">Factory for the first collector.</param>
  /// <param name="merge">Folds an element into the current collector and returns the collector to keep using.</param>
  /// <param name="getNewCollector">Produces the next collector from the one being handed out.</param>
  /// <returns>The enumerable built by <c>Collect_</c>.</returns>
  public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
  {
      var collected = Collect_<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
      return collected;
  }
  /// <summary>
  /// Shared implementation behind the public Collect overloads.
  /// </summary>
  /// <remarks>
  /// With NO_PERF undefined the work is delegated to the <c>Collect&lt;TSource, TResult&gt;</c> type.
  /// Otherwise a push-to-pull bridge is built: the push side folds elements into the current
  /// collector, and each pull hands out the current collector and swaps in a replacement.
  /// In the fallback path the first terminal notification (a source error, a source completion,
  /// or an exception thrown by <paramref name="merge"/>) wins: later notifications are ignored
  /// so a recorded failure cannot be overwritten and no merging happens after termination.
  /// </remarks>
  /// <param name="source">Observable sequence to collect from.</param>
  /// <param name="getInitialCollector">Factory for the first collector; invoked once per enumerator in the fallback path.</param>
  /// <param name="merge">Folds an element into the current collector and returns the collector to keep using.</param>
  /// <param name="getNewCollector">Produces the next collector from the one being handed out.</param>
  /// <returns>An enumerable of collectors.</returns>
  private static IEnumerable<TResult> Collect_<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
  {
#if !NO_PERF
      return new Collect<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
#else
      return new AnonymousEnumerable<TResult>(() =>
      {
          var c = getInitialCollector();
          var f = default(Notification<TSource>); // terminal notification (or merge failure); null while the source is live
          var o = new object();                   // guards c and f
          var done = false;                       // only touched by the pull side

          return PushToPull<TSource, TResult>(
              source,
              x =>
              {
                  lock (o)
                  {
                      // Once terminated (including by a failed merge), ignore everything else so the
                      // recorded outcome is not overwritten and merge is not called again.
                      if (f != null)
                          return;

                      if (x.HasValue)
                      {
                          try
                          {
                              c = merge(c, x.Value);
                          }
                          catch (Exception ex)
                          {
                              f = Notification.CreateOnError<TSource>(ex);
                          }
                      }
                      else
                      {
                          f = x;
                      }
                  }
              },
              () =>
              {
                  // Read the terminal state under the same lock that guards its writes.
                  lock (o)
                  {
                      if (f != null)
                      {
                          if (f.Kind == NotificationKind.OnError)
                              return Notification.CreateOnError<TResult>(f.Exception);

                          // After completion, hand out the last collector once, then complete.
                          if (done)
                              return Notification.CreateOnCompleted<TResult>();

                          done = true;
                      }

                      var l = c;
                      c = getNewCollector(c);
                      return Notification.CreateOnNext(l);
                  }
              }
          );
      });
#endif
  }
  91. #endregion
  92. #region First
  /// <summary>
  /// Returns the first element of <paramref name="source"/> via <c>FirstOrDefaultInternal</c>,
  /// with the throw-on-empty flag set.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <returns>The first element observed.</returns>
  public virtual TSource First<TSource>(IObservable<TSource> source)
  {
      const bool throwOnEmpty = true;
      return FirstOrDefaultInternal(source, throwOnEmpty);
  }

  /// <summary>
  /// Returns the first element of <paramref name="source"/> that passes <paramref name="predicate"/>,
  /// by filtering with Where and deferring to the predicate-less overload.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <param name="predicate">Filter applied through Where before taking the first element.</param>
  /// <returns>The first element of the filtered sequence.</returns>
  public virtual TSource First<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      var filtered = Where(source, predicate);
      return First(filtered);
  }
  101. #endregion
  102. #region FirstOrDefault
  /// <summary>
  /// Returns the first element of <paramref name="source"/> via <c>FirstOrDefaultInternal</c>,
  /// with the throw-on-empty flag cleared, so an empty sequence yields <c>default(TSource)</c>.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <returns>The first element observed, or <c>default(TSource)</c> when there was none.</returns>
  public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source)
  {
      const bool throwOnEmpty = false;
      return FirstOrDefaultInternal(source, throwOnEmpty);
  }

  /// <summary>
  /// Returns the first element of <paramref name="source"/> that passes <paramref name="predicate"/>,
  /// by filtering with Where and deferring to the predicate-less overload.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <param name="predicate">Filter applied through Where before taking the first element.</param>
  /// <returns>The first element of the filtered sequence, or <c>default(TSource)</c> when there was none.</returns>
  public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      var filtered = Where(source, predicate);
      return FirstOrDefault(filtered);
  }
  /// <summary>
  /// Subscribes to <paramref name="source"/> and blocks the calling thread until an element,
  /// an error, or completion is observed, then reports the earliest element seen.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <param name="throwOnEmpty">When true, an <see cref="InvalidOperationException"/> is thrown if no element was observed.</param>
  /// <returns>The first element observed, or <c>default(TSource)</c> when none was observed and <paramref name="throwOnEmpty"/> is false.</returns>
  private static TSource FirstOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  {
      var first = default(TSource);
      var hasFirst = false;
      var failure = default(Exception);
      var signal = new ManualResetEvent(false);

      var observer = new AnonymousObserver<TSource>(
          item =>
          {
              // Only the earliest element is kept; later elements just signal again.
              if (!hasFirst)
              {
                  first = item;
              }

              hasFirst = true;
              signal.Set();
          },
          error =>
          {
              failure = error;
              signal.Set();
          },
          () =>
          {
              signal.Set();
          });

      //
      // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
      //
      using (source.Subscribe/*Unsafe*/(observer))
      {
          signal.WaitOne();
      }

      // A recorded error takes precedence over any value that was seen.
      failure.ThrowIfNotNull();

      if (!hasFirst && throwOnEmpty)
          throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);

      return first;
  }
  147. #endregion
  148. #region + ForEach +
  /// <summary>
  /// Invokes <paramref name="onNext"/> for the elements of <paramref name="source"/> and blocks
  /// the calling thread until the sink signals that it is done; a recorded error is then rethrown.
  /// </summary>
  /// <param name="source">Observable sequence to consume.</param>
  /// <param name="onNext">Callback invoked per element.</param>
  public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource> onNext)
  {
#if !NO_PERF
      var finished = new ManualResetEvent(false);
      var observer = new ForEach<TSource>._(onNext, () => finished.Set());

      using (source.SubscribeSafe(observer))
      {
          // Released by the callback handed to the sink above.
          finished.WaitOne();
      }

      var failure = observer.Error;
      failure.ThrowIfNotNull();
#else
      ForEach_(source, onNext);
#endif
  }
  /// <summary>
  /// Index-aware variant of ForEach: invokes <paramref name="onNext"/> with each element and an
  /// index, blocking the calling thread until the sink signals that it is done; a recorded
  /// error is then rethrown.
  /// </summary>
  /// <param name="source">Observable sequence to consume.</param>
  /// <param name="onNext">Callback invoked per element together with its index.</param>
  public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource, int> onNext)
  {
#if !NO_PERF
      var finished = new ManualResetEvent(false);
      var observer = new ForEach<TSource>.τ(onNext, () => finished.Set());

      using (source.SubscribeSafe(observer))
      {
          // Released by the callback handed to the sink above.
          finished.WaitOne();
      }

      var failure = observer.Error;
      failure.ThrowIfNotNull();
#else
      // Zero-based counter; the checked increment throws on overflow rather than wrapping.
      var index = 0;
      ForEach_(source, item => onNext(item, checked(index++)));
#endif
  }
  178. #if NO_PERF
  /// <summary>
  /// Fallback ForEach implementation: subscribes to <paramref name="source"/>, runs
  /// <paramref name="onNext"/> per element and blocks until the sequence terminates or
  /// <paramref name="onNext"/> throws. The first failure observed is rethrown to the caller.
  /// </summary>
  /// <remarks>
  /// Once <paramref name="onNext"/> has thrown, it is not invoked for any further element and
  /// the captured exception is not replaced by a later failure. Without the stop flag a source
  /// that pushes synchronously would keep calling the callback for every remaining element
  /// before the subscription could be disposed.
  /// </remarks>
  /// <param name="source">Observable sequence to consume.</param>
  /// <param name="onNext">Callback invoked per element.</param>
  private static void ForEach_<TSource>(IObservable<TSource> source, Action<TSource> onNext)
  {
      var exception = default(Exception);
      var stopped = 0; // 0 = running, 1 = a failure has been recorded
      var evt = new ManualResetEvent(false);

      using (source.Subscribe(
          x =>
          {
              // Atomic read: skip elements that arrive after a failure was recorded.
              if (Interlocked.CompareExchange(ref stopped, 0, 0) != 0)
                  return;

              try
              {
                  onNext(x);
              }
              catch (Exception ex)
              {
                  // Only the first failure is kept and signalled.
                  if (Interlocked.Exchange(ref stopped, 1) == 0)
                  {
                      exception = ex;
                      evt.Set();
                  }
              }
          },
          ex =>
          {
              if (Interlocked.Exchange(ref stopped, 1) == 0)
              {
                  exception = ex;
                  evt.Set();
              }
          },
          () => evt.Set()
      ))
      {
          evt.WaitOne();
      }

      if (exception != null)
          exception.Throw();
  }
  209. #endif
  210. #endregion
  211. #region + GetEnumerator +
  /// <summary>
  /// Exposes <paramref name="source"/> as an enumerator. With both NO_PERF and NO_CDS undefined
  /// the dedicated <c>GetEnumerator&lt;TSource&gt;</c> type is used; otherwise notifications are
  /// buffered in a queue and handed out one per pull.
  /// </summary>
  /// <param name="source">Observable sequence to enumerate.</param>
  /// <returns>An enumerator over the elements of <paramref name="source"/>.</returns>
  public virtual IEnumerator<TSource> GetEnumerator<TSource>(IObservable<TSource> source)
  {
#if !NO_PERF && !NO_CDS
      return new GetEnumerator<TSource>().Run(source);
#else
      var pending = new Queue<Notification<TSource>>();

      // Counts queued notifications: released once per enqueue, awaited once per dequeue.
      var available = new Semaphore(0, int.MaxValue);

      Action<Notification<TSource>> enqueue = notification =>
      {
          lock (pending)
              pending.Enqueue(notification);

          available.Release();
      };

      Func<Notification<TSource>> dequeue = () =>
      {
          // Block until the push side has queued at least one notification.
          available.WaitOne();

          lock (pending)
              return pending.Dequeue();
      };

      return PushToPull(source, enqueue, dequeue);
#endif
  }
  236. #endregion
  237. #region Last
  /// <summary>
  /// Returns the last element of <paramref name="source"/> via <c>LastOrDefaultInternal</c>,
  /// with the throw-on-empty flag set.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <returns>The last element observed.</returns>
  public virtual TSource Last<TSource>(IObservable<TSource> source)
  {
      const bool throwOnEmpty = true;
      return LastOrDefaultInternal(source, throwOnEmpty);
  }

  /// <summary>
  /// Returns the last element of <paramref name="source"/> that passes <paramref name="predicate"/>,
  /// by filtering with Where and deferring to the predicate-less overload.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <param name="predicate">Filter applied through Where before taking the last element.</param>
  /// <returns>The last element of the filtered sequence.</returns>
  public virtual TSource Last<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      var filtered = Where(source, predicate);
      return Last(filtered);
  }
  246. #endregion
  247. #region LastOrDefault
  /// <summary>
  /// Returns the last element of <paramref name="source"/> via <c>LastOrDefaultInternal</c>,
  /// with the throw-on-empty flag cleared, so an empty sequence yields <c>default(TSource)</c>.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <returns>The last element observed, or <c>default(TSource)</c> when there was none.</returns>
  public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source)
  {
      const bool throwOnEmpty = false;
      return LastOrDefaultInternal(source, throwOnEmpty);
  }

  /// <summary>
  /// Returns the last element of <paramref name="source"/> that passes <paramref name="predicate"/>,
  /// by filtering with Where and deferring to the predicate-less overload.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <param name="predicate">Filter applied through Where before taking the last element.</param>
  /// <returns>The last element of the filtered sequence, or <c>default(TSource)</c> when there was none.</returns>
  public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      var filtered = Where(source, predicate);
      return LastOrDefault(filtered);
  }
  /// <summary>
  /// Subscribes to <paramref name="source"/> and blocks the calling thread until an error or
  /// completion is observed, then reports the most recent element seen.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <param name="throwOnEmpty">When true, an <see cref="InvalidOperationException"/> is thrown if no element was observed.</param>
  /// <returns>The last element observed, or <c>default(TSource)</c> when none was observed and <paramref name="throwOnEmpty"/> is false.</returns>
  private static TSource LastOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  {
      var latest = default(TSource);
      var hasAny = false;
      var failure = default(Exception);
      var signal = new ManualResetEvent(false);

      var observer = new AnonymousObserver<TSource>(
          item =>
          {
              // Elements never release the waiter; each one simply replaces the previous.
              hasAny = true;
              latest = item;
          },
          error =>
          {
              failure = error;
              signal.Set();
          },
          () =>
          {
              signal.Set();
          });

      //
      // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
      //
      using (source.Subscribe/*Unsafe*/(observer))
      {
          signal.WaitOne();
      }

      failure.ThrowIfNotNull();

      if (!hasAny && throwOnEmpty)
          throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);

      return latest;
  }
  288. #endregion
  289. #region + Latest +
  /// <summary>
  /// Wraps <paramref name="source"/> in a <c>Latest&lt;TSource&gt;</c> enumerable.
  /// </summary>
  /// <param name="source">Observable sequence handed to the wrapper.</param>
  /// <returns>The newly created <c>Latest&lt;TSource&gt;</c> instance.</returns>
  public virtual IEnumerable<TSource> Latest<TSource>(IObservable<TSource> source)
  {
      var latest = new Latest<TSource>(source);
      return latest;
  }
  294. #endregion
  295. #region + MostRecent +
  /// <summary>
  /// Wraps <paramref name="source"/> and <paramref name="initialValue"/> in a
  /// <c>MostRecent&lt;TSource&gt;</c> enumerable.
  /// </summary>
  /// <param name="source">Observable sequence handed to the wrapper.</param>
  /// <param name="initialValue">Initial value handed to the wrapper.</param>
  /// <returns>The newly created <c>MostRecent&lt;TSource&gt;</c> instance.</returns>
  public virtual IEnumerable<TSource> MostRecent<TSource>(IObservable<TSource> source, TSource initialValue)
  {
      var mostRecent = new MostRecent<TSource>(source, initialValue);
      return mostRecent;
  }
  300. #endregion
  301. #region + Next +
  /// <summary>
  /// Wraps <paramref name="source"/> in a <c>Next&lt;TSource&gt;</c> enumerable.
  /// </summary>
  /// <param name="source">Observable sequence handed to the wrapper.</param>
  /// <returns>The newly created <c>Next&lt;TSource&gt;</c> instance.</returns>
  public virtual IEnumerable<TSource> Next<TSource>(IObservable<TSource> source)
  {
      var next = new Next<TSource>(source);
      return next;
  }
  306. #endregion
  307. #region Single
  /// <summary>
  /// Returns the only element of <paramref name="source"/> via <c>SingleOrDefaultInternal</c>,
  /// with the throw-on-empty flag set.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <returns>The single element observed.</returns>
  public virtual TSource Single<TSource>(IObservable<TSource> source)
  {
      const bool throwOnEmpty = true;
      return SingleOrDefaultInternal(source, throwOnEmpty);
  }

  /// <summary>
  /// Returns the only element of <paramref name="source"/> that passes <paramref name="predicate"/>,
  /// by filtering with Where and deferring to the predicate-less overload.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <param name="predicate">Filter applied through Where before taking the single element.</param>
  /// <returns>The single element of the filtered sequence.</returns>
  public virtual TSource Single<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      var filtered = Where(source, predicate);
      return Single(filtered);
  }
  316. #endregion
  317. #region SingleOrDefault
  /// <summary>
  /// Returns the only element of <paramref name="source"/> via <c>SingleOrDefaultInternal</c>,
  /// with the throw-on-empty flag cleared, so an empty sequence yields <c>default(TSource)</c>.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <returns>The single element observed, or <c>default(TSource)</c> when there was none.</returns>
  public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source)
  {
      const bool throwOnEmpty = false;
      return SingleOrDefaultInternal(source, throwOnEmpty);
  }

  /// <summary>
  /// Returns the only element of <paramref name="source"/> that passes <paramref name="predicate"/>,
  /// by filtering with Where and deferring to the predicate-less overload.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <param name="predicate">Filter applied through Where before taking the single element.</param>
  /// <returns>The single element of the filtered sequence, or <c>default(TSource)</c> when there was none.</returns>
  public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
      var filtered = Where(source, predicate);
      return SingleOrDefault(filtered);
  }
  /// <summary>
  /// Subscribes to <paramref name="source"/> and blocks the calling thread until an error,
  /// completion, or a second element is observed. A second element is recorded as an
  /// <see cref="InvalidOperationException"/>.
  /// </summary>
  /// <param name="source">Observable sequence to read from.</param>
  /// <param name="throwOnEmpty">When true, an <see cref="InvalidOperationException"/> is thrown if no element was observed.</param>
  /// <returns>The element observed, or <c>default(TSource)</c> when none was observed and <paramref name="throwOnEmpty"/> is false.</returns>
  private static TSource SingleOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  {
      var only = default(TSource);
      var hasAny = false;
      var failure = default(Exception);
      var signal = new ManualResetEvent(false);

      var observer = new AnonymousObserver<TSource>(
          item =>
          {
              // An element arriving after one was already seen violates the "single" contract.
              if (hasAny)
              {
                  failure = new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT);
                  signal.Set();
              }

              only = item;
              hasAny = true;
          },
          error =>
          {
              failure = error;
              signal.Set();
          },
          () =>
          {
              signal.Set();
          });

      //
      // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
      //
      using (source.Subscribe/*Unsafe*/(observer))
      {
          signal.WaitOne();
      }

      failure.ThrowIfNotNull();

      if (!hasAny && throwOnEmpty)
          throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);

      return only;
  }
  363. #endregion
  364. #region Wait
  /// <summary>
  /// Blocks until <paramref name="source"/> terminates and returns its last element, using
  /// <c>LastOrDefaultInternal</c> with the throw-on-empty flag set.
  /// </summary>
  /// <param name="source">Observable sequence to wait for.</param>
  /// <returns>The last element observed.</returns>
  public virtual TSource Wait<TSource>(IObservable<TSource> source)
  {
      const bool throwOnEmpty = true;
      return LastOrDefaultInternal(source, throwOnEmpty);
  }
  369. #endregion
  370. #region |> Helpers <|
  371. #if NO_CDS || NO_PERF
  /// <summary>
  /// Builds a <c>PushPullAdapter</c> from the given callbacks, subscribes it to
  /// <paramref name="source"/> and returns it as the enumerator.
  /// </summary>
  /// <param name="source">Observable sequence the adapter is subscribed to.</param>
  /// <param name="push">Callback handed to the adapter for incoming notifications.</param>
  /// <param name="pull">Callback handed to the adapter for producing the next outgoing notification.</param>
  /// <returns>The adapter, typed as an enumerator.</returns>
  private static IEnumerator<TResult> PushToPull<TSource, TResult>(IObservable<TSource> source, Action<Notification<TSource>> push, Func<Notification<TResult>> pull)
  {
      // Created before subscribing so the adapter can be given a dispose callback
      // for a subscription that does not exist yet.
      var upstream = new SingleAssignmentDisposable();
      var bridge = new PushPullAdapter<TSource, TResult>(push, pull, upstream.Dispose);

      var handle = source.SubscribeSafe(bridge);
      upstream.Disposable = handle;

      return bridge;
  }
  379. #endif
  380. #endregion
  381. }
  382. }