QueryLanguage.Single.cs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. namespace System.Reactive.Linq
  8. {
  9. #if !NO_PERF
  10. using Observαble;
  11. #endif
  12. internal partial class QueryLanguage
  13. {
  14. #region + AsObservable +
  /// <summary>
  /// Returns an observable wrapper around <paramref name="source"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  /// <param name="source">Sequence to wrap.</param>
  /// <returns>
  /// Without NO_PERF: the result of <c>Ω()</c> when <paramref name="source"/> already is an
  /// <c>AsObservable&lt;TSource&gt;</c>, otherwise a new <c>AsObservable&lt;TSource&gt;</c> around it.
  /// With NO_PERF: an anonymous observable whose subscribe action forwards to <paramref name="source"/>.
  /// </returns>
  public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source)
  {
#if !NO_PERF
      var alreadyWrapped = source as AsObservable<TSource>;
      if (alreadyWrapped == null)
      {
          return new AsObservable<TSource>(source);
      }

      // NOTE(review): presumably Ω() avoids stacking a second wrapper - confirm against AsObservable<TSource>.
      return alreadyWrapped.Ω();
#else
      return new AnonymousObservable<TSource>(downstream => source.Subscribe(downstream));
#endif
  }
  26. #endregion
  27. #region + Buffer +
  /// <summary>
  /// Buffer overload taking only a count; forwards to <c>Buffer_</c> with the skip set to the same value.
  /// </summary>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="count">Value passed as both the buffer size and the skip.</param>
  /// <returns>Whatever <c>Buffer_</c> returns for (<paramref name="count"/>, <paramref name="count"/>).</returns>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count)
  {
      var skip = count;
      return Buffer_(source, count, skip);
  }
  /// <summary>
  /// Buffer overload taking an explicit skip; forwards all arguments unchanged to <c>Buffer_</c>.
  /// </summary>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="count">Buffer size passed through to <c>Buffer_</c>.</param>
  /// <param name="skip">Skip passed through to <c>Buffer_</c>.</param>
  /// <returns>Whatever <c>Buffer_</c> returns for the given arguments.</returns>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip)
  {
      return Buffer_(source, count, skip);
  }
  /// <summary>
  /// Shared implementation behind the count-based Buffer overloads.
  /// </summary>
  /// <param name="source">Sequence whose elements are buffered.</param>
  /// <param name="count">Buffer size.</param>
  /// <param name="skip">Skip between buffers.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>Buffer&lt;TSource&gt;</c> operator instance.
  /// With NO_PERF: the windows from <c>Window_</c>, each converted to a list, with empty lists filtered out.
  /// </returns>
  private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
  {
#if !NO_PERF
      return new Buffer<TSource>(source, count, skip);
#else
      var windows = Window_<TSource>(source, count, skip);
      var lists = windows.SelectMany(Observable.ToList);

      // Drop windows that closed without receiving any element.
      return lists.Where(list => list.Count > 0);
#endif
  }
  44. #endregion
  45. #region + Dematerialize +
  /// <summary>
  /// Turns a sequence of <see cref="Notification{T}"/> values back into a plain sequence.
  /// </summary>
  /// <param name="source">Sequence of notifications.</param>
  /// <returns>
  /// Without NO_PERF: the result of <c>Dematerialize()</c> on <paramref name="source"/> when it is a
  /// <c>Materialize&lt;TSource&gt;</c> instance, otherwise a new <c>Dematerialize&lt;TSource&gt;</c>.
  /// With NO_PERF: an anonymous observable that has each incoming notification accept the observer and
  /// passes the source's own error and completion straight through.
  /// </returns>
  public virtual IObservable<TSource> Dematerialize<TSource>(IObservable<Notification<TSource>> source)
  {
#if !NO_PERF
      var materialized = source as Materialize<TSource>;
      if (materialized == null)
      {
          return new Dematerialize<TSource>(source);
      }

      return materialized.Dematerialize();
#else
      return new AnonymousObservable<TSource>(observer =>
          source.Subscribe(
              notification => notification.Accept(observer),
              observer.OnError,
              observer.OnCompleted));
#endif
  }
  58. #endregion
  59. #region + DistinctUntilChanged +
  /// <summary>
  /// DistinctUntilChanged overload that uses each element as its own key and
  /// <see cref="EqualityComparer{T}.Default"/> for comparison.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <returns>Whatever <c>DistinctUntilChanged_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source)
  {
      return DistinctUntilChanged_<TSource, TSource>(source, element => element, EqualityComparer<TSource>.Default);
  }
  /// <summary>
  /// DistinctUntilChanged overload that uses each element as its own key and a caller-supplied comparer.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="comparer">Comparer handed to <c>DistinctUntilChanged_</c>.</param>
  /// <returns>Whatever <c>DistinctUntilChanged_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer)
  {
      return DistinctUntilChanged_<TSource, TSource>(source, element => element, comparer);
  }
  /// <summary>
  /// DistinctUntilChanged overload that compares keys produced by <paramref name="keySelector"/> using
  /// <see cref="EqualityComparer{T}.Default"/>.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function computing the comparison key of an element.</param>
  /// <returns>Whatever <c>DistinctUntilChanged_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      var defaultComparer = EqualityComparer<TKey>.Default;
      return DistinctUntilChanged_<TSource, TKey>(source, keySelector, defaultComparer);
  }
  /// <summary>
  /// DistinctUntilChanged overload with explicit key selector and key comparer; forwards everything
  /// unchanged to <c>DistinctUntilChanged_</c>.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function computing the comparison key of an element.</param>
  /// <param name="comparer">Comparer applied to the keys.</param>
  /// <returns>Whatever <c>DistinctUntilChanged_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      return DistinctUntilChanged_<TSource, TKey>(source, keySelector, comparer);
  }
  /// <summary>
  /// Shared implementation behind the DistinctUntilChanged overloads.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Function computing the comparison key of an element.</param>
  /// <param name="comparer">Comparer applied to consecutive keys.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>DistinctUntilChanged&lt;TSource, TKey&gt;</c> operator instance.
  /// With NO_PERF: an anonymous observable that forwards an element only when it is the first one or
  /// its key differs from the key of the previously forwarded element. An exception thrown by
  /// <paramref name="keySelector"/> or <paramref name="comparer"/> is reported through OnError.
  /// </returns>
  private static IObservable<TSource> DistinctUntilChanged_<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
