QueryLanguage.Single.cs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Subjects;
  9. namespace System.Reactive.Linq
  10. {
  11. #if !NO_PERF
  12. using ObservableImpl;
  13. #endif
  14. internal partial class QueryLanguage
  15. {
  16. #region + AsObservable +
  /// <summary>
  /// Returns an observable sequence that forwards subscriptions to <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to wrap.</param>
  /// <returns>
  /// In the optimized build: the result of <c>Omega()</c> when <paramref name="source"/> is already an
  /// <c>AsObservable&lt;TSource&gt;</c>, otherwise a new <c>AsObservable&lt;TSource&gt;</c> around it.
  /// In the NO_PERF build: an anonymous observable that subscribes the observer to <paramref name="source"/>.
  /// </returns>
  public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source)
  {
#if !NO_PERF
      var alreadyWrapped = source as AsObservable<TSource>;
      if (alreadyWrapped == null)
      {
          return new AsObservable<TSource>(source);
      }

      // Avoid stacking a second wrapper on top of an existing one.
      return alreadyWrapped.Omega();
#else
      return new AnonymousObservable<TSource>(subscriber => source.Subscribe(subscriber));
#endif
  }
  28. #endregion
  29. #region + Buffer +
  /// <summary>
  /// Buffers <paramref name="source"/> using <paramref name="count"/> as both the buffer size and the skip distance.
  /// </summary>
  /// <param name="source">Sequence to buffer.</param>
  /// <param name="count">Value passed as both count and skip to <c>Buffer_</c>.</param>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count)
  {
      var skip = count;
      return Buffer_(source, count, skip);
  }
  /// <summary>
  /// Buffers <paramref name="source"/> with an explicit <paramref name="count"/> and <paramref name="skip"/>;
  /// forwards directly to <c>Buffer_</c>.
  /// </summary>
  public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip)
  {
      var buffered = Buffer_(source, count, skip);
      return buffered;
  }
  /// <summary>
  /// Shared implementation behind the count-based <c>Buffer</c> overloads.
  /// </summary>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>Buffer&lt;TSource&gt;</c> operator. The NO_PERF build
  /// composes it from <c>Window_</c>: each window is turned into a list and empty lists are filtered out.
  /// </remarks>
  private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
  {
#if !NO_PERF
      return new Buffer<TSource>(source, count, skip);
#else
      var windows = Window_<TSource>(source, count, skip);
      var lists = windows.SelectMany(Observable.ToList);
      return lists.Where(buffer => buffer.Count > 0);
#endif
  }
  46. #endregion
  47. #region + Dematerialize +
  /// <summary>
  /// Turns a sequence of <c>Notification&lt;TSource&gt;</c> values back into a sequence of <typeparamref name="TSource"/>.
  /// </summary>
  /// <remarks>
  /// In the optimized build, a source that is itself a <c>Materialize&lt;TSource&gt;</c> operator is short-circuited
  /// through its own <c>Dematerialize()</c> method instead of being wrapped. The NO_PERF build replays every
  /// notification onto the observer via <c>Accept</c> and forwards errors and completion of the outer sequence.
  /// </remarks>
  public virtual IObservable<TSource> Dematerialize<TSource>(IObservable<Notification<TSource>> source)
  {
#if !NO_PERF
      var materialized = source as Materialize<TSource>;
      if (materialized == null)
      {
          return new Dematerialize<TSource>(source);
      }

      return materialized.Dematerialize();
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          return source.Subscribe(
              notification => { notification.Accept(observer); },
              observer.OnError,
              observer.OnCompleted);
      });
