QueryLanguage.Blocking.cs 15 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. using System.Reactive.Disposables;
  6. #if NO_SEMAPHORE
  7. using System.Reactive.Threading;
  8. #endif
  9. namespace System.Reactive.Linq
  10. {
  11. #if !NO_PERF
  12. using ObservableImpl;
  13. #endif
  14. internal partial class QueryLanguage
  15. {
  16. #region - Chunkify -
  17. public virtual IEnumerable<IList<TSource>> Chunkify<TSource>(IObservable<TSource> source)
  18. {
  19. return source.Collect<TSource, IList<TSource>>(() => new List<TSource>(), (lst, x) => { lst.Add(x); return lst; }, _ => new List<TSource>());
  20. }
  21. #endregion
  22. #region + Collect +
  23. public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> newCollector, Func<TResult, TSource, TResult> merge)
  24. {
  25. return Collect_<TSource, TResult>(source, newCollector, merge, _ => newCollector());
  26. }
  27. public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
  28. {
  29. return Collect_<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
  30. }
  31. private static IEnumerable<TResult> Collect_<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
  32. {
  33. #if !NO_PERF
  34. return new Collect<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
  35. #else
  36. return new AnonymousEnumerable<TResult>(() =>
  37. {
  38. var c = getInitialCollector();
  39. var f = default(Notification<TSource>);
  40. var o = new object();
  41. var done = false;
  42. return PushToPull<TSource, TResult>(
  43. source,
  44. x =>
  45. {
  46. lock (o)
  47. {
  48. if (x.HasValue)
  49. {
  50. try
  51. {
  52. c = merge(c, x.Value);
  53. }
  54. catch (Exception ex)
  55. {
  56. f = Notification.CreateOnError<TSource>(ex);
  57. }
  58. }
  59. else
  60. f = x;
  61. }
  62. },
  63. () =>
  64. {
  65. if (f != null)
  66. {
  67. if (f.Kind == NotificationKind.OnError)
  68. {
  69. return Notification.CreateOnError<TResult>(f.Exception);
  70. }
  71. else
  72. {
  73. if (done)
  74. return Notification.CreateOnCompleted<TResult>();
  75. else
  76. done = true;
  77. }
  78. }
  79. var l = default(TResult);
  80. lock (o)
  81. {
  82. l = c;
  83. c = getNewCollector(c);
  84. }
  85. return Notification.CreateOnNext(l);
  86. }
  87. );
  88. });
  89. #endif
  90. }
  91. #endregion
  92. #region First
  93. public virtual TSource First<TSource>(IObservable<TSource> source)
  94. {
  95. return FirstOrDefaultInternal(source, true);
  96. }
  97. public virtual TSource First<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  98. {
  99. return First(Where(source, predicate));
  100. }
  101. #endregion
  102. #region FirstOrDefault
  103. public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source)
  104. {
  105. return FirstOrDefaultInternal(source, false);
  106. }
  107. public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  108. {
  109. return FirstOrDefault(Where(source, predicate));
  110. }
  111. private static TSource FirstOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  112. {
  113. var value = default(TSource);
  114. var seenValue = false;
  115. var ex = default(Exception);
  116. using (var evt = new WaitAndSetOnce())
  117. {
  118. //
  119. // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
  120. //
  121. using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
  122. v =>
  123. {
  124. if (!seenValue)
  125. {
  126. value = v;
  127. }
  128. seenValue = true;
  129. evt.Set();
  130. },
  131. e =>
  132. {
  133. ex = e;
  134. evt.Set();
  135. },
  136. () =>
  137. {
  138. evt.Set();
  139. })))
  140. {
  141. evt.WaitOne();
  142. }
  143. }
  144. ex.ThrowIfNotNull();
  145. if (throwOnEmpty && !seenValue)
  146. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  147. return value;
  148. }
  149. #endregion
  150. #region + ForEach +
  151. public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource> onNext)
  152. {
  153. #if !NO_PERF
  154. using (var evt = new WaitAndSetOnce())
  155. {
  156. var sink = new ForEach<TSource>._(onNext, () => evt.Set());
  157. using (source.SubscribeSafe(sink))
  158. {
  159. evt.WaitOne();
  160. }
  161. sink.Error.ThrowIfNotNull();
  162. }
  163. #else
  164. ForEach_(source, onNext);
  165. #endif
  166. }
  167. public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource, int> onNext)
  168. {
  169. #if !NO_PERF
  170. using (var evt = new WaitAndSetOnce())
  171. {
  172. var sink = new ForEach<TSource>.ForEachImpl(onNext, () => evt.Set());
  173. using (source.SubscribeSafe(sink))
  174. {
  175. evt.WaitOne();
  176. }
  177. sink.Error.ThrowIfNotNull();
  178. }
  179. #else
  180. var i = 0;
  181. ForEach_(source, x => onNext(x, checked(i++)));
  182. #endif
  183. }
  184. #if NO_PERF
  185. private static void ForEach_<TSource>(IObservable<TSource> source, Action<TSource> onNext)
  186. {
  187. var exception = default(Exception);
  188. using (var evt = new ManualResetEvent(false))
  189. {
  190. using (source.Subscribe(
  191. x =>
  192. {
  193. try
  194. {
  195. onNext(x);
  196. }
  197. catch (Exception ex)
  198. {
  199. exception = ex;
  200. evt.Set();
  201. }
  202. },
  203. ex =>
  204. {
  205. exception = ex;
  206. evt.Set();
  207. },
  208. () => evt.Set()
  209. ))
  210. {
  211. evt.WaitOne();
  212. }
  213. }
  214. if (exception != null)
  215. exception.Throw();
  216. }
  217. #endif
  218. #endregion
  219. #region + GetEnumerator +
  220. public virtual IEnumerator<TSource> GetEnumerator<TSource>(IObservable<TSource> source)
  221. {
  222. #if !NO_PERF && !NO_CDS
  223. var e = new GetEnumerator<TSource>();
  224. return e.Run(source);
  225. #else
  226. var q = new Queue<Notification<TSource>>();
  227. var s = new Semaphore(0, int.MaxValue);
  228. return PushToPull(
  229. source,
  230. x =>
  231. {
  232. lock (q)
  233. q.Enqueue(x);
  234. s.Release();
  235. },
  236. () =>
  237. {
  238. s.WaitOne();
  239. lock (q)
  240. return q.Dequeue();
  241. });
  242. #endif
  243. }
  244. #endregion
  245. #region Last
  246. public virtual TSource Last<TSource>(IObservable<TSource> source)
  247. {
  248. return LastOrDefaultInternal(source, true);
  249. }
  250. public virtual TSource Last<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  251. {
  252. return Last(Where(source, predicate));
  253. }
  254. #endregion
  255. #region LastOrDefault
  256. public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source)
  257. {
  258. return LastOrDefaultInternal(source, false);
  259. }
  260. public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  261. {
  262. return LastOrDefault(Where(source, predicate));
  263. }
  264. private static TSource LastOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  265. {
  266. var value = default(TSource);
  267. var seenValue = false;
  268. var ex = default(Exception);
  269. using (var evt = new WaitAndSetOnce())
  270. {
  271. //
  272. // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
  273. //
  274. using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
  275. v =>
  276. {
  277. seenValue = true;
  278. value = v;
  279. },
  280. e =>
  281. {
  282. ex = e;
  283. evt.Set();
  284. },
  285. () =>
  286. {
  287. evt.Set();
  288. })))
  289. {
  290. evt.WaitOne();
  291. }
  292. }
  293. ex.ThrowIfNotNull();
  294. if (throwOnEmpty && !seenValue)
  295. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  296. return value;
  297. }
  298. #endregion
  299. #region + Latest +
  300. public virtual IEnumerable<TSource> Latest<TSource>(IObservable<TSource> source)
  301. {
  302. return new Latest<TSource>(source);
  303. }
  304. #endregion
  305. #region + MostRecent +
  306. public virtual IEnumerable<TSource> MostRecent<TSource>(IObservable<TSource> source, TSource initialValue)
  307. {
  308. return new MostRecent<TSource>(source, initialValue);
  309. }
  310. #endregion
  311. #region + Next +
  312. public virtual IEnumerable<TSource> Next<TSource>(IObservable<TSource> source)
  313. {
  314. return new Next<TSource>(source);
  315. }
  316. #endregion
  317. #region Single
  318. public virtual TSource Single<TSource>(IObservable<TSource> source)
  319. {
  320. return SingleOrDefaultInternal(source, true);
  321. }
  322. public virtual TSource Single<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  323. {
  324. return Single(Where(source, predicate));
  325. }
  326. #endregion
  327. #region SingleOrDefault
  328. public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source)
  329. {
  330. return SingleOrDefaultInternal(source, false);
  331. }
  332. public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  333. {
  334. return SingleOrDefault(Where(source, predicate));
  335. }
  336. private static TSource SingleOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  337. {
  338. var value = default(TSource);
  339. var seenValue = false;
  340. var ex = default(Exception);
  341. using (var evt = new WaitAndSetOnce())
  342. {
  343. //
  344. // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
  345. //
  346. using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
  347. v =>
  348. {
  349. if (seenValue)
  350. {
  351. ex = new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT);
  352. evt.Set();
  353. }
  354. value = v;
  355. seenValue = true;
  356. },
  357. e =>
  358. {
  359. ex = e;
  360. evt.Set();
  361. },
  362. () =>
  363. {
  364. evt.Set();
  365. })))
  366. {
  367. evt.WaitOne();
  368. }
  369. }
  370. ex.ThrowIfNotNull();
  371. if (throwOnEmpty && !seenValue)
  372. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  373. return value;
  374. }
  375. #endregion
  376. #region Wait
  377. public virtual TSource Wait<TSource>(IObservable<TSource> source)
  378. {
  379. return LastOrDefaultInternal(source, true);
  380. }
  381. #endregion
  382. #region |> Helpers <|
  383. #if NO_CDS || NO_PERF
  384. private static IEnumerator<TResult> PushToPull<TSource, TResult>(IObservable<TSource> source, Action<Notification<TSource>> push, Func<Notification<TResult>> pull)
  385. {
  386. var subscription = new SingleAssignmentDisposable();
  387. var adapter = new PushPullAdapter<TSource, TResult>(push, pull, subscription.Dispose);
  388. subscription.Disposable = source.SubscribeSafe(adapter);
  389. return adapter;
  390. }
  391. #endif
  392. class WaitAndSetOnce : IDisposable
  393. {
  394. private readonly ManualResetEvent _evt;
  395. private int _hasSet;
  396. public WaitAndSetOnce()
  397. {
  398. _evt = new ManualResetEvent(false);
  399. }
  400. public void Set()
  401. {
  402. if (Interlocked.Exchange(ref _hasSet, 1) == 0)
  403. {
  404. _evt.Set();
  405. }
  406. }
  407. public void WaitOne()
  408. {
  409. _evt.WaitOne();
  410. }
  411. public void Dispose()
  412. {
  413. _evt.Dispose();
  414. }
  415. }
  416. #endregion
  417. }
  418. }