QueryLanguageEx.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. namespace System.Reactive.Linq
  8. {
  9. internal class QueryLanguageEx : IQueryLanguageEx
  10. {
  11. #region Create
  12. public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
  13. {
  14. return new AnonymousObservable<TResult>(observer =>
  15. iteratorMethod(observer).Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
  16. }
  17. public virtual IObservable<Unit> Create(Func<IEnumerable<IObservable<object>>> iteratorMethod)
  18. {
  19. return new AnonymousObservable<Unit>(observer =>
  20. iteratorMethod().Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
  21. }
  22. #endregion
  23. #region Expand
  24. public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler)
  25. {
  26. return new AnonymousObservable<TSource>(observer =>
  27. {
  28. var outGate = new object();
  29. var q = new Queue<IObservable<TSource>>();
  30. var m = new SerialDisposable();
  31. var d = new CompositeDisposable { m };
  32. var activeCount = 0;
  33. var isAcquired = false;
  34. var ensureActive = default(Action);
  35. ensureActive = () =>
  36. {
  37. var isOwner = false;
  38. lock (q)
  39. {
  40. if (q.Count > 0)
  41. {
  42. isOwner = !isAcquired;
  43. isAcquired = true;
  44. }
  45. }
  46. if (isOwner)
  47. {
  48. m.Disposable = scheduler.Schedule(self =>
  49. {
  50. var work = default(IObservable<TSource>);
  51. lock (q)
  52. {
  53. if (q.Count > 0)
  54. work = q.Dequeue();
  55. else
  56. {
  57. isAcquired = false;
  58. return;
  59. }
  60. }
  61. var m1 = new SingleAssignmentDisposable();
  62. d.Add(m1);
  63. m1.Disposable = work.Subscribe(
  64. x =>
  65. {
  66. lock (outGate)
  67. observer.OnNext(x);
  68. var result = default(IObservable<TSource>);
  69. try
  70. {
  71. result = selector(x);
  72. }
  73. catch (Exception exception)
  74. {
  75. lock (outGate)
  76. observer.OnError(exception);
  77. }
  78. lock (q)
  79. {
  80. q.Enqueue(result);
  81. activeCount++;
  82. }
  83. ensureActive();
  84. },
  85. exception =>
  86. {
  87. lock (outGate)
  88. observer.OnError(exception);
  89. },
  90. () =>
  91. {
  92. d.Remove(m1);
  93. var done = false;
  94. lock (q)
  95. {
  96. activeCount--;
  97. if (activeCount == 0)
  98. done = true;
  99. }
  100. if (done)
  101. lock (outGate)
  102. observer.OnCompleted();
  103. });
  104. self();
  105. });
  106. }
  107. };
  108. lock (q)
  109. {
  110. q.Enqueue(source);
  111. activeCount++;
  112. }
  113. ensureActive();
  114. return d;
  115. });
  116. }
  117. public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector)
  118. {
  119. return source.Expand(selector, SchedulerDefaults.Iteration);
  120. }
  121. #endregion
  122. #region ForkJoin
  123. public virtual IObservable<TResult> ForkJoin<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
  124. {
  125. return Combine<TFirst, TSecond, TResult>(first, second, (observer, leftSubscription, rightSubscription) =>
  126. {
  127. var leftStopped = false;
  128. var rightStopped = false;
  129. var hasLeft = false;
  130. var hasRight = false;
  131. var lastLeft = default(TFirst);
  132. var lastRight = default(TSecond);
  133. return new BinaryObserver<TFirst, TSecond>(
  134. left =>
  135. {
  136. switch (left.Kind)
  137. {
  138. case NotificationKind.OnNext:
  139. hasLeft = true;
  140. lastLeft = left.Value;
  141. break;
  142. case NotificationKind.OnError:
  143. rightSubscription.Dispose();
  144. observer.OnError(left.Exception);
  145. break;
  146. case NotificationKind.OnCompleted:
  147. leftStopped = true;
  148. if (rightStopped)
  149. {
  150. if (!hasLeft)
  151. observer.OnCompleted();
  152. else if (!hasRight)
  153. observer.OnCompleted();
  154. else
  155. {
  156. TResult result;
  157. try
  158. {
  159. result = resultSelector(lastLeft, lastRight);
  160. }
  161. catch (Exception exception)
  162. {
  163. observer.OnError(exception);
  164. return;
  165. }
  166. observer.OnNext(result);
  167. observer.OnCompleted();
  168. }
  169. }
  170. break;
  171. }
  172. },
  173. right =>
  174. {
  175. switch (right.Kind)
  176. {
  177. case NotificationKind.OnNext:
  178. hasRight = true;
  179. lastRight = right.Value;
  180. break;
  181. case NotificationKind.OnError:
  182. leftSubscription.Dispose();
  183. observer.OnError(right.Exception);
  184. break;
  185. case NotificationKind.OnCompleted:
  186. rightStopped = true;
  187. if (leftStopped)
  188. {
  189. if (!hasLeft)
  190. observer.OnCompleted();
  191. else if (!hasRight)
  192. observer.OnCompleted();
  193. else
  194. {
  195. TResult result;
  196. try
  197. {
  198. result = resultSelector(lastLeft, lastRight);
  199. }
  200. catch (Exception exception)
  201. {
  202. observer.OnError(exception);
  203. return;
  204. }
  205. observer.OnNext(result);
  206. observer.OnCompleted();
  207. }
  208. }
  209. break;
  210. }
  211. });
  212. });
  213. }
  214. public virtual IObservable<TSource[]> ForkJoin<TSource>(params IObservable<TSource>[] sources)
  215. {
  216. return sources.ForkJoin();
  217. }
  218. public virtual IObservable<TSource[]> ForkJoin<TSource>(IEnumerable<IObservable<TSource>> sources)
  219. {
  220. return new AnonymousObservable<TSource[]>(subscriber =>
  221. {
  222. var allSources = sources.ToArray();
  223. var count = allSources.Length;
  224. if (count == 0)
  225. {
  226. subscriber.OnCompleted();
  227. return Disposable.Empty;
  228. }
  229. var group = new CompositeDisposable(allSources.Length);
  230. var gate = new object();
  231. var finished = false;
  232. var hasResults = new bool[count];
  233. var hasCompleted = new bool[count];
  234. var results = new List<TSource>(count);
  235. lock (gate)
  236. {
  237. for (var index = 0; index < count; index++)
  238. {
  239. var currentIndex = index;
  240. var source = allSources[index];
  241. results.Add(default(TSource));
  242. group.Add(source.Subscribe(
  243. value =>
  244. {
  245. lock (gate)
  246. {
  247. if (!finished)
  248. {
  249. hasResults[currentIndex] = true;
  250. results[currentIndex] = value;
  251. }
  252. }
  253. },
  254. error =>
  255. {
  256. lock (gate)
  257. {
  258. finished = true;
  259. subscriber.OnError(error);
  260. group.Dispose();
  261. }
  262. },
  263. () =>
  264. {
  265. lock (gate)
  266. {
  267. if (!finished)
  268. {
  269. if (!hasResults[currentIndex])
  270. {
  271. subscriber.OnCompleted();
  272. return;
  273. }
  274. hasCompleted[currentIndex] = true;
  275. foreach (var completed in hasCompleted)
  276. {
  277. if (!completed)
  278. return;
  279. }
  280. finished = true;
  281. subscriber.OnNext(results.ToArray());
  282. subscriber.OnCompleted();
  283. }
  284. }
  285. }));
  286. }
  287. }
  288. return group;
  289. });
  290. }
  291. #endregion
  292. #region Let
  293. public virtual IObservable<TResult> Let<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> function)
  294. {
  295. return function(source);
  296. }
  297. #endregion
  298. #region ManySelect
  299. public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector)
  300. {
  301. return ManySelect(source, selector, DefaultScheduler.Instance);
  302. }
  303. public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector, IScheduler scheduler)
  304. {
  305. return Observable.Defer<TResult>(() =>
  306. {
  307. var chain = default(ChainObservable<TSource>);
  308. return source
  309. .Select(
  310. x =>
  311. {
  312. var curr = new ChainObservable<TSource>(x);
  313. if (chain != null)
  314. chain.OnNext(curr);
  315. chain = curr;
  316. return (IObservable<TSource>)curr;
  317. })
  318. .Do(
  319. _ => { },
  320. exception =>
  321. {
  322. if (chain != null)
  323. chain.OnError(exception);
  324. },
  325. () =>
  326. {
  327. if (chain != null)
  328. chain.OnCompleted();
  329. })
  330. .ObserveOn(scheduler)
  331. .Select(selector);
  332. });
  333. }
  334. class ChainObservable<T> : ISubject<IObservable<T>, T>
  335. {
  336. T head;
  337. AsyncSubject<IObservable<T>> tail = new AsyncSubject<IObservable<T>>();
  338. public ChainObservable(T head)
  339. {
  340. this.head = head;
  341. }
  342. public IDisposable Subscribe(IObserver<T> observer)
  343. {
  344. var g = new CompositeDisposable();
  345. g.Add(CurrentThreadScheduler.Instance.Schedule(() =>
  346. {
  347. observer.OnNext(head);
  348. g.Add(tail.Merge().Subscribe(observer));
  349. }));
  350. return g;
  351. }
  352. public void OnCompleted()
  353. {
  354. OnNext(Observable.Empty<T>());
  355. }
  356. public void OnError(Exception error)
  357. {
  358. OnNext(Observable.Throw<T>(error));
  359. }
  360. public void OnNext(IObservable<T> value)
  361. {
  362. tail.OnNext(value);
  363. tail.OnCompleted();
  364. }
  365. }
  366. #endregion
  367. #region ToListObservable
  368. public virtual ListObservable<TSource> ToListObservable<TSource>(IObservable<TSource> source)
  369. {
  370. return new ListObservable<TSource>(source);
  371. }
  372. #endregion
  373. #region |> Helpers <|
  374. private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
  375. {
  376. return new AnonymousObservable<TResult>(observer =>
  377. {
  378. var leftSubscription = new SingleAssignmentDisposable();
  379. var rightSubscription = new SingleAssignmentDisposable();
  380. var combiner = combinerSelector(observer, leftSubscription, rightSubscription);
  381. var gate = new object();
  382. leftSubscription.Disposable = leftSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(x)).Synchronize(gate).Subscribe(combiner);
  383. rightSubscription.Disposable = rightSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(x)).Synchronize(gate).Subscribe(combiner);
  384. return new CompositeDisposable(leftSubscription, rightSubscription);
  385. });
  386. }
  387. #endregion
  388. }
  389. }