#if !NO_PERF
      return new DistinctUntilChanged<TSource, TKey>(source, keySelector, comparer);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          // Per-subscription state: the key of the last element that was forwarded.
          var previousKey = default(TKey);
          var hasPreviousKey = false;

          return source.Subscribe(
              value =>
              {
                  TKey key;
                  try
                  {
                      key = keySelector(value);
                  }
                  catch (Exception error)
                  {
                      observer.OnError(error);
                      return;
                  }

                  // The comparer is only consulted once there is a previous key to compare against.
                  var unchanged = false;
                  if (hasPreviousKey)
                  {
                      try
                      {
                          unchanged = comparer.Equals(previousKey, key);
                      }
                      catch (Exception error)
                      {
                          observer.OnError(error);
                          return;
                      }
                  }

                  if (unchanged)
                  {
                      return;
                  }

                  hasPreviousKey = true;
                  previousKey = key;
                  observer.OnNext(value);
              },
              observer.OnError,
              observer.OnCompleted);
      });
#endif
  }
  123. #endregion
  124. #region + Do +
  /// <summary>
  /// Do overload that takes only an element callback.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="onNext">Callback invoked with each element.</param>
  /// <returns>
  /// Without NO_PERF: the result of <c>Do_</c> with <c>Stubs&lt;Exception&gt;.Ignore</c> and <c>Stubs.Nop</c>
  /// filling the error and completion callbacks.
  /// With NO_PERF: a projection of <paramref name="source"/> that calls <paramref name="onNext"/> on each
  /// element and then yields that same element.
  /// </returns>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext)
  {
#if !NO_PERF
      return Do_(source, onNext, Stubs<Exception>.Ignore, Stubs.Nop);
#else
      // PERFORMANCE - Use of Select allows for operator coalescing
      Func<TSource, TSource> invokeThenPassThrough = element =>
      {
          onNext(element);
          return element;
      };

      return source.Select(invokeThenPassThrough);
#endif
  }
  /// <summary>
  /// Do overload that takes an element callback and a completion callback.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="onNext">Callback invoked with each element before it is forwarded.</param>
  /// <param name="onCompleted">Callback invoked before completion is forwarded.</param>
  /// <returns>
  /// Without NO_PERF: the result of <c>Do_</c> with <c>Stubs&lt;Exception&gt;.Ignore</c> as the error callback.
  /// With NO_PERF: an anonymous observable that runs the callbacks and then forwards the notification.
  /// If a callback throws, the exception is sent through OnError and nothing else is forwarded for
  /// that notification.
  /// </returns>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action onCompleted)
  {
#if !NO_PERF
      return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, onCompleted);