#endif
  }
  60. #endregion
  61. #region + DistinctUntilChanged +
  /// <summary>
  /// Forwards to <c>DistinctUntilChanged_</c> using the element itself as the key and the default
  /// equality comparer for <typeparamref name="TSource"/>.
  /// </summary>
  public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source)
  {
      var defaultComparer = EqualityComparer<TSource>.Default;
      return DistinctUntilChanged_<TSource, TSource>(source, element => element, defaultComparer);
  }
  /// <summary>
  /// Forwards to <c>DistinctUntilChanged_</c> using the element itself as the key and the supplied
  /// <paramref name="comparer"/>.
  /// </summary>
  public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer)
  {
      return DistinctUntilChanged_<TSource, TSource>(source, element => element, comparer);
  }
  /// <summary>
  /// Forwards to <c>DistinctUntilChanged_</c> using <paramref name="keySelector"/> and the default
  /// equality comparer for <typeparamref name="TKey"/>.
  /// </summary>
  public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      var defaultComparer = EqualityComparer<TKey>.Default;
      return DistinctUntilChanged_<TSource, TKey>(source, keySelector, defaultComparer);
  }
  /// <summary>
  /// Forwards to <c>DistinctUntilChanged_</c> with the supplied <paramref name="keySelector"/> and
  /// <paramref name="comparer"/>.
  /// </summary>
  public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      var distinct = DistinctUntilChanged_<TSource, TKey>(source, keySelector, comparer);
      return distinct;
  }
  /// <summary>
  /// Shared implementation behind the <c>DistinctUntilChanged</c> overloads.
  /// </summary>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>DistinctUntilChanged&lt;TSource, TKey&gt;</c> operator.
  /// The NO_PERF build keeps the key of the last forwarded element per subscription and only forwards an element
  /// when no key has been seen yet or <paramref name="comparer"/> reports the new key as different. Exceptions from
  /// <paramref name="keySelector"/> or <paramref name="comparer"/> are sent to the observer's <c>OnError</c>.
  /// </remarks>
  private static IObservable<TSource> DistinctUntilChanged_<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
#if !NO_PERF
      return new DistinctUntilChanged<TSource, TKey>(source, keySelector, comparer);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          TKey lastKey = default(TKey);
          bool seenAny = false;

          return source.Subscribe(
              item =>
              {
                  TKey candidate;
                  try
                  {
                      candidate = keySelector(item);
                  }
                  catch (Exception selectorError)
                  {
                      observer.OnError(selectorError);
                      return;
                  }

                  if (seenAny)
                  {
                      bool unchanged;
                      try
                      {
                          unchanged = comparer.Equals(lastKey, candidate);
                      }
                      catch (Exception comparerError)
                      {
                          observer.OnError(comparerError);
                          return;
                      }

                      // Same key as the previously forwarded element: drop it.
                      if (unchanged)
                      {
                          return;
                      }
                  }

                  seenAny = true;
                  lastKey = candidate;
                  observer.OnNext(item);
              },
              observer.OnError,
              observer.OnCompleted);
      });
#endif
  }
  125. #endregion
  126. #region + Do +
  /// <summary>
  /// Invokes <paramref name="onNext"/> for each element of <paramref name="source"/> and passes the element on.
  /// </summary>
  /// <remarks>
  /// The optimized build forwards to <c>Do_</c> with <c>Stubs&lt;Exception&gt;.Ignore</c> and <c>Stubs.Nop</c> as the
  /// error and completion callbacks. The NO_PERF build expresses the side effect as a <c>Select</c> projection.
  /// </remarks>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext)
  {
#if !NO_PERF
      Action<Exception> ignoreError = Stubs<Exception>.Ignore;
      Action nothingOnCompleted = Stubs.Nop;
      return Do_<TSource>(source, onNext, ignoreError, nothingOnCompleted);
#else
      // PERFORMANCE - Use of Select allows for operator coalescing
      return source.Select(element =>
      {
          onNext(element);
          return element;
      });
#endif
  }
  /// <summary>
  /// Invokes <paramref name="onNext"/> for each element and <paramref name="onCompleted"/> when
  /// <paramref name="source"/> completes, forwarding the notifications to the subscriber.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="onNext">Side effect run before each element is forwarded.</param>
  /// <param name="onCompleted">Side effect run before completion is forwarded.</param>
  /// <remarks>
  /// The optimized build forwards to <c>Do_</c> with <c>Stubs&lt;Exception&gt;.Ignore</c> as the error callback.
  /// In the NO_PERF build, an exception thrown by a callback is reported through <c>OnError</c> and the
  /// notification that triggered the callback is not forwarded afterwards, so the observer sees at most one
  /// terminal notification from this handler.
  /// </remarks>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action onCompleted)
  {
#if !NO_PERF
      return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, onCompleted);
