QueryLanguage.Blocking.cs 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. namespace System.Reactive.Linq
  7. {
  8. using ObservableImpl;
  9. internal partial class QueryLanguage
  10. {
  11. #region - Chunkify -
  12. public virtual IEnumerable<IList<TSource>> Chunkify<TSource>(IObservable<TSource> source)
  13. {
  14. return source.Collect<TSource, IList<TSource>>(() => new List<TSource>(), (lst, x) => { lst.Add(x); return lst; }, _ => new List<TSource>());
  15. }
  16. #endregion
  17. #region + Collect +
  18. public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> newCollector, Func<TResult, TSource, TResult> merge)
  19. {
  20. return Collect_(source, newCollector, merge, _ => newCollector());
  21. }
  22. public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
  23. {
  24. return Collect_(source, getInitialCollector, merge, getNewCollector);
  25. }
  26. private static IEnumerable<TResult> Collect_<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
  27. {
  28. return new Collect<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
  29. }
  30. #endregion
  31. #region First
  32. public virtual TSource First<TSource>(IObservable<TSource> source)
  33. {
  34. return FirstOrDefaultInternal(source, true);
  35. }
  36. public virtual TSource First<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  37. {
  38. return First(Where(source, predicate));
  39. }
  40. #endregion
  41. #region FirstOrDefault
  42. public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source)
  43. {
  44. return FirstOrDefaultInternal(source, false);
  45. }
  46. public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  47. {
  48. return FirstOrDefault(Where(source, predicate));
  49. }
  50. private static TSource FirstOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  51. {
  52. using (var consumer = new FirstBlocking<TSource>())
  53. {
  54. using (var d = source.Subscribe(consumer))
  55. {
  56. consumer.SetUpstream(d);
  57. if (consumer.CurrentCount != 0)
  58. {
  59. consumer.Wait();
  60. }
  61. }
  62. consumer._error.ThrowIfNotNull();
  63. if (throwOnEmpty && !consumer._hasValue)
  64. {
  65. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  66. }
  67. return consumer._value;
  68. }
  69. }
  70. #endregion
  71. #region + ForEach +
  72. public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource> onNext)
  73. {
  74. using (var evt = new WaitAndSetOnce())
  75. {
  76. var sink = new ForEach<TSource>.Observer(onNext, () => evt.Set());
  77. using (source.SubscribeSafe(sink))
  78. {
  79. evt.WaitOne();
  80. }
  81. sink.Error.ThrowIfNotNull();
  82. }
  83. }
  84. public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource, int> onNext)
  85. {
  86. using (var evt = new WaitAndSetOnce())
  87. {
  88. var sink = new ForEach<TSource>.ObserverIndexed(onNext, () => evt.Set());
  89. using (source.SubscribeSafe(sink))
  90. {
  91. evt.WaitOne();
  92. }
  93. sink.Error.ThrowIfNotNull();
  94. }
  95. }
  96. #endregion
  97. #region + GetEnumerator +
  98. public virtual IEnumerator<TSource> GetEnumerator<TSource>(IObservable<TSource> source)
  99. {
  100. var e = new GetEnumerator<TSource>();
  101. return e.Run(source);
  102. }
  103. #endregion
  104. #region Last
  105. public virtual TSource Last<TSource>(IObservable<TSource> source)
  106. {
  107. return LastOrDefaultInternal(source, true);
  108. }
  109. public virtual TSource Last<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  110. {
  111. return Last(Where(source, predicate));
  112. }
  113. #endregion
  114. #region LastOrDefault
  115. public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source)
  116. {
  117. return LastOrDefaultInternal(source, false);
  118. }
  119. public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  120. {
  121. return LastOrDefault(Where(source, predicate));
  122. }
  123. private static TSource LastOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  124. {
  125. using (var consumer = new LastBlocking<TSource>())
  126. {
  127. using (var d = source.Subscribe(consumer))
  128. {
  129. consumer.SetUpstream(d);
  130. if (consumer.CurrentCount != 0)
  131. {
  132. consumer.Wait();
  133. }
  134. }
  135. consumer._error.ThrowIfNotNull();
  136. if (throwOnEmpty && !consumer._hasValue)
  137. {
  138. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  139. }
  140. return consumer._value;
  141. }
  142. }
  143. #endregion
  144. #region + Latest +
  145. public virtual IEnumerable<TSource> Latest<TSource>(IObservable<TSource> source)
  146. {
  147. return new Latest<TSource>(source);
  148. }
  149. #endregion
  150. #region + MostRecent +
  151. public virtual IEnumerable<TSource> MostRecent<TSource>(IObservable<TSource> source, TSource initialValue)
  152. {
  153. return new MostRecent<TSource>(source, initialValue);
  154. }
  155. #endregion
  156. #region + Next +
  157. public virtual IEnumerable<TSource> Next<TSource>(IObservable<TSource> source)
  158. {
  159. return new Next<TSource>(source);
  160. }
  161. #endregion
  162. #region Single
  163. public virtual TSource Single<TSource>(IObservable<TSource> source)
  164. {
  165. return SingleOrDefaultInternal(source, true);
  166. }
  167. public virtual TSource Single<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  168. {
  169. return Single(Where(source, predicate));
  170. }
  171. #endregion
  172. #region SingleOrDefault
  173. public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source)
  174. {
  175. return SingleOrDefaultInternal(source, false);
  176. }
  177. public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  178. {
  179. return SingleOrDefault(Where(source, predicate));
  180. }
  181. private static TSource SingleOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
  182. {
  183. var value = default(TSource);
  184. var seenValue = false;
  185. var ex = default(Exception);
  186. using (var evt = new WaitAndSetOnce())
  187. {
  188. //
  189. // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
  190. //
  191. using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
  192. v =>
  193. {
  194. if (seenValue)
  195. {
  196. ex = new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT);
  197. evt.Set();
  198. }
  199. value = v;
  200. seenValue = true;
  201. },
  202. e =>
  203. {
  204. ex = e;
  205. evt.Set();
  206. },
  207. () =>
  208. {
  209. evt.Set();
  210. })))
  211. {
  212. evt.WaitOne();
  213. }
  214. }
  215. ex.ThrowIfNotNull();
  216. if (throwOnEmpty && !seenValue)
  217. throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
  218. return value;
  219. }
  220. #endregion
  221. #region Wait
  222. public virtual TSource Wait<TSource>(IObservable<TSource> source)
  223. {
  224. return LastOrDefaultInternal(source, true);
  225. }
  226. #endregion
  227. #region |> Helpers <|
  228. class WaitAndSetOnce : IDisposable
  229. {
  230. private readonly ManualResetEvent _evt;
  231. private int _hasSet;
  232. public WaitAndSetOnce()
  233. {
  234. _evt = new ManualResetEvent(false);
  235. }
  236. public void Set()
  237. {
  238. if (Interlocked.Exchange(ref _hasSet, 1) == 0)
  239. {
  240. _evt.Set();
  241. }
  242. }
  243. public void WaitOne()
  244. {
  245. _evt.WaitOne();
  246. }
  247. public void Dispose()
  248. {
  249. #if HAS_MREEXPLICITDISPOSABLE
  250. ((IDisposable)_evt).Dispose();
  251. #else
  252. _evt.Dispose();
  253. #endif
  254. }
  255. }
  256. #endregion
  257. }
  258. }