#else
      return new AnonymousObservable<TSource>(obs =>
      {
          return source.Subscribe(
              x =>
              {
                  try
                  {
                      onNext(x);
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      // OnError is terminal: do not fall through and emit the element as well.
                      return;
                  }

                  obs.OnNext(x);
              },
              obs.OnError,
              () =>
              {
                  try
                  {
                      onCompleted();
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      // OnError is terminal: do not follow it with OnCompleted.
                      return;
                  }

                  obs.OnCompleted();
              });
      });
#endif
  }
  /// <summary>
  /// Do overload that takes an element callback and an error callback.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="onNext">Callback invoked with each element before it is forwarded.</param>
  /// <param name="onError">Callback invoked with the source's exception before it is forwarded.</param>
  /// <returns>
  /// Without NO_PERF: the result of <c>Do_</c> with <c>Stubs.Nop</c> as the completion callback.
  /// With NO_PERF: an anonymous observable that runs the callbacks and then forwards the notification.
  /// If a callback throws, that exception is sent through OnError and nothing else is forwarded for
  /// that notification.
  /// </returns>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
  {
#if !NO_PERF
      return Do_<TSource>(source, onNext, onError, Stubs.Nop);
#else
      return new AnonymousObservable<TSource>(obs =>
      {
          return source.Subscribe(
              x =>
              {
                  try
                  {
                      onNext(x);
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      // OnError is terminal: do not fall through and emit the element as well.
                      return;
                  }

                  obs.OnNext(x);
              },
              ex =>
              {
                  try
                  {
                      onError(ex);
                  }
                  catch (Exception ex2)
                  {
                      obs.OnError(ex2);
                      // Only one OnError may be sent; the callback's failure replaces the original error.
                      return;
                  }

                  obs.OnError(ex);
              },
              obs.OnCompleted);
      });
#endif
  }
  /// <summary>
  /// Do overload with all three callbacks; forwards everything unchanged to <c>Do_</c>.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="onNext">Element callback.</param>
  /// <param name="onError">Error callback.</param>
  /// <param name="onCompleted">Completion callback.</param>
  /// <returns>Whatever <c>Do_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  {
      return Do_<TSource>(source, onNext, onError, onCompleted);
  }
  /// <summary>
  /// Do overload that takes an observer; its OnNext, OnError and OnCompleted methods are passed to
  /// <c>Do_</c> as the three callbacks.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="observer">Observer whose methods serve as the callbacks.</param>
  /// <returns>Whatever <c>Do_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, IObserver<TSource> observer)
  {
      return Do_<TSource>(source, observer.OnNext, observer.OnError, observer.OnCompleted);
  }
  /// <summary>
  /// Shared implementation behind the Do overloads.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="onNext">Callback invoked with each element before it is forwarded.</param>
  /// <param name="onError">Callback invoked with the source's exception before it is forwarded.</param>
  /// <param name="onCompleted">Callback invoked before completion is forwarded.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>Do&lt;TSource&gt;</c> operator instance.
  /// With NO_PERF: an anonymous observable that runs the matching callback and then forwards the
  /// notification. If a callback throws, that exception is sent through OnError and nothing else is
  /// forwarded for that notification.
  /// </returns>
  private static IObservable<TSource> Do_<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  {
#if !NO_PERF
      return new Do<TSource>(source, onNext, onError, onCompleted);
#else
      return new AnonymousObservable<TSource>(obs =>
      {
          return source.Subscribe(
              x =>
              {
                  try
                  {
                      onNext(x);
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      // OnError is terminal: do not fall through and emit the element as well.
                      return;
                  }

                  obs.OnNext(x);
              },
              ex =>
              {
                  try
                  {
                      onError(ex);
                  }
                  catch (Exception ex2)
                  {
                      obs.OnError(ex2);
                      // Only one OnError may be sent; the callback's failure replaces the original error.
                      return;
                  }

                  obs.OnError(ex);
              },
              () =>
              {
                  try
                  {
                      onCompleted();
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      // OnError is terminal: do not follow it with OnCompleted.
                      return;
                  }

                  obs.OnCompleted();
              });
      });
#endif
  }
  267. #endregion
  268. #region + Finally +
  /// <summary>
  /// Attaches an action to <paramref name="source"/> that runs when the subscription is torn down.
  /// </summary>
  /// <param name="source">Sequence to subscribe to.</param>
  /// <param name="finallyAction">Action to run.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>Finally&lt;TSource&gt;</c> operator instance.
  /// With NO_PERF: an anonymous observable whose returned disposable disposes the inner subscription
  /// and then runs <paramref name="finallyAction"/>, even if disposing the subscription throws.
  /// </returns>
  public virtual IObservable<TSource> Finally<TSource>(IObservable<TSource> source, Action finallyAction)
  {
#if !NO_PERF
      return new Finally<TSource>(source, finallyAction);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var inner = source.Subscribe(observer);

          Action teardown = () =>
          {
              try
              {
                  inner.Dispose();
              }
              finally
              {
                  finallyAction();
              }
          };

          return Disposable.Create(teardown);
      });