#else
      return new AnonymousObservable<TSource>(obs =>
      {
          return source.Subscribe(
              x =>
              {
                  try
                  {
                      onNext(x);
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      return; // the error is terminal: do not forward the element after it
                  }

                  obs.OnNext(x);
              },
              obs.OnError,
              () =>
              {
                  try
                  {
                      onCompleted();
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      return; // already terminated with an error: do not also signal completion
                  }

                  obs.OnCompleted();
              });
      });
#endif
  }
  /// <summary>
  /// Invokes <paramref name="onNext"/> for each element and <paramref name="onError"/> when
  /// <paramref name="source"/> fails, forwarding the notifications to the subscriber.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="onNext">Side effect run before each element is forwarded.</param>
  /// <param name="onError">Side effect run before the error is forwarded.</param>
  /// <remarks>
  /// The optimized build forwards to <c>Do_</c> with <c>Stubs.Nop</c> as the completion callback.
  /// In the NO_PERF build, an exception thrown by a callback is reported through <c>OnError</c> and nothing else
  /// is forwarded from that handler; in particular a failing <paramref name="onError"/> reports only its own
  /// exception instead of two consecutive errors.
  /// </remarks>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
  {
#if !NO_PERF
      return Do_<TSource>(source, onNext, onError, Stubs.Nop);
#else
      return new AnonymousObservable<TSource>(obs =>
      {
          return source.Subscribe(
              x =>
              {
                  try
                  {
                      onNext(x);
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      return; // the error is terminal: do not forward the element after it
                  }

                  obs.OnNext(x);
              },
              ex =>
              {
                  try
                  {
                      onError(ex);
                  }
                  catch (Exception ex2)
                  {
                      obs.OnError(ex2);
                      return; // report the callback failure only; a second OnError would break the grammar
                  }

                  obs.OnError(ex);
              },
              obs.OnCompleted);
      });
#endif
  }
  /// <summary>
  /// Forwards all three callbacks unchanged to <c>Do_</c>.
  /// </summary>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  {
      var instrumented = Do_<TSource>(source, onNext, onError, onCompleted);
      return instrumented;
  }
  /// <summary>
  /// Forwards to <c>Do_</c>, using the three methods of <paramref name="observer"/> as the callbacks.
  /// </summary>
  public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, IObserver<TSource> observer)
  {
      Action<TSource> next = observer.OnNext;
      Action<Exception> error = observer.OnError;
      Action completed = observer.OnCompleted;
      return Do_<TSource>(source, next, error, completed);
  }
  /// <summary>
  /// Shared implementation behind the <c>Do</c> overloads: runs a side-effect callback for each kind of
  /// notification before forwarding that notification to the subscriber.
  /// </summary>
  /// <param name="source">Sequence to observe.</param>
  /// <param name="onNext">Side effect run before each element is forwarded.</param>
  /// <param name="onError">Side effect run before an error is forwarded.</param>
  /// <param name="onCompleted">Side effect run before completion is forwarded.</param>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>Do&lt;TSource&gt;</c> operator. In the NO_PERF build, an
  /// exception thrown by a callback is reported through <c>OnError</c> and the handler returns immediately, so the
  /// triggering notification is not forwarded after the error.
  /// </remarks>
  private static IObservable<TSource> Do_<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
  {
#if !NO_PERF
      return new Do<TSource>(source, onNext, onError, onCompleted);
#else
      return new AnonymousObservable<TSource>(obs =>
      {
          return source.Subscribe(
              x =>
              {
                  try
                  {
                      onNext(x);
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      return; // the error is terminal: do not forward the element after it
                  }

                  obs.OnNext(x);
              },
              ex =>
              {
                  try
                  {
                      onError(ex);
                  }
                  catch (Exception ex2)
                  {
                      obs.OnError(ex2);
                      return; // report the callback failure only; a second OnError would break the grammar
                  }

                  obs.OnError(ex);
              },
              () =>
              {
                  try
                  {
                      onCompleted();
                  }
                  catch (Exception ex)
                  {
                      obs.OnError(ex);
                      return; // already terminated with an error: do not also signal completion
                  }

                  obs.OnCompleted();
              });
      });
#endif
  }
  269. #endregion
  270. #region + Finally +
  /// <summary>
  /// Returns a sequence that runs <paramref name="finallyAction"/> as part of tearing down a subscription
  /// to <paramref name="source"/>.
  /// </summary>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>Finally&lt;TSource&gt;</c> operator. The NO_PERF build
  /// returns a disposable that disposes the inner subscription and then, inside a <c>finally</c> block,
  /// invokes <paramref name="finallyAction"/>.
  /// </remarks>
  public virtual IObservable<TSource> Finally<TSource>(IObservable<TSource> source, Action finallyAction)
  {
#if !NO_PERF
      return new Finally<TSource>(source, finallyAction);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var inner = source.Subscribe(observer);

          Action teardown = () =>
          {
              try
              {
                  inner.Dispose();
              }
              finally
              {
                  // Runs even when disposing the inner subscription throws.
                  finallyAction();
              }
          };

          return Disposable.Create(teardown);
      });
