QueryLanguage.Single.cs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. namespace System.Reactive.Linq
  8. {
  9. #if !NO_PERF
  10. using ObservableImpl;
  11. #endif
  12. internal partial class QueryLanguage
  13. {
  14. #region + AsObservable +
  15. public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source)
  16. {
  17. #if !NO_PERF
  18. var asObservable = source as AsObservable<TSource>;
  19. if (asObservable != null)
  20. return asObservable.Omega();
  21. return new AsObservable<TSource>(source);
  22. #else
  23. return new AnonymousObservable<TSource>(observer => source.Subscribe(observer));
  24. #endif
  25. }
  26. #endregion
  27. #region + Buffer +
  28. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count)
  29. {
  30. return Buffer_<TSource>(source, count, count);
  31. }
  32. public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip)
  33. {
  34. return Buffer_<TSource>(source, count, skip);
  35. }
  36. private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
  37. {
  38. #if !NO_PERF
  39. return new Buffer<TSource>(source, count, skip);
  40. #else
  41. return Window_<TSource>(source, count, skip).SelectMany(Observable.ToList).Where(list => list.Count > 0);
  42. #endif
  43. }
  44. #endregion
  45. #region + Dematerialize +
  46. public virtual IObservable<TSource> Dematerialize<TSource>(IObservable<Notification<TSource>> source)
  47. {
  48. #if !NO_PERF
  49. var materialize = source as Materialize<TSource>;
  50. if (materialize != null)
  51. return materialize.Dematerialize();
  52. return new Dematerialize<TSource>(source);
  53. #else
  54. return new AnonymousObservable<TSource>(observer =>
  55. source.Subscribe(x => x.Accept(observer), observer.OnError, observer.OnCompleted));
  56. #endif
  57. }
  58. #endregion
  59. #region + DistinctUntilChanged +
  60. public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source)
  61. {
  62. return DistinctUntilChanged_(source, x => x, EqualityComparer<TSource>.Default);
  63. }
  64. public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer)
  65. {
  66. return DistinctUntilChanged_(source, x => x, comparer);
  67. }
  68. public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  69. {
  70. return DistinctUntilChanged_(source, keySelector, EqualityComparer<TKey>.Default);
  71. }
  72. public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  73. {
  74. return DistinctUntilChanged_(source, keySelector, comparer);
  75. }
  76. private static IObservable<TSource> DistinctUntilChanged_<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  77. {
  78. #if !NO_PERF
  79. return new DistinctUntilChanged<TSource, TKey>(source, keySelector, comparer);
  80. #else
  81. return new AnonymousObservable<TSource>(observer =>
  82. {
  83. var currentKey = default(TKey);
  84. var hasCurrentKey = false;
  85. return source.Subscribe(
  86. value =>
  87. {
  88. var key = default(TKey);
  89. try
  90. {
  91. key = keySelector(value);
  92. }
  93. catch (Exception exception)
  94. {
  95. observer.OnError(exception);
  96. return;
  97. }
  98. var comparerEquals = false;
  99. if (hasCurrentKey)
  100. {
  101. try
  102. {
  103. comparerEquals = comparer.Equals(currentKey, key);
  104. }
  105. catch (Exception exception)
  106. {
  107. observer.OnError(exception);
  108. return;
  109. }
  110. }
  111. if (!hasCurrentKey || !comparerEquals)
  112. {
  113. hasCurrentKey = true;
  114. currentKey = key;
  115. observer.OnNext(value);
  116. }
  117. },
  118. observer.OnError,
  119. observer.OnCompleted);
  120. });
  121. #endif
  122. }
  123. #endregion
  124. #region + Do +
  125. public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext)
  126. {
  127. #if !NO_PERF
  128. return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, Stubs.Nop);
  129. #else
  130. // PERFORMANCE - Use of Select allows for operator coalescing
  131. return source.Select(
  132. x =>
  133. {
  134. onNext(x);
  135. return x;
  136. }
  137. );
  138. #endif
  139. }
  140. public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action onCompleted)
  141. {
  142. #if !NO_PERF
  143. return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, onCompleted);
  144. #else
  145. return new AnonymousObservable<TSource>(obs =>
  146. {
  147. return source.Subscribe(
  148. x =>
  149. {
  150. try
  151. {
  152. onNext(x);
  153. }
  154. catch (Exception ex)
  155. {
  156. obs.OnError(ex);
  157. }
  158. obs.OnNext(x);
  159. },
  160. obs.OnError,
  161. () =>
  162. {
  163. try
  164. {
  165. onCompleted();
  166. }
  167. catch (Exception ex)
  168. {
  169. obs.OnError(ex);
  170. }
  171. obs.OnCompleted();
  172. });
  173. });
  174. #endif
  175. }
  176. public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
  177. {
  178. #if !NO_PERF
  179. return Do_<TSource>(source, onNext, onError, Stubs.Nop);
  180. #else
  181. return new AnonymousObservable<TSource>(obs =>
  182. {
  183. return source.Subscribe(
  184. x =>
  185. {
  186. try
  187. {
  188. onNext(x);
  189. }
  190. catch (Exception ex)
  191. {
  192. obs.OnError(ex);
  193. }
  194. obs.OnNext(x);
  195. },
  196. ex =>
  197. {
  198. try
  199. {
  200. onError(ex);
  201. }
  202. catch (Exception ex2)
  203. {
  204. obs.OnError(ex2);
  205. }
  206. obs.OnError(ex);
  207. },
  208. obs.OnCompleted);
  209. });
  210. #endif
  211. }
  212. public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  213. {
  214. return Do_(source, onNext, onError, onCompleted);
  215. }
  216. public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, IObserver<TSource> observer)
  217. {
  218. return Do_(source, observer.OnNext, observer.OnError, observer.OnCompleted);
  219. }
  220. private static IObservable<TSource> Do_<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  221. {
  222. #if !NO_PERF
  223. return new Do<TSource>(source, onNext, onError, onCompleted);
  224. #else
  225. return new AnonymousObservable<TSource>(obs =>
  226. {
  227. return source.Subscribe(
  228. x =>
  229. {
  230. try
  231. {
  232. onNext(x);
  233. }
  234. catch (Exception ex)
  235. {
  236. obs.OnError(ex);
  237. }
  238. obs.OnNext(x);
  239. },
  240. ex =>
  241. {
  242. try
  243. {
  244. onError(ex);
  245. }
  246. catch (Exception ex2)
  247. {
  248. obs.OnError(ex2);
  249. }
  250. obs.OnError(ex);
  251. },
  252. () =>
  253. {
  254. try
  255. {
  256. onCompleted();
  257. }
  258. catch (Exception ex)
  259. {
  260. obs.OnError(ex);
  261. }
  262. obs.OnCompleted();
  263. });
  264. });
  265. #endif
  266. }
  267. #endregion
  268. #region + Finally +
  269. public virtual IObservable<TSource> Finally<TSource>(IObservable<TSource> source, Action finallyAction)
  270. {
  271. #if !NO_PERF
  272. return new Finally<TSource>(source, finallyAction);
  273. #else
  274. return new AnonymousObservable<TSource>(observer =>
  275. {
  276. var subscription = source.Subscribe(observer);
  277. return Disposable.Create(() =>
  278. {
  279. try
  280. {
  281. subscription.Dispose();
  282. }
  283. finally
  284. {
  285. finallyAction();
  286. }
  287. });
  288. });
  289. #endif
  290. }
  291. #endregion
  292. #region + IgnoreElements +
  293. public virtual IObservable<TSource> IgnoreElements<TSource>(IObservable<TSource> source)
  294. {
  295. #if !NO_PERF
  296. var ignoreElements = source as IgnoreElements<TSource>;
  297. if (ignoreElements != null)
  298. return ignoreElements.Omega();
  299. return new IgnoreElements<TSource>(source);
  300. #else
  301. return new AnonymousObservable<TSource>(observer => source.Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
  302. #endif
  303. }
  304. #endregion
  305. #region + Materialize +
  306. public virtual IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source)
  307. {
  308. #if !NO_PERF
  309. //
  310. // NOTE: Peephole optimization of xs.Dematerialize().Materialize() should not be performed. It's possible for xs to
  311. // contain multiple terminal notifications, which won't survive a Dematerialize().Materialize() chain. In case
  312. // a reduction to xs.AsObservable() would be performed, those notification elements would survive.
  313. //
  314. return new Materialize<TSource>(source);
  315. #else
  316. return new AnonymousObservable<Notification<TSource>>(observer =>
  317. source.Subscribe(
  318. value => observer.OnNext(Notification.CreateOnNext<TSource>(value)),
  319. exception =>
  320. {
  321. observer.OnNext(Notification.CreateOnError<TSource>(exception));
  322. observer.OnCompleted();
  323. },
  324. () =>
  325. {
  326. observer.OnNext(Notification.CreateOnCompleted<TSource>());
  327. observer.OnCompleted();
  328. }));
  329. #endif
  330. }
  331. #endregion
  332. #region - Repeat -
  333. public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source)
  334. {
  335. return RepeatInfinite(source).Concat();
  336. }
  337. private static IEnumerable<T> RepeatInfinite<T>(T value)
  338. {
  339. while (true)
  340. yield return value;
  341. }
  342. public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount)
  343. {
  344. return Enumerable.Repeat(source, repeatCount).Concat();
  345. }
  346. #endregion
  347. #region - Retry -
  348. public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
  349. {
  350. return RepeatInfinite(source).Catch();
  351. }
  352. public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount)
  353. {
  354. return Enumerable.Repeat(source, retryCount).Catch();
  355. }
  356. #endregion
  357. #region + Scan +
  358. public virtual IObservable<TAccumulate> Scan<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  359. {
  360. #if !NO_PERF
  361. return new Scan<TSource, TAccumulate>(source, seed, accumulator);
  362. #else
  363. return Defer(() =>
  364. {
  365. var accumulation = default(TAccumulate);
  366. var hasAccumulation = false;
  367. return source.Select(x =>
  368. {
  369. if (hasAccumulation)
  370. accumulation = accumulator(accumulation, x);
  371. else
  372. {
  373. accumulation = accumulator(seed, x);
  374. hasAccumulation = true;
  375. }
  376. return accumulation;
  377. });
  378. });
  379. #endif
  380. }
  381. public virtual IObservable<TSource> Scan<TSource>(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  382. {
  383. #if !NO_PERF
  384. return new Scan<TSource>(source, accumulator);
  385. #else
  386. return Defer(() =>
  387. {
  388. var accumulation = default(TSource);
  389. var hasAccumulation = false;
  390. return source.Select(x =>
  391. {
  392. if (hasAccumulation)
  393. accumulation = accumulator(accumulation, x);
  394. else
  395. {
  396. accumulation = x;
  397. hasAccumulation = true;
  398. }
  399. return accumulation;
  400. });
  401. });
  402. #endif
  403. }
  404. #endregion
  405. #region + SkipLast +
  406. public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, int count)
  407. {
  408. #if !NO_PERF
  409. return new SkipLast<TSource>(source, count);
  410. #else
  411. return new AnonymousObservable<TSource>(observer =>
  412. {
  413. var q = new Queue<TSource>();
  414. return source.Subscribe(
  415. x =>
  416. {
  417. q.Enqueue(x);
  418. if (q.Count > count)
  419. observer.OnNext(q.Dequeue());
  420. },
  421. observer.OnError,
  422. observer.OnCompleted);
  423. });
  424. #endif
  425. }
  426. #endregion
  427. #region - StartWith -
  428. public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, params TSource[] values)
  429. {
  430. return StartWith_<TSource>(source, SchedulerDefaults.ConstantTimeOperations, values);
  431. }
  432. public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
  433. {
  434. return StartWith_<TSource>(source, scheduler, values);
  435. }
  436. public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IEnumerable<TSource> values)
  437. {
  438. return StartWith(source, SchedulerDefaults.ConstantTimeOperations, values);
  439. }
  440. public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, IEnumerable<TSource> values)
  441. {
  442. //
  443. // NOTE: For some reason, someone introduced this signature in the Observable class, which is inconsistent with the Rx pattern
  444. // of putting the IScheduler last. It also wasn't wired up through IQueryLanguage. When introducing this method in the
  445. // IQueryLanguage interface, we went for consistency with the public API, hence the odd position of the IScheduler.
  446. //
  447. var valueArray = values as TSource[];
  448. if (valueArray == null)
  449. {
  450. var valueList = new List<TSource>(values);
  451. valueArray = valueList.ToArray();
  452. }
  453. return StartWith_<TSource>(source, scheduler, valueArray);
  454. }
  455. private static IObservable<TSource> StartWith_<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
  456. {
  457. return values.ToObservable(scheduler).Concat(source);
  458. }
  459. #endregion
  460. #region + TakeLast +
  461. public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count)
  462. {
  463. return TakeLast_(source, count, SchedulerDefaults.Iteration);
  464. }
  465. public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
  466. {
  467. return TakeLast_(source, count, scheduler);
  468. }
  469. private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
  470. {
  471. #if !NO_PERF
  472. return new TakeLast<TSource>(source, count, scheduler);
  473. #else
  474. return new AnonymousObservable<TSource>(observer =>
  475. {
  476. var q = new Queue<TSource>();
  477. var g = new CompositeDisposable();
  478. g.Add(source.Subscribe(
  479. x =>
  480. {
  481. q.Enqueue(x);
  482. if (q.Count > count)
  483. q.Dequeue();
  484. },
  485. observer.OnError,
  486. () =>
  487. {
  488. g.Add(scheduler.Schedule(rec =>
  489. {
  490. if (q.Count > 0)
  491. {
  492. observer.OnNext(q.Dequeue());
  493. rec();
  494. }
  495. else
  496. {
  497. observer.OnCompleted();
  498. }
  499. }));
  500. }
  501. ));
  502. return g;
  503. });
  504. #endif
  505. }
  506. public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, int count)
  507. {
  508. #if !NO_PERF
  509. return new TakeLastBuffer<TSource>(source, count);
  510. #else
  511. return new AnonymousObservable<IList<TSource>>(observer =>
  512. {
  513. var q = new Queue<TSource>();
  514. return source.Subscribe(
  515. x =>
  516. {
  517. q.Enqueue(x);
  518. if (q.Count > count)
  519. q.Dequeue();
  520. },
  521. observer.OnError,
  522. () =>
  523. {
  524. observer.OnNext(q.ToList());
  525. observer.OnCompleted();
  526. });
  527. });
  528. #endif
  529. }
  530. #endregion
  531. #region + Window +
  532. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count, int skip)
  533. {
  534. return Window_<TSource>(source, count, skip);
  535. }
  536. public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count)
  537. {
  538. return Window_<TSource>(source, count, count);
  539. }
  540. private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, int count, int skip)
  541. {
  542. #if !NO_PERF
  543. return new Window<TSource>(source, count, skip);
  544. #else
  545. return new AnonymousObservable<IObservable<TSource>>(observer =>
  546. {
  547. var q = new Queue<ISubject<TSource>>();
  548. var n = 0;
  549. var m = new SingleAssignmentDisposable();
  550. var refCountDisposable = new RefCountDisposable(m);
  551. Action createWindow = () =>
  552. {
  553. var s = new Subject<TSource>();
  554. q.Enqueue(s);
  555. observer.OnNext(s.AddRef(refCountDisposable));
  556. };
  557. createWindow();
  558. m.Disposable = source.Subscribe(
  559. x =>
  560. {
  561. foreach (var s in q)
  562. s.OnNext(x);
  563. var c = n - count + 1;
  564. if (c >= 0 && c % skip == 0)
  565. {
  566. var s = q.Dequeue();
  567. s.OnCompleted();
  568. }
  569. n++;
  570. if (n % skip == 0)
  571. createWindow();
  572. },
  573. exception =>
  574. {
  575. while (q.Count > 0)
  576. q.Dequeue().OnError(exception);
  577. observer.OnError(exception);
  578. },
  579. () =>
  580. {
  581. while (q.Count > 0)
  582. q.Dequeue().OnCompleted();
  583. observer.OnCompleted();
  584. }
  585. );
  586. return refCountDisposable;
  587. });
  588. #endif
  589. }
  590. #endregion
  591. }
  592. }