#endif
  }
  291. #endregion
  292. #region + IgnoreElements +
  /// <summary>
  /// Returns a sequence that drops the elements of <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence whose elements are dropped.</param>
  /// <returns>
  /// Without NO_PERF: the result of <c>Ω()</c> when <paramref name="source"/> already is an
  /// <c>IgnoreElements&lt;TSource&gt;</c>, otherwise a new <c>IgnoreElements&lt;TSource&gt;</c> around it.
  /// With NO_PERF: an anonymous observable that subscribes with an empty element handler and passes
  /// error and completion through to the observer.
  /// </returns>
  public virtual IObservable<TSource> IgnoreElements<TSource>(IObservable<TSource> source)
  {
#if !NO_PERF
      var alreadyIgnoring = source as IgnoreElements<TSource>;
      if (alreadyIgnoring == null)
      {
          return new IgnoreElements<TSource>(source);
      }

      return alreadyIgnoring.Ω();
#else
      return new AnonymousObservable<TSource>(observer =>
          source.Subscribe(
              _ => { },
              observer.OnError,
              observer.OnCompleted));
#endif
  }
  304. #endregion
  305. #region + Materialize +
  /// <summary>
  /// Turns the notifications of <paramref name="source"/> into <see cref="Notification{T}"/> elements.
  /// </summary>
  /// <param name="source">Sequence to materialize.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>Materialize&lt;TSource&gt;</c> operator instance.
  /// With NO_PERF: an anonymous observable that emits an OnNext notification per element, and for an
  /// error or completion emits the matching notification followed by OnCompleted.
  /// </returns>
  public virtual IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source)
  {
#if !NO_PERF
      //
      // NOTE: Peephole optimization of xs.Dematerialize().Materialize() should not be performed. It's possible for xs to
      //       contain multiple terminal notifications, which won't survive a Dematerialize().Materialize() chain. In case
      //       a reduction to xs.AsObservable() would be performed, those notification elements would survive.
      //
      return new Materialize<TSource>(source);
#else
      return new AnonymousObservable<Notification<TSource>>(observer =>
      {
          // Both terminal cases emit one last notification and then complete the outer sequence.
          Action<Notification<TSource>> emitLast = last =>
          {
              observer.OnNext(last);
              observer.OnCompleted();
          };

          return source.Subscribe(
              value => observer.OnNext(Notification.CreateOnNext<TSource>(value)),
              exception => emitLast(Notification.CreateOnError<TSource>(exception)),
              () => emitLast(Notification.CreateOnCompleted<TSource>()));
      });
#endif
  }
  331. #endregion
  332. #region - Repeat -
  /// <summary>
  /// Repeat overload without a count: concatenates an endless enumeration of <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to repeat.</param>
  /// <returns>The result of <c>Concat()</c> over <c>RepeatInfinite(source)</c>.</returns>
  public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source)
  {
      var endlessCopies = RepeatInfinite(source);
      return endlessCopies.Concat();
  }
  /// <summary>
  /// Lazily yields <paramref name="value"/> over and over; the enumeration never ends on its own.
  /// </summary>
  /// <param name="value">Value to yield on every iteration.</param>
  /// <returns>An unbounded enumerable of <paramref name="value"/>.</returns>
  private static IEnumerable<T> RepeatInfinite<T>(T value)
  {
      for (;;)
      {
          yield return value;
      }
  }
  /// <summary>
  /// Repeat overload with a count: concatenates <paramref name="repeatCount"/> copies of
  /// <paramref name="source"/> produced by <see cref="Enumerable.Repeat{TResult}"/>.
  /// </summary>
  /// <param name="source">Sequence to repeat.</param>
  /// <param name="repeatCount">Number of copies handed to <c>Enumerable.Repeat</c>.</param>
  /// <returns>The result of <c>Concat()</c> over those copies.</returns>
  public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount)
  {
      var copies = Enumerable.Repeat(source, repeatCount);
      return copies.Concat();
  }
  346. #endregion
  347. #region - Retry -
  /// <summary>
  /// Retry overload without a count: applies <c>Catch()</c> to an endless enumeration of
  /// <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to retry.</param>
  /// <returns>The result of <c>Catch()</c> over <c>RepeatInfinite(source)</c>.</returns>
  public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
  {
      var endlessAttempts = RepeatInfinite(source);
      return endlessAttempts.Catch();
  }
  /// <summary>
  /// Retry overload with a count: applies <c>Catch()</c> to <paramref name="retryCount"/> copies of
  /// <paramref name="source"/> produced by <see cref="Enumerable.Repeat{TResult}"/>.
  /// </summary>
  /// <param name="source">Sequence to retry.</param>
  /// <param name="retryCount">Number of copies handed to <c>Enumerable.Repeat</c>.</param>
  /// <returns>The result of <c>Catch()</c> over those copies.</returns>
  public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount)
  {
      var attempts = Enumerable.Repeat(source, retryCount);
      return attempts.Catch();
  }
  356. #endregion
  357. #region + Scan +
  /// <summary>
  /// Seeded Scan: emits the running result of folding <paramref name="accumulator"/> over the elements,
  /// starting from <paramref name="seed"/>.
  /// </summary>
  /// <param name="source">Sequence to accumulate over.</param>
  /// <param name="seed">Accumulator value used for the first element.</param>
  /// <param name="accumulator">Function combining the running value with the next element.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>Scan&lt;TSource, TAccumulate&gt;</c> operator instance.
  /// With NO_PERF: a deferred projection of <paramref name="source"/> that keeps the running value per
  /// subscription and yields it after each element.
  /// </returns>
  public virtual IObservable<TAccumulate> Scan<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  {
#if !NO_PERF
      return new Scan<TSource, TAccumulate>(source, seed, accumulator);
#else
      return Defer(() =>
      {
          // Starting from the seed makes the first element fold exactly as accumulator(seed, x).
          var running = seed;

          return source.Select(x =>
          {
              running = accumulator(running, x);
              return running;
          });
      });
#endif
  }
  /// <summary>
  /// Unseeded Scan: the first element becomes the initial running value, and every later element is
  /// combined with it through <paramref name="accumulator"/>.
  /// </summary>
  /// <param name="source">Sequence to accumulate over.</param>
  /// <param name="accumulator">Function combining the running value with the next element.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>Scan&lt;TSource&gt;</c> operator instance.
  /// With NO_PERF: a deferred projection of <paramref name="source"/> that keeps the running value per
  /// subscription and yields it after each element (the first element is yielded as-is).
  /// </returns>
  public virtual IObservable<TSource> Scan<TSource>(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  {
#if !NO_PERF
      return new Scan<TSource>(source, accumulator);
#else
      return Defer(() =>
      {
          var running = default(TSource);
          var started = false;

          return source.Select(x =>
          {
              running = started ? accumulator(running, x) : x;
              started = true;
              return running;
          });
      });
#endif
  }
  404. #endregion
  405. #region + SkipLast +
  /// <summary>
  /// Returns the elements of <paramref name="source"/> except the last <paramref name="count"/>.
  /// </summary>
  /// <param name="source">Sequence to trim.</param>
  /// <param name="count">Number of trailing elements to hold back.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>SkipLast&lt;TSource&gt;</c> operator instance.
  /// With NO_PERF: an anonymous observable that queues elements and releases the oldest one only once
  /// more than <paramref name="count"/> are queued; error and completion pass straight through.
  /// </returns>
  public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, int count)
  {
#if !NO_PERF
      return new SkipLast<TSource>(source, count);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var heldBack = new Queue<TSource>();

          return source.Subscribe(
              x =>
              {
                  heldBack.Enqueue(x);
                  if (heldBack.Count <= count)
                  {
                      return;
                  }

                  observer.OnNext(heldBack.Dequeue());
              },
              observer.OnError,
              observer.OnCompleted);
      });