#endif
  }
  293. #endregion
  294. #region + IgnoreElements +
  /// <summary>
  /// Returns a sequence that drops every element of <paramref name="source"/> and only forwards
  /// its error or completion.
  /// </summary>
  /// <remarks>
  /// In the optimized build, a source that is already an <c>IgnoreElements&lt;TSource&gt;</c> is not wrapped again;
  /// the result of its <c>Omega()</c> method is returned instead.
  /// </remarks>
  public virtual IObservable<TSource> IgnoreElements<TSource>(IObservable<TSource> source)
  {
#if !NO_PERF
      var alreadyIgnoring = source as IgnoreElements<TSource>;
      if (alreadyIgnoring == null)
      {
          return new IgnoreElements<TSource>(source);
      }

      return alreadyIgnoring.Omega();
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          Action<TSource> discard = _ => { };
          return source.Subscribe(discard, observer.OnError, observer.OnCompleted);
      });
#endif
  }
  306. #endregion
  307. #region + Materialize +
  /// <summary>
  /// Turns every notification of <paramref name="source"/> (element, error, completion) into an explicit
  /// <c>Notification&lt;TSource&gt;</c> element.
  /// </summary>
  /// <remarks>
  /// In the NO_PERF build, an error or completion of <paramref name="source"/> is emitted as a notification
  /// element and then the resulting sequence completes.
  /// </remarks>
  public virtual IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source)
  {
#if !NO_PERF
      //
      // NOTE: The peephole optimization of xs.Dematerialize().Materialize() must not be performed. xs may contain
      //       multiple terminal notifications, which do not survive a Dematerialize().Materialize() chain. If the
      //       chain were reduced to xs.AsObservable(), those notification elements would survive.
      //
      return new Materialize<TSource>(source);
#else
      return new AnonymousObservable<Notification<TSource>>(observer =>
      {
          return source.Subscribe(
              element =>
              {
                  observer.OnNext(Notification.CreateOnNext<TSource>(element));
              },
              error =>
              {
                  observer.OnNext(Notification.CreateOnError<TSource>(error));
                  observer.OnCompleted();
              },
              () =>
              {
                  observer.OnNext(Notification.CreateOnCompleted<TSource>());
                  observer.OnCompleted();
              });
      });
#endif
  }
  333. #endregion
  334. #region - Repeat -
  /// <summary>
  /// Concatenates an endless enumeration of <paramref name="source"/> (see <c>RepeatInfinite</c>).
  /// </summary>
  public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source)
  {
      var endlessSources = RepeatInfinite(source);
      return endlessSources.Concat();
  }
  /// <summary>
  /// Lazily yields <paramref name="value"/> over and over; the enumeration never ends on its own.
  /// </summary>
  private static IEnumerable<T> RepeatInfinite<T>(T value)
  {
      for (;;)
      {
          yield return value;
      }
  }
  /// <summary>
  /// Concatenates <paramref name="repeatCount"/> copies of <paramref name="source"/>, produced by
  /// <c>Enumerable.Repeat</c>.
  /// </summary>
  public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount)
  {
      var copies = Enumerable.Repeat(source, repeatCount);
      return copies.Concat();
  }
  348. #endregion
  349. #region - Retry -
  /// <summary>
  /// Applies <c>Catch</c> to an endless enumeration of <paramref name="source"/> (see <c>RepeatInfinite</c>).
  /// </summary>
  public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
  {
      var endlessAttempts = RepeatInfinite(source);
      return endlessAttempts.Catch();
  }
  /// <summary>
  /// Applies <c>Catch</c> to <paramref name="retryCount"/> copies of <paramref name="source"/>, produced by
  /// <c>Enumerable.Repeat</c>.
  /// </summary>
  public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount)
  {
      var attempts = Enumerable.Repeat(source, retryCount);
      return attempts.Catch();
  }
  358. #endregion
  359. #region + Scan +
  /// <summary>
  /// Produces the running accumulation of <paramref name="source"/>, starting from <paramref name="seed"/>.
  /// </summary>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>Scan&lt;TSource, TAccumulate&gt;</c> operator. The NO_PERF
  /// build defers creation of per-subscription state and folds each element into it inside a <c>Select</c>:
  /// the first element is combined with <paramref name="seed"/>, later ones with the previous result.
  /// </remarks>
  public virtual IObservable<TAccumulate> Scan<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
  {
#if !NO_PERF
      return new Scan<TSource, TAccumulate>(source, seed, accumulator);
#else
      return Defer(() =>
      {
          TAccumulate running = default(TAccumulate);
          bool started = false;

          return source.Select(element =>
          {
              var previous = started ? running : seed;
              running = accumulator(previous, element);

              // Only flips after the first successful accumulation, as before.
              started = true;
              return running;
          });
      });
#endif
  }
  /// <summary>
  /// Produces the running accumulation of <paramref name="source"/> without a seed.
  /// </summary>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>Scan&lt;TSource&gt;</c> operator. In the NO_PERF build the
  /// first element is emitted as-is and becomes the initial accumulation; every later element is combined with the
  /// previous result through <paramref name="accumulator"/>.
  /// </remarks>
  public virtual IObservable<TSource> Scan<TSource>(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
  {
#if !NO_PERF
      return new Scan<TSource>(source, accumulator);
#else
      return Defer(() =>
      {
          TSource running = default(TSource);
          bool started = false;

          return source.Select(element =>
          {
              if (!started)
              {
                  running = element;
                  started = true;
              }
              else
              {
                  running = accumulator(running, element);
              }

              return running;
          });
      });
#endif
  }
  406. #endregion
  407. #region + SkipLast +
  /// <summary>
  /// Returns <paramref name="source"/> without its last <paramref name="count"/> elements.
  /// </summary>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>SkipLast&lt;TSource&gt;</c> operator. The NO_PERF build
  /// delays elements through a queue: an element is only released once more than <paramref name="count"/>
  /// elements are queued, so whatever is still queued when the source terminates is never emitted.
  /// </remarks>
  public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, int count)
  {
#if !NO_PERF
      return new SkipLast<TSource>(source, count);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var delayed = new Queue<TSource>();

          Action<TSource> onElement = element =>
          {
              delayed.Enqueue(element);
              if (delayed.Count > count)
              {
                  observer.OnNext(delayed.Dequeue());
              }
          };

          return source.Subscribe(onElement, observer.OnError, observer.OnCompleted);
      });
#endif
  }
  428. #endregion
  429. #region - StartWith -
  /// <summary>
  /// Prepends <paramref name="values"/> to <paramref name="source"/> using
  /// <c>SchedulerDefaults.ConstantTimeOperations</c> as the scheduler.
  /// </summary>
  public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, params TSource[] values)
  {
      var defaultScheduler = SchedulerDefaults.ConstantTimeOperations;
      return StartWith_<TSource>(source, defaultScheduler, values);
  }
  /// <summary>
  /// Prepends <paramref name="values"/> to <paramref name="source"/>, emitting them on
  /// <paramref name="scheduler"/>; forwards directly to <c>StartWith_</c>.
  /// </summary>
  public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
  {
      var prefixed = StartWith_<TSource>(source, scheduler, values);
      return prefixed;
  }
  /// <summary>
  /// Prepends the enumerable <paramref name="values"/> to <paramref name="source"/> by calling the
  /// scheduler-taking overload with <c>SchedulerDefaults.ConstantTimeOperations</c>.
  /// </summary>
  public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IEnumerable<TSource> values)
  {
      var defaultScheduler = SchedulerDefaults.ConstantTimeOperations;
      return StartWith(source, defaultScheduler, values);
  }
  /// <summary>
  /// Prepends the enumerable <paramref name="values"/> to <paramref name="source"/>, emitting them on
  /// <paramref name="scheduler"/>.
  /// </summary>
  /// <remarks>
  /// When <paramref name="values"/> is already a <c>TSource[]</c> it is passed on as-is; otherwise it is copied
  /// into an array first.
  /// </remarks>
  public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, IEnumerable<TSource> values)
  {
      //
      // NOTE: This signature was introduced in the Observable class with the IScheduler in a position that is
      //       inconsistent with the Rx pattern of putting the IScheduler last, and it was not wired up through
      //       IQueryLanguage. When the method was added to the IQueryLanguage interface, consistency with the
      //       public API was preferred, hence the odd position of the IScheduler.
      //
      var prefix = values as TSource[];
      if (prefix != null)
      {
          return StartWith_<TSource>(source, scheduler, prefix);
      }

      var copied = new List<TSource>(values).ToArray();
      return StartWith_<TSource>(source, scheduler, copied);
  }
  /// <summary>
  /// Shared implementation behind the <c>StartWith</c> overloads: converts <paramref name="values"/> to an
  /// observable on <paramref name="scheduler"/> and concatenates <paramref name="source"/> after it.
  /// </summary>
  private static IObservable<TSource> StartWith_<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
  {
      var prefix = values.ToObservable(scheduler);
      return prefix.Concat(source);
  }
  461. #endregion
  462. #region + TakeLast +
  /// <summary>
  /// Returns the last <paramref name="count"/> elements of <paramref name="source"/>, using
  /// <c>SchedulerDefaults.Iteration</c> as the scheduler passed to <c>TakeLast_</c>.
  /// </summary>
  public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count)
  {
      var defaultScheduler = SchedulerDefaults.Iteration;
      return TakeLast_(source, count, defaultScheduler);
  }
  /// <summary>
  /// Returns the last <paramref name="count"/> elements of <paramref name="source"/>; forwards directly to
  /// <c>TakeLast_</c> with the supplied <paramref name="scheduler"/>.
  /// </summary>
  public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
  {
      var tail = TakeLast_<TSource>(source, count, scheduler);
      return tail;
  }
  /// <summary>
  /// Shared implementation behind the <c>TakeLast</c> overloads.
  /// </summary>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>TakeLast&lt;TSource&gt;</c> operator. The NO_PERF build keeps
  /// at most <paramref name="count"/> elements in a queue while the source runs; when the source completes, the
  /// queued elements are emitted one per recursive step scheduled on <paramref name="scheduler"/>, followed by
  /// completion. Source errors are forwarded immediately.
  /// </remarks>
  private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
  {
#if !NO_PERF
      return new TakeLast<TSource>(source, count, scheduler);
#else
      return new AnonymousObservable<TSource>(observer =>
      {
          var tail = new Queue<TSource>();
          var resources = new CompositeDisposable();

          var subscription = source.Subscribe(
              element =>
              {
                  tail.Enqueue(element);
                  if (tail.Count > count)
                  {
                      tail.Dequeue();
                  }
              },
              observer.OnError,
              () =>
              {
                  var drain = scheduler.Schedule(self =>
                  {
                      if (tail.Count == 0)
                      {
                          observer.OnCompleted();
                          return;
                      }

                      observer.OnNext(tail.Dequeue());
                      self();
                  });
                  resources.Add(drain);
              });

          resources.Add(subscription);
          return resources;
      });
#endif
  }
  /// <summary>
  /// Returns a single list holding the last <paramref name="count"/> elements of <paramref name="source"/>.
  /// </summary>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>TakeLastBuffer&lt;TSource&gt;</c> operator. The NO_PERF build
  /// keeps at most <paramref name="count"/> elements in a queue and, when the source completes, emits the queue
  /// contents as one list followed by completion. Source errors are forwarded immediately.
  /// </remarks>
  public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, int count)
  {
#if !NO_PERF
      return new TakeLastBuffer<TSource>(source, count);
#else
      return new AnonymousObservable<IList<TSource>>(observer =>
      {
          var tail = new Queue<TSource>();

          Action<TSource> remember = element =>
          {
              tail.Enqueue(element);
              if (tail.Count > count)
              {
                  tail.Dequeue();
              }
          };

          Action flush = () =>
          {
              observer.OnNext(tail.ToList());
              observer.OnCompleted();
          };

          return source.Subscribe(remember, observer.OnError, flush);
      });
#endif
  }
  532. #endregion
  533. #region + Window +
  /// <summary>
  /// Splits <paramref name="source"/> into windows with an explicit <paramref name="count"/> and
  /// <paramref name="skip"/>; forwards directly to <c>Window_</c>.
  /// </summary>
  public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count, int skip)
  {
      var windows = Window_(source, count, skip);
      return windows;
  }
  /// <summary>
  /// Splits <paramref name="source"/> into windows, using <paramref name="count"/> as both the window size
  /// and the skip distance.
  /// </summary>
  public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count)
  {
      var skip = count;
      return Window_(source, count, skip);
  }
  /// <summary>
  /// Shared implementation behind the count-based <c>Window</c> overloads.
  /// </summary>
  /// <remarks>
  /// The optimized build instantiates the dedicated <c>Window&lt;TSource&gt;</c> operator. The NO_PERF build keeps a
  /// queue of open window subjects: a first window is opened before subscribing to the source, every element is
  /// pushed into all open windows, the oldest window is completed once it has received <paramref name="count"/>
  /// elements, and a new window is opened after every <paramref name="skip"/> elements. Termination of the source
  /// is propagated to all open windows and then to the outer observer.
  /// </remarks>
  private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, int count, int skip)
  {
#if !NO_PERF
      return new Window<TSource>(source, count, skip);
#else
      return new AnonymousObservable<IObservable<TSource>>(observer =>
      {
          var openWindows = new Queue<ISubject<TSource>>();
          var seen = 0;
          var sourceSubscription = new SingleAssignmentDisposable();
          var refCount = new RefCountDisposable(sourceSubscription);

          Action openWindow = () =>
          {
              var window = new Subject<TSource>();
              openWindows.Enqueue(window);
              observer.OnNext(window.AddRef(refCount));
          };

          // The first window exists before any element arrives.
          openWindow();

          sourceSubscription.Disposable = source.Subscribe(
              element =>
              {
                  foreach (var window in openWindows)
                  {
                      window.OnNext(element);
                  }

                  // Non-negative multiples of skip mark the point where the oldest window is full.
                  var offset = seen - count + 1;
                  if (offset >= 0 && offset % skip == 0)
                  {
                      var oldest = openWindows.Dequeue();
                      oldest.OnCompleted();
                  }

                  seen++;
                  if (seen % skip == 0)
                  {
                      openWindow();
                  }
              },
              error =>
              {
                  while (openWindows.Count > 0)
                  {
                      openWindows.Dequeue().OnError(error);
                  }

                  observer.OnError(error);
              },
              () =>
              {
                  while (openWindows.Count > 0)
                  {
                      openWindows.Dequeue().OnCompleted();
                  }

                  observer.OnCompleted();
              });

          return refCount;
      });
#endif
  }
  592. #endregion
  593. }
  594. }