#endif
  }
  426. #endregion
  427. #region - StartWith -
  /// <summary>
  /// StartWith overload without a scheduler; forwards to <c>StartWith_</c> using
  /// <c>SchedulerDefaults.ConstantTimeOperations</c>.
  /// </summary>
  /// <param name="source">Sequence to prepend to.</param>
  /// <param name="values">Values to prepend.</param>
  /// <returns>Whatever <c>StartWith_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, params TSource[] values)
  {
      var defaultScheduler = SchedulerDefaults.ConstantTimeOperations;
      return StartWith_(source, defaultScheduler, values);
  }
  /// <summary>
  /// StartWith overload with an explicit scheduler; forwards everything unchanged to <c>StartWith_</c>.
  /// </summary>
  /// <param name="source">Sequence to prepend to.</param>
  /// <param name="scheduler">Scheduler handed to <c>StartWith_</c>.</param>
  /// <param name="values">Values to prepend.</param>
  /// <returns>Whatever <c>StartWith_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
  {
      return StartWith_(source, scheduler, values);
  }
  /// <summary>
  /// Shared implementation behind the StartWith overloads: converts <paramref name="values"/> to an
  /// observable on <paramref name="scheduler"/> and concatenates <paramref name="source"/> after it.
  /// </summary>
  /// <param name="source">Sequence that follows the prepended values.</param>
  /// <param name="scheduler">Scheduler passed to <c>ToObservable</c>.</param>
  /// <param name="values">Values to prepend.</param>
  /// <returns>The concatenation of the values and <paramref name="source"/>.</returns>
  private static IObservable<TSource> StartWith_<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
  {
      var prefix = values.ToObservable(scheduler);
      return prefix.Concat(source);
  }
  440. #endregion
  441. #region + TakeLast +
  /// <summary>
  /// TakeLast overload without a scheduler; forwards to <c>TakeLast_</c> using
  /// <c>SchedulerDefaults.Iteration</c>.
  /// </summary>
  /// <param name="source">Sequence to take the tail of.</param>
  /// <param name="count">Number of trailing elements to keep.</param>
  /// <returns>Whatever <c>TakeLast_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count)
  {
      return TakeLast_<TSource>(source, count, SchedulerDefaults.Iteration);
  }
  /// <summary>
  /// TakeLast overload with an explicit scheduler; forwards everything unchanged to <c>TakeLast_</c>.
  /// </summary>
  /// <param name="source">Sequence to take the tail of.</param>
  /// <param name="count">Number of trailing elements to keep.</param>
  /// <param name="scheduler">Scheduler handed to <c>TakeLast_</c>.</param>
  /// <returns>Whatever <c>TakeLast_</c> returns for those arguments.</returns>
  public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
  {
      return TakeLast_<TSource>(source, count, scheduler);
  }
  /// <summary>
  /// Shared implementation behind the TakeLast overloads.
  /// </summary>
  /// <param name="source">Sequence to take the tail of.</param>
  /// <param name="count">Number of trailing elements to keep.</param>
  /// <param name="scheduler">Scheduler used to emit the kept elements.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>TakeLast&lt;TSource&gt;</c> operator instance.
  /// With NO_PERF: an anonymous observable that keeps at most <paramref name="count"/> of the most
  /// recent elements in a queue and, once the source completes, schedules a recursive action on
  /// <paramref name="scheduler"/> that emits one queued element per iteration and then completes.
  /// Source errors pass straight through.
  /// </returns>
  private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
  {
#if !NO_PERF
      return new TakeLast<TSource>(source, count, scheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var tail = new Queue<TSource>();
          var subscriptions = new CompositeDisposable();

          var sourceSubscription = source.Subscribe(
              x =>
              {
                  // Keep the queue bounded to the most recent `count` elements.
                  tail.Enqueue(x);
                  if (tail.Count > count)
                  {
                      tail.Dequeue();
                  }
              },
              observer.OnError,
              () =>
              {
                  var drain = scheduler.Schedule(again =>
                  {
                      if (tail.Count == 0)
                      {
                          observer.OnCompleted();
                      }
                      else
                      {
                          observer.OnNext(tail.Dequeue());
                          again();
                      }
                  });

                  subscriptions.Add(drain);
              });

          subscriptions.Add(sourceSubscription);
          return subscriptions;
      });
#endif
  }
  /// <summary>
  /// Returns a single list holding the last <paramref name="count"/> elements of <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to take the tail of.</param>
  /// <param name="count">Number of trailing elements to keep.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>TakeLastBuffer&lt;TSource&gt;</c> operator instance.
  /// With NO_PERF: an anonymous observable that keeps at most <paramref name="count"/> of the most
  /// recent elements in a queue and, when the source completes, emits the queue's contents as a list
  /// and completes. Source errors pass straight through.
  /// </returns>
  public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, int count)
  {
#if !NO_PERF
      return new TakeLastBuffer<TSource>(source, count);
#else
      return new AnonymousObservable<IList<TSource>>(observer =>
      {
          var tail = new Queue<TSource>();

          return source.Subscribe(
              x =>
              {
                  tail.Enqueue(x);
                  if (tail.Count > count)
                  {
                      tail.Dequeue();
                  }
              },
              observer.OnError,
              () =>
              {
                  var snapshot = tail.ToList();
                  observer.OnNext(snapshot);
                  observer.OnCompleted();
              });
      });
#endif
  }
  511. #endregion
  512. #region + Window +
  /// <summary>
  /// Window overload taking an explicit skip; forwards all arguments unchanged to <c>Window_</c>.
  /// </summary>
  /// <param name="source">Sequence to split into windows.</param>
  /// <param name="count">Window size passed through to <c>Window_</c>.</param>
  /// <param name="skip">Skip passed through to <c>Window_</c>.</param>
  /// <returns>Whatever <c>Window_</c> returns for the given arguments.</returns>
  public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count, int skip)
  {
      return Window_(source, count, skip);
  }
  /// <summary>
  /// Window overload taking only a count; forwards to <c>Window_</c> with the skip set to the same value.
  /// </summary>
  /// <param name="source">Sequence to split into windows.</param>
  /// <param name="count">Value passed as both the window size and the skip.</param>
  /// <returns>Whatever <c>Window_</c> returns for (<paramref name="count"/>, <paramref name="count"/>).</returns>
  public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count)
  {
      var skip = count;
      return Window_(source, count, skip);
  }
  /// <summary>
  /// Shared implementation behind the count-based Window overloads.
  /// </summary>
  /// <param name="source">Sequence to split into windows.</param>
  /// <param name="count">Number of elements after which a window is closed.</param>
  /// <param name="skip">Number of elements between the openings of consecutive windows.</param>
  /// <returns>
  /// Without NO_PERF: a new <c>Window&lt;TSource&gt;</c> operator instance.
  /// With NO_PERF: an anonymous observable that opens a first window on subscription, pushes each
  /// element into every open window, closes the oldest window and opens new ones based on the running
  /// element count, and forwards error or completion to all open windows and then to the observer.
  /// The subscription returned is a <c>RefCountDisposable</c> over the source subscription.
  /// </returns>
  private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, int count, int skip)
  {
#if !NO_PERF
      return new Window<TSource>(source, count, skip);
#else
      return new AnonymousObservable<IObservable<TSource>>(observer =>
      {
          var openWindows = new Queue<ISubject<TSource>>();
          var seen = 0;
          var sourceSubscription = new SingleAssignmentDisposable();
          var refCount = new RefCountDisposable(sourceSubscription);

          Action openWindow = () =>
          {
              var window = new Subject<TSource>();
              openWindows.Enqueue(window);
              observer.OnNext(window.AddRef(refCount));
          };

          // The first window is handed out before the source is subscribed to.
          openWindow();

          sourceSubscription.Disposable = source.Subscribe(
              x =>
              {
                  foreach (var window in openWindows)
                  {
                      window.OnNext(x);
                  }

                  // Non-negative once `count` elements have been seen; every `skip` elements from
                  // there the oldest window is closed.
                  var sinceFirstFull = seen - count + 1;
                  if (sinceFirstFull >= 0 && sinceFirstFull % skip == 0)
                  {
                      var oldest = openWindows.Dequeue();
                      oldest.OnCompleted();
                  }

                  seen++;
                  if (seen % skip == 0)
                  {
                      openWindow();
                  }
              },
              exception =>
              {
                  while (openWindows.Count > 0)
                  {
                      openWindows.Dequeue().OnError(exception);
                  }

                  observer.OnError(exception);
              },
              () =>
              {
                  while (openWindows.Count > 0)
                  {
                      openWindows.Dequeue().OnCompleted();
                  }

                  observer.OnCompleted();
              });

          return refCount;
      });
#endif
  }
  571. #endregion
  572. }
  573. }