QueryLanguage.StandardSequenceOperators.cs 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Concurrency;
  4. #if !NO_TPL
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. #endif
  8. namespace System.Reactive.Linq
  9. {
  10. #if !NO_PERF
  11. using ObservableImpl;
  12. #endif
  13. internal partial class QueryLanguage
  14. {
  15. #region + Cast +
  /// <summary>
  /// Produces a sequence of <c>TResult</c> from a sequence of objects.
  /// </summary>
  /// <param name="source">Sequence whose elements are to be converted.</param>
  /// <returns>
  /// With NO_PERF defined, a Select over <paramref name="source"/> applying an explicit cast to each element;
  /// otherwise an instance of the dedicated Cast operator class.
  /// </returns>
  public virtual IObservable<TResult> Cast<TResult>(IObservable<object> source)
  {
  #if NO_PERF
      return source.Select(element => (TResult)element);
  #else
      return new Cast<object, TResult>(source);
  #endif
  }
  24. #endregion
  25. #region + DefaultIfEmpty +
  /// <summary>
  /// DefaultIfEmpty overload that uses <c>default(TSource)</c> as the fallback value.
  /// </summary>
  /// <param name="source">Sequence to forward.</param>
  /// <returns>The DefaultIfEmpty operator (or the NO_PERF helper) configured with <c>default(TSource)</c>.</returns>
  public virtual IObservable<TSource> DefaultIfEmpty<TSource>(IObservable<TSource> source)
  {
      var fallback = default(TSource);
  #if NO_PERF
      return DefaultIfEmpty_(source, fallback);
  #else
      return new DefaultIfEmpty<TSource>(source, fallback);
  #endif
  }
  /// <summary>
  /// DefaultIfEmpty overload with a caller-supplied fallback value.
  /// </summary>
  /// <param name="source">Sequence to forward.</param>
  /// <param name="defaultValue">Value handed to the operator as the fallback.</param>
  /// <returns>The DefaultIfEmpty operator (or the NO_PERF helper) configured with <paramref name="defaultValue"/>.</returns>
  public virtual IObservable<TSource> DefaultIfEmpty<TSource>(IObservable<TSource> source, TSource defaultValue)
  {
  #if NO_PERF
      return DefaultIfEmpty_(source, defaultValue);
  #else
      return new DefaultIfEmpty<TSource>(source, defaultValue);
  #endif
  }
  42. #if NO_PERF
  /// <summary>
  /// NO_PERF implementation of DefaultIfEmpty: forwards every element of <paramref name="source"/> and,
  /// if the source completes without having produced any element, emits <paramref name="defaultValue"/>
  /// right before completing.
  /// </summary>
  private static IObservable<TSource> DefaultIfEmpty_<TSource>(IObservable<TSource> source, TSource defaultValue)
  {
      return new AnonymousObservable<TSource>(observer =>
      {
          // Flips to true as soon as the source produces an element.
          var sawElement = false;

          Action<TSource> forward = item =>
          {
              sawElement = true;
              observer.OnNext(item);
          };

          Action finish = () =>
          {
              if (!sawElement)
              {
                  observer.OnNext(defaultValue);
              }

              observer.OnCompleted();
          };

          return source.Subscribe(forward, observer.OnError, finish);
      });
  }
  64. #endif
  65. #endregion
  66. #region + Distinct +
  /// <summary>
  /// Distinct overload that keys each element by itself and compares keys with the default equality comparer.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  public virtual IObservable<TSource> Distinct<TSource>(IObservable<TSource> source)
  {
      Func<TSource, TSource> identity = element => element;
      var defaultComparer = EqualityComparer<TSource>.Default;
  #if NO_PERF
      return Distinct_(source, identity, defaultComparer);
  #else
      return new Distinct<TSource, TSource>(source, identity, defaultComparer);
  #endif
  }
  /// <summary>
  /// Distinct overload that keys each element by itself and compares keys with <paramref name="comparer"/>.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="comparer">Equality comparer applied to the elements.</param>
  public virtual IObservable<TSource> Distinct<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer)
  {
      Func<TSource, TSource> identity = element => element;
  #if NO_PERF
      return Distinct_(source, identity, comparer);
  #else
      return new Distinct<TSource, TSource>(source, identity, comparer);
  #endif
  }
  /// <summary>
  /// Distinct overload that keys elements through <paramref name="keySelector"/> and compares keys
  /// with the default equality comparer for <c>TKey</c>.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Computes the comparison key of an element.</param>
  public virtual IObservable<TSource> Distinct<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      var defaultComparer = EqualityComparer<TKey>.Default;
  #if NO_PERF
      return Distinct_(source, keySelector, defaultComparer);
  #else
      return new Distinct<TSource, TKey>(source, keySelector, defaultComparer);
  #endif
  }
  /// <summary>
  /// Most general Distinct overload: keys elements through <paramref name="keySelector"/> and compares
  /// keys with <paramref name="comparer"/>.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="keySelector">Computes the comparison key of an element.</param>
  /// <param name="comparer">Equality comparer applied to the keys.</param>
  public virtual IObservable<TSource> Distinct<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
  #if NO_PERF
      return Distinct_(source, keySelector, comparer);
  #else
      return new Distinct<TSource, TKey>(source, keySelector, comparer);
  #endif
  }
  99. #if NO_PERF
  /// <summary>
  /// NO_PERF implementation of Distinct. Each subscription keeps its own set of keys already seen;
  /// an element is forwarded only when its key could be added to that set.
  /// An exception thrown while computing or storing the key is routed to the observer's OnError.
  /// </summary>
  private static IObservable<TSource> Distinct_<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      return new AnonymousObservable<TSource>(observer =>
      {
          var seenKeys = new HashSet<TKey>(comparer);

          Action<TSource> onElement = item =>
          {
              bool isNewKey;
              try
              {
                  // Both the selector and the comparer (invoked by Add) are user code and may throw.
                  isNewKey = seenKeys.Add(keySelector(item));
              }
              catch (Exception failure)
              {
                  observer.OnError(failure);
                  return;
              }

              if (isNewKey)
              {
                  observer.OnNext(item);
              }
          };

          return source.Subscribe(onElement, observer.OnError, observer.OnCompleted);
      });
  }
  128. #endif
  129. #endregion
  130. #region + GroupBy +
  /// <summary>
  /// GroupBy with an element selector; passes no capacity (null) and the default key comparer to GroupBy_.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
  {
      int? noCapacity = null;
      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, noCapacity, defaultComparer);
  }
  /// <summary>
  /// GroupBy with a custom key comparer; elements are passed through unchanged and no capacity (null) is supplied.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  {
      Func<TSource, TSource> identity = element => element;
      int? noCapacity = null;
      return GroupBy_<TSource, TKey, TSource>(source, keySelector, identity, noCapacity, comparer);
  }
  /// <summary>
  /// Simplest GroupBy overload: identity element selector, no capacity (null), default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
  {
      Func<TSource, TSource> identity = element => element;
      int? noCapacity = null;
      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupBy_<TSource, TKey, TSource>(source, keySelector, identity, noCapacity, defaultComparer);
  }
  /// <summary>
  /// GroupBy with an element selector and a custom key comparer; no capacity (null) is supplied.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
  {
      int? noCapacity = null;
      return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, noCapacity, comparer);
  }
  /// <summary>
  /// GroupBy with an element selector and an explicit capacity; keys are compared with the default comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
  {
      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// GroupBy with an explicit capacity and a custom key comparer; elements are passed through unchanged.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      Func<TSource, TSource> identity = element => element;
      return GroupBy_<TSource, TKey, TSource>(source, keySelector, identity, capacity, comparer);
  }
  /// <summary>
  /// GroupBy with an explicit capacity; identity element selector and default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
  {
      Func<TSource, TSource> identity = element => element;
      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupBy_<TSource, TKey, TSource>(source, keySelector, identity, capacity, defaultComparer);
  }
  /// <summary>
  /// Fully specified GroupBy overload: every argument is handed to GroupBy_ as given.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      var grouped = GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
      return grouped;
  }
  /// <summary>
  /// Common entry point for all GroupBy overloads. A null <paramref name="capacity"/> means the caller gave none.
  /// With NO_PERF defined, grouping is expressed as GroupByUntil_ with a duration selector that always
  /// returns <c>Observable.Never</c>; otherwise the dedicated GroupBy operator class is instantiated.
  /// </summary>
  private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
  {
  #if NO_PERF
      Func<IGroupedObservable<TKey, TElement>, IObservable<Unit>> neverExpire = _ => Observable.Never<Unit>();
      return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, neverExpire, capacity, comparer);
  #else
      return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
  #endif
  }
  171. #endregion
  172. #region + GroupByUntil +
  /// <summary>
  /// GroupByUntil with an element selector and a custom key comparer; no capacity (null) is supplied.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      int? noCapacity = null;
      return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, noCapacity, comparer);
  }
  /// <summary>
  /// GroupByUntil with an element selector; no capacity (null) and the default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
  {
      int? noCapacity = null;
      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, noCapacity, defaultComparer);
  }
  /// <summary>
  /// GroupByUntil with a custom key comparer; elements are passed through unchanged and no capacity (null) is supplied.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
  {
      Func<TSource, TSource> identity = element => element;
      int? noCapacity = null;
      return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, identity, durationSelector, noCapacity, comparer);
  }
  /// <summary>
  /// Simplest GroupByUntil overload: identity element selector, no capacity (null), default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
  {
      Func<TSource, TSource> identity = element => element;
      int? noCapacity = null;
      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, identity, durationSelector, noCapacity, defaultComparer);
  }
  /// <summary>
  /// Fully specified GroupByUntil overload: every argument is handed to GroupByUntil_ as given.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      var grouped = GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
      return grouped;
  }
  /// <summary>
  /// GroupByUntil with an element selector and an explicit capacity; keys are compared with the default comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity)
  {
      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// GroupByUntil with an explicit capacity and a custom key comparer; elements are passed through unchanged.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
  {
      Func<TSource, TSource> identity = element => element;
      return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, identity, durationSelector, capacity, comparer);
  }
  /// <summary>
  /// GroupByUntil with an explicit capacity; identity element selector and default key comparer.
  /// </summary>
  public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity)
  {
      Func<TSource, TSource> identity = element => element;
      var defaultComparer = EqualityComparer<TKey>.Default;
      return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, identity, durationSelector, capacity, defaultComparer);
  }
  /// <summary>
  /// Common implementation behind every GroupByUntil overload (and behind GroupBy_ when NO_PERF is defined).
  /// In the default build the work is delegated to the dedicated GroupByUntil operator class.
  /// The NO_PERF build keeps a dictionary of live groups (key -> subject): an element whose key has no
  /// live group opens a new one, which is published to the observer and whose lifetime ends when the
  /// sequence returned by <paramref name="durationSelector"/> produces its first notification.
  /// </summary>
  /// <param name="source">Sequence to group.</param>
  /// <param name="keySelector">Computes the group key of an element.</param>
  /// <param name="elementSelector">Projects an element before it is pushed into its group.</param>
  /// <param name="durationSelector">Given a newly opened group, returns the sequence that signals its expiration.</param>
  /// <param name="capacity">Initial capacity of the group dictionary, or null to use the dictionary's default.</param>
  /// <param name="comparer">Equality comparer for group keys.</param>
  private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
  {
  #if !NO_PERF
      return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
  #else
      return new AnonymousObservable<IGroupedObservable<TKey, TElement>>(observer =>
      {
          // Live groups by key. The dictionary instance doubles as the lock that guards it.
          var map = capacity.HasValue
              ? new Dictionary<TKey, ISubject<TElement>>(capacity.Value, comparer)
              : new Dictionary<TKey, ISubject<TElement>>(comparer);

          var groupDisposable = new CompositeDisposable();
          var refCountDisposable = new RefCountDisposable(groupDisposable);

          // Single error path: fail every live group, then the outer observer.
          // The map is always enumerated under its lock. Previously the durationSelector
          // failure path enumerated it unlocked and could race with an expiring group
          // removing its entry.
          Action<Exception> failAll = error =>
          {
              lock (map)
              {
                  foreach (var w in map.Values.ToArray())
                      w.OnError(error);
              }

              observer.OnError(error);
          };

          groupDisposable.Add(source.Subscribe(
              x =>
              {
                  var key = default(TKey);
                  try
                  {
                      key = keySelector(x);
                  }
                  catch (Exception exception)
                  {
                      failAll(exception);
                      return;
                  }

                  var fireNewMapEntry = false;
                  var writer = default(ISubject<TElement>);
                  try
                  {
                      // The lookup runs the user-supplied comparer, hence the try/catch.
                      lock (map)
                      {
                          if (!map.TryGetValue(key, out writer))
                          {
                              writer = new Subject<TElement>();
                              map.Add(key, writer);
                              fireNewMapEntry = true;
                          }
                      }
                  }
                  catch (Exception exception)
                  {
                      failAll(exception);
                      return;
                  }

                  if (fireNewMapEntry)
                  {
                      // 'group' is what the observer receives (tied to the ref-counted subscription);
                      // 'durationGroup' is the view handed to the duration selector.
                      var group = new GroupedObservable<TKey, TElement>(key, writer, refCountDisposable);
                      var durationGroup = new GroupedObservable<TKey, TElement>(key, writer);

                      var duration = default(IObservable<TDuration>);
                      try
                      {
                          duration = durationSelector(durationGroup);
                      }
                      catch (Exception exception)
                      {
                          failAll(exception);
                          return;
                      }

                      observer.OnNext(group);

                      var md = new SingleAssignmentDisposable();
                      groupDisposable.Add(md);

                      // Closes the group once; Remove returns false if the entry is already gone.
                      Action expire = () =>
                      {
                          lock (map)
                          {
                              if (map.Remove(key))
                                  writer.OnCompleted();
                          }

                          groupDisposable.Remove(md);
                      };

                      // Take(1): the first notification of the duration sequence ends the group.
                      md.Disposable = duration.Take(1).Subscribe(
                          _ => { },
                          failAll,
                          expire);
                  }

                  var element = default(TElement);
                  try
                  {
                      element = elementSelector(x);
                  }
                  catch (Exception exception)
                  {
                      failAll(exception);
                      return;
                  }

                  writer.OnNext(element);
              },
              failAll,
              () =>
              {
                  lock (map)
                  {
                      foreach (var w in map.Values.ToArray())
                          w.OnCompleted();
                  }

                  observer.OnCompleted();
              }));

          return refCountDisposable;
      });
  #endif
  }
  328. #endregion
  329. #region + GroupJoin +
  /// <summary>
  /// Public GroupJoin entry point; hands all arguments unchanged to the static GroupJoin_ implementation.
  /// </summary>
  public virtual IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector)
  {
      var joined = GroupJoin_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
      return joined;
  }
  /// <summary>
  /// Implementation of GroupJoin. The default build delegates to the dedicated GroupJoin operator class.
  /// The NO_PERF build tracks, under a single gate, the currently open left windows (id -> subject) and
  /// the currently live right values (id -> value): each left element opens a window that is seeded with
  /// the live right values, each right element is pushed into every open window, and the duration
  /// sequences decide when a window is completed or a right value is dropped.
  /// </summary>
  private static IObservable<TResult> GroupJoin_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector)
  {
  #if NO_PERF
      return new AnonymousObservable<TResult>(observer =>
      {
          var gate = new object();
          var subscriptions = new CompositeDisposable();
          var refCount = new RefCountDisposable(subscriptions);

          var openWindows = new Dictionary<int, IObserver<TRight>>();
          var liveRights = new Dictionary<int, TRight>();
          var nextLeftId = 0;
          var nextRightId = 0;

          // Fails every open window and then the outer observer. Callers must already hold 'gate'.
          Action<Exception> failAllUnderGate = error =>
          {
              foreach (var window in openWindows.Values.ToArray())
              {
                  window.OnError(error);
              }

              observer.OnError(error);
          };

          // Same as above, but acquires 'gate' itself.
          Action<Exception> failAll = error =>
          {
              lock (gate)
              {
                  failAllUnderGate(error);
              }
          };

          subscriptions.Add(left.Subscribe(
              leftValue =>
              {
                  var window = new Subject<TRight>();
                  var windowId = 0;

                  lock (gate)
                  {
                      windowId = nextLeftId++;
                      openWindows.Add(windowId, window);
                  }

                  lock (gate)
                  {
                      var projected = default(TResult);
                      try
                      {
                          projected = resultSelector(leftValue, window.AddRef(refCount));
                      }
                      catch (Exception failure)
                      {
                          failAllUnderGate(failure);
                          return;
                      }

                      observer.OnNext(projected);

                      // Replay the right values that are live at the moment the window opens.
                      foreach (var live in liveRights.Values.ToArray())
                      {
                          window.OnNext(live);
                      }
                  }

                  var durationSubscription = new SingleAssignmentDisposable();
                  subscriptions.Add(durationSubscription);

                  Action closeWindow = () =>
                  {
                      lock (gate)
                      {
                          if (openWindows.Remove(windowId))
                          {
                              window.OnCompleted();
                          }
                      }

                      subscriptions.Remove(durationSubscription);
                  };

                  var windowDuration = default(IObservable<TLeftDuration>);
                  try
                  {
                      windowDuration = leftDurationSelector(leftValue);
                  }
                  catch (Exception failure)
                  {
                      failAll(failure);
                      return;
                  }

                  // Take(1): the first notification of the duration sequence closes the window.
                  durationSubscription.Disposable = windowDuration.Take(1).Subscribe(
                      _ => { },
                      failAll,
                      closeWindow);
              },
              failAll,
              () =>
              {
                  lock (gate)
                  {
                      observer.OnCompleted();
                  }
              }));

          subscriptions.Add(right.Subscribe(
              rightValue =>
              {
                  var rightId = 0;
                  lock (gate)
                  {
                      rightId = nextRightId++;
                      liveRights.Add(rightId, rightValue);
                  }

                  var durationSubscription = new SingleAssignmentDisposable();
                  subscriptions.Add(durationSubscription);

                  Action dropRight = () =>
                  {
                      lock (gate)
                      {
                          liveRights.Remove(rightId);
                      }

                      subscriptions.Remove(durationSubscription);
                  };

                  var rightDuration = default(IObservable<TRightDuration>);
                  try
                  {
                      rightDuration = rightDurationSelector(rightValue);
                  }
                  catch (Exception failure)
                  {
                      failAll(failure);
                      return;
                  }

                  durationSubscription.Disposable = rightDuration.Take(1).Subscribe(
                      _ => { },
                      failAll,
                      dropRight);

                  lock (gate)
                  {
                      foreach (var window in openWindows.Values.ToArray())
                      {
                          window.OnNext(rightValue);
                      }
                  }
              },
              failAll));

          return refCount;
      });
  #else
      return new GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
  #endif
  }
  492. #endregion
  493. #region + Join +
  /// <summary>
  /// Public Join entry point; hands all arguments unchanged to the static Join_ implementation.
  /// </summary>
  public virtual IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector)
  {
      var joined = Join_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
      return joined;
  }
  /// <summary>
  /// Implementation of Join. The default build delegates to the dedicated Join operator class.
  /// The NO_PERF build keeps the currently live left and right values in two dictionaries guarded by a
  /// single gate: an arriving value is paired (through <paramref name="resultSelector"/>) with every live
  /// value of the opposite side, and stays live until its duration sequence produces its first notification.
  /// The result completes when one side is done and has no live values left, or when both sides are done.
  /// </summary>
  /// <param name="left">Left sequence.</param>
  /// <param name="right">Right sequence.</param>
  /// <param name="leftDurationSelector">Returns the sequence that bounds the lifetime of a left value.</param>
  /// <param name="rightDurationSelector">Returns the sequence that bounds the lifetime of a right value.</param>
  /// <param name="resultSelector">Combines a left value and a right value that are live at the same time.</param>
  private static IObservable<TResult> Join_<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector)
  {
  #if !NO_PERF
      return new Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(left, right, leftDurationSelector, rightDurationSelector, resultSelector);
  #else
      return new AnonymousObservable<TResult>(observer =>
      {
          var gate = new object();
          var leftDone = false;
          var rightDone = false;
          var group = new CompositeDisposable();
          var leftMap = new Dictionary<int, TLeft>();
          var rightMap = new Dictionary<int, TRight>();
          var leftID = 0;
          var rightID = 0;

          // Every notification to 'observer' must be issued while holding 'gate', because the left
          // side, the right side and the duration sequences may all call in concurrently.
          // Previously the two duration-selector failure paths called OnError without the gate.
          Action<Exception> fail = error =>
          {
              lock (gate)
                  observer.OnError(error);
          };

          group.Add(left.Subscribe(
              value =>
              {
                  var id = 0;
                  lock (gate)
                  {
                      id = leftID++;
                      leftMap.Add(id, value);
                  }

                  var md = new SingleAssignmentDisposable();
                  group.Add(md);

                  // Drops the left value; if it was the last live one and the left side is done,
                  // no further pairs can be produced, so the result completes.
                  Action expire = () =>
                  {
                      lock (gate)
                      {
                          if (leftMap.Remove(id) && leftMap.Count == 0 && leftDone)
                              observer.OnCompleted();
                      }

                      group.Remove(md);
                  };

                  var duration = default(IObservable<TLeftDuration>);
                  try
                  {
                      duration = leftDurationSelector(value);
                  }
                  catch (Exception exception)
                  {
                      fail(exception);
                      return;
                  }

                  // Take(1): the first notification of the duration sequence expires the value.
                  md.Disposable = duration.Take(1).Subscribe(
                      _ => { },
                      fail,
                      expire);

                  lock (gate)
                  {
                      foreach (var rightValue in rightMap.Values.ToArray())
                      {
                          var result = default(TResult);
                          try
                          {
                              result = resultSelector(value, rightValue);
                          }
                          catch (Exception exception)
                          {
                              observer.OnError(exception);
                              return;
                          }

                          observer.OnNext(result);
                      }
                  }
              },
              fail,
              () =>
              {
                  lock (gate)
                  {
                      leftDone = true;
                      if (rightDone || leftMap.Count == 0)
                          observer.OnCompleted();
                  }
              }));

          group.Add(right.Subscribe(
              value =>
              {
                  var id = 0;
                  lock (gate)
                  {
                      id = rightID++;
                      rightMap.Add(id, value);
                  }

                  var md = new SingleAssignmentDisposable();
                  group.Add(md);

                  // Mirror image of the left-side expiry.
                  Action expire = () =>
                  {
                      lock (gate)
                      {
                          if (rightMap.Remove(id) && rightMap.Count == 0 && rightDone)
                              observer.OnCompleted();
                      }

                      group.Remove(md);
                  };

                  var duration = default(IObservable<TRightDuration>);
                  try
                  {
                      duration = rightDurationSelector(value);
                  }
                  catch (Exception exception)
                  {
                      fail(exception);
                      return;
                  }

                  md.Disposable = duration.Take(1).Subscribe(
                      _ => { },
                      fail,
                      expire);

                  lock (gate)
                  {
                      foreach (var leftValue in leftMap.Values.ToArray())
                      {
                          var result = default(TResult);
                          try
                          {
                              result = resultSelector(leftValue, value);
                          }
                          catch (Exception exception)
                          {
                              observer.OnError(exception);
                              return;
                          }

                          observer.OnNext(result);
                      }
                  }
              },
              fail,
              () =>
              {
                  lock (gate)
                  {
                      rightDone = true;
                      if (leftDone || rightMap.Count == 0)
                          observer.OnCompleted();
                  }
              }));

          return group;
      });
  #endif
  }
  657. #endregion
  658. #region + OfType +
  /// <summary>
  /// Produces the elements of an object sequence as <c>TResult</c>.
  /// </summary>
  /// <param name="source">Sequence of untyped elements.</param>
  /// <returns>
  /// With NO_PERF defined, the elements for which <c>is TResult</c> holds, cast to <c>TResult</c>;
  /// otherwise an instance of the dedicated OfType operator class.
  /// </returns>
  public virtual IObservable<TResult> OfType<TResult>(IObservable<object> source)
  {
  #if NO_PERF
      var matching = source.Where(candidate => candidate is TResult);
      return matching.Cast<TResult>();
  #else
      return new OfType<object, TResult>(source);
  #endif
  }
  667. #endregion
  668. #region + Select +
  /// <summary>
  /// Projects each element of <paramref name="source"/> through <paramref name="selector"/>.
  /// </summary>
  /// <remarks>
  /// With NO_PERF defined, a source that is itself a SelectObservable is asked to build the combined
  /// projection instead of being wrapped in another layer.
  /// </remarks>
  public virtual IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
  {
  #if NO_PERF
      var upstreamSelect = source as SelectObservable<TSource>;
      if (upstreamSelect == null)
      {
          return new SelectObservable<TSource, TResult>(source, selector);
      }

      return upstreamSelect.Select(selector);
  #else
      return new Select<TSource, TResult>(source, selector);
  #endif
  }
  680. #if NO_PERF
  /// <summary>
  /// Base type for the NO_PERF Select operator, typed only by its output element type so that a
  /// following Select can recognise it without knowing the original source type.
  /// </summary>
  abstract class SelectObservable<TResult> : ObservableBase<TResult>
  {
      /// <summary>
      /// Returns a sequence that applies <paramref name="selector"/> on top of this projection.
      /// </summary>
      public abstract IObservable<TResult2> Select<TResult2>(Func<TResult, TResult2> selector);
  }
  /// <summary>
  /// NO_PERF Select operator: remembers the upstream source and the projection so that a following
  /// Select can be composed into a single projection over the same source.
  /// </summary>
  class SelectObservable<TSource, TResult> : SelectObservable<TResult>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, TResult> _selector;

      public SelectObservable(IObservable<TSource> source, Func<TSource, TResult> selector)
      {
          _selector = selector;
          _source = source;
      }

      /// <summary>
      /// Subscribes a projecting observer to the upstream source.
      /// </summary>
      protected override IDisposable SubscribeCore(IObserver<TResult> observer)
      {
          var projecting = new Observer(observer, _selector);
          return _source.Subscribe(projecting);
      }

      /// <summary>
      /// Builds a new operator over the original source whose projection is this projection
      /// followed by <paramref name="selector"/>.
      /// </summary>
      public override IObservable<TResult2> Select<TResult2>(Func<TResult, TResult2> selector)
      {
          var inner = _selector;
          return new SelectObservable<TSource, TResult2>(_source, value => selector(inner(value)));
      }

      /// <summary>
      /// Observer that applies the projection to each element and forwards the outcome downstream.
      /// </summary>
      class Observer : ObserverBase<TSource>
      {
          private readonly IObserver<TResult> _observer;
          private readonly Func<TSource, TResult> _selector;

          public Observer(IObserver<TResult> observer, Func<TSource, TResult> selector)
          {
              _selector = selector;
              _observer = observer;
          }

          protected override void OnNextCore(TSource value)
          {
              var projected = default(TResult);
              try
              {
                  projected = _selector(value);
              }
              catch (Exception failure)
              {
                  // A throwing projection is reported downstream as an error.
                  _observer.OnError(failure);
                  return;
              }

              _observer.OnNext(projected);
          }

          protected override void OnErrorCore(Exception error)
          {
              _observer.OnError(error);
          }

          protected override void OnCompletedCore()
          {
              _observer.OnCompleted();
          }
      }
  }
  735. #endif
  /// <summary>
  /// Projects each element of <paramref name="source"/> together with an integer index.
  /// </summary>
  /// <remarks>
  /// With NO_PERF defined, the index is a local of the factory passed to Defer, starts at zero, and is
  /// incremented in a checked context, so overflowing <c>int</c> throws instead of wrapping around.
  /// </remarks>
  public virtual IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, TResult> selector)
  {
  #if NO_PERF
      return Defer(() =>
      {
          var position = 0;
          return source.Select(item =>
          {
              var current = checked(position++);
              return selector(item, current);
          });
      });
  #else
      return new Select<TSource, TResult>(source, selector);
  #endif
  }
  748. #endregion
  749. #region + SelectMany +
  /// <summary>
  /// SelectMany overload that maps every source element to the same <paramref name="other"/> sequence.
  /// </summary>
  public virtual IObservable<TOther> SelectMany<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)
  {
      Func<TSource, IObservable<TOther>> alwaysOther = _ => other;
      return SelectMany_<TSource, TOther>(source, alwaysOther);
  }
  /// <summary>
  /// SelectMany overload taking a per-element sequence selector; forwards to SelectMany_.
  /// </summary>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
  {
      var flattened = SelectMany_<TSource, TResult>(source, selector);
      return flattened;
  }
  /// <summary>
  /// Public entry point for the indexed observable-selector overload; delegates to <c>SelectMany_</c>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Maps each element and its index to an inner sequence.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
  {
      var flattened = SelectMany_<TSource, TResult>(source, selector);
      return flattened;
  }
  762. #if !NO_TPL
  /// <summary>
  /// SelectMany overload whose selector produces a task per source element.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Maps each element to a task.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
  {
  #if !NO_PERF
      // Adapt to the token-accepting shape; the token is not passed on to the selector.
      return new SelectMany<TSource, TResult>(source, (item, cancel) => selector(item));
  #else
      Func<TSource, IObservable<TResult>> viaObservable = item => selector(item).ToObservable();
      return SelectMany_<TSource, TResult>(source, viaObservable);
  #endif
  }
  /// <summary>
  /// SelectMany overload whose selector produces a task from each source element and its index.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Maps each element and its index to a task.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
  {
  #if !NO_PERF
      // Adapt to the token-accepting shape; the token is not passed on to the selector.
      return new SelectMany<TSource, TResult>(source, (item, position, cancel) => selector(item, position));
  #else
      Func<TSource, int, IObservable<TResult>> viaObservable = (item, position) => selector(item, position).ToObservable();
      return SelectMany_<TSource, TResult>(source, viaObservable);
  #endif
  }
  /// <summary>
  /// SelectMany overload whose selector produces a task from each source element and a cancellation token.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Maps each element and a cancellation token to a task.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TResult>(source, selector);
  #else
      // FromAsync supplies the token that is handed to the selector.
      return SelectMany_<TSource, TResult>(source, item => FromAsync(cancel => selector(item, cancel)));
  #endif
  }
  /// <summary>
  /// SelectMany overload whose selector produces a task from each source element, its index
  /// and a cancellation token.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Maps each element, its index and a cancellation token to a task.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TResult>(source, selector);
  #else
      // FromAsync supplies the token that is handed to the selector.
      return SelectMany_<TSource, TResult>(source, (item, position) => FromAsync(cancel => selector(item, position, cancel)));
  #endif
  }
  795. #endif
  /// <summary>
  /// Public entry point for the collection-selector / result-selector overload over inner
  /// observable sequences; delegates to <c>SelectMany_</c>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Maps each element to an inner sequence.</param>
  /// <param name="resultSelector">Combines a source element with an inner element.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      var combined = SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
      return combined;
  }
  /// <summary>
  /// Public entry point for the indexed collection-selector / result-selector overload over
  /// inner observable sequences; delegates to <c>SelectMany_</c>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Maps each element and its index to an inner sequence.</param>
  /// <param name="resultSelector">Combines a source element, its index, an inner element and its index.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  {
      var combined = SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
      return combined;
  }
  804. #if !NO_TPL
  /// <summary>
  /// SelectMany overload that maps each element to a task and combines the element with the
  /// task's result through <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="taskSelector">Maps each element to a task.</param>
  /// <param name="resultSelector">Combines a source element with the task result.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
  {
  #if !NO_PERF
      // Adapt to the token-accepting shape; the token is not passed on to the selector.
      return new SelectMany<TSource, TTaskResult, TResult>(source, (item, cancel) => taskSelector(item), resultSelector);
  #else
      Func<TSource, IObservable<TTaskResult>> viaObservable = item => taskSelector(item).ToObservable();
      return SelectMany_<TSource, TTaskResult, TResult>(source, viaObservable, resultSelector);
  #endif
  }
  /// <summary>
  /// SelectMany overload that maps each element and its index to a task and combines the
  /// element, its index and the task's result through <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="taskSelector">Maps each element and its index to a task.</param>
  /// <param name="resultSelector">Combines a source element, its index and the task result.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
  {
  #if !NO_PERF
      // Adapt to the token-accepting shape; the token is not passed on to the selector.
      return new SelectMany<TSource, TTaskResult, TResult>(source, (item, position, cancel) => taskSelector(item, position), resultSelector);
  #else
      Func<TSource, int, IObservable<TTaskResult>> viaObservable = (item, position) => taskSelector(item, position).ToObservable();
      // The helper also supplies an inner index, which this overload's result selector does not take.
      Func<TSource, int, TTaskResult, int, TResult> combine = (item, position, value, _) => resultSelector(item, position, value);
      return SelectMany_<TSource, TTaskResult, TResult>(source, viaObservable, combine);
  #endif
  }
  /// <summary>
  /// SelectMany overload that maps each element (with a cancellation token) to a task and
  /// combines the element with the task's result through <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="taskSelector">Maps each element and a cancellation token to a task.</param>
  /// <param name="resultSelector">Combines a source element with the task result.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
  #else
      // FromAsync supplies the token that is handed to the task selector.
      return SelectMany_<TSource, TTaskResult, TResult>(source, item => FromAsync(cancel => taskSelector(item, cancel)), resultSelector);
  #endif
  }
  /// <summary>
  /// SelectMany overload that maps each element, its index and a cancellation token to a task
  /// and combines the element, its index and the task's result through <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="taskSelector">Maps each element, its index and a cancellation token to a task.</param>
  /// <param name="resultSelector">Combines a source element, its index and the task result.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
  #else
      // FromAsync supplies the token; the helper's inner index is dropped by the adapter lambda.
      return SelectMany_<TSource, TTaskResult, TResult>(
          source,
          (item, position) => FromAsync(cancel => taskSelector(item, position, cancel)),
          (item, position, value, _) => resultSelector(item, position, value));
  #endif
  }
  837. #endif
  /// <summary>
  /// Shared implementation for the observable-selector overloads.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Maps each element to an inner sequence.</param>
  private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TResult>(source, selector);
  #else
      // Project to a sequence of sequences, then merge them.
      var nested = source.Select(selector);
      return nested.Merge();
  #endif
  }
  /// <summary>
  /// Shared implementation for the indexed observable-selector overloads.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Maps each element and its index to an inner sequence.</param>
  private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TResult>(source, selector);
  #else
      // Project to a sequence of sequences, then merge them.
      var nested = source.Select(selector);
      return nested.Merge();
  #endif
  }
  /// <summary>
  /// Shared implementation for the collection-selector / result-selector overloads over
  /// inner observable sequences.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Maps each element to an inner sequence.</param>
  /// <param name="resultSelector">Combines a source element with an inner element.</param>
  private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  #else
      // Pair every inner element with the outer element that produced it.
      Func<TSource, IObservable<TResult>> paired =
          outer => collectionSelector(outer).Select(inner => resultSelector(outer, inner));
      return SelectMany_<TSource, TResult>(source, paired);
  #endif
  }
  /// <summary>
  /// Shared implementation for the indexed collection-selector / result-selector overloads
  /// over inner observable sequences.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Maps each element and its index to an inner sequence.</param>
  /// <param name="resultSelector">Combines a source element, its index, an inner element and its index.</param>
  private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  #else
      // Pair every inner element (and its index) with the outer element (and its index).
      Func<TSource, int, IObservable<TResult>> paired =
          (outer, outerIndex) => collectionSelector(outer, outerIndex)
              .Select((inner, innerIndex) => resultSelector(outer, outerIndex, inner, innerIndex));
      return SelectMany_<TSource, TResult>(source, paired);
  #endif
  }
  /// <summary>
  /// SelectMany overload that maps each kind of source notification (element, error, completion)
  /// to an inner sequence.
  /// </summary>
  /// <param name="source">Sequence whose notifications are projected.</param>
  /// <param name="onNext">Maps each element to an inner sequence.</param>
  /// <param name="onError">Maps the source's error to an inner sequence.</param>
  /// <param name="onCompleted">Produces an inner sequence when the source completes.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
  #else
      // Materialize turns all three notification kinds into values so one selector can dispatch on them.
      return source.Materialize().SelectMany(n =>
      {
          switch (n.Kind)
          {
              case NotificationKind.OnNext:
                  return onNext(n.Value);
              case NotificationKind.OnError:
                  return onError(n.Exception);
              default:
                  return onCompleted();
          }
      });
  #endif
  }
  /// <summary>
  /// Indexed variant of the notification-based SelectMany: elements are mapped together with
  /// their zero-based index; error and completion are mapped by their own selectors.
  /// </summary>
  /// <param name="source">Sequence whose notifications are projected.</param>
  /// <param name="onNext">Maps each element and its index to an inner sequence.</param>
  /// <param name="onError">Maps the source's error to an inner sequence.</param>
  /// <param name="onCompleted">Produces an inner sequence when the source completes.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
  #else
      // Defer gives every subscription its own element counter.
      return Defer(() =>
      {
          var position = 0;

          return source.Materialize().SelectMany(n =>
          {
              switch (n.Kind)
              {
                  case NotificationKind.OnNext:
                      // checked: overflow of the counter raises instead of wrapping.
                      return onNext(n.Value, checked(position++));
                  case NotificationKind.OnError:
                      return onError(n.Exception);
                  default:
                      return onCompleted();
              }
          });
      });
  #endif
  }
  /// <summary>
  /// SelectMany overload whose selector yields an enumerable per source element.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Maps each element to an enumerable collection.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TResult>(source, selector);
  #else
      // Reuse the collection/result helper with a result selector that keeps only the inner element.
      Func<TSource, TResult, TResult> keepInner = (_, inner) => inner;
      return SelectMany_<TSource, TResult, TResult>(source, selector, keepInner);
  #endif
  }
  /// <summary>
  /// SelectMany overload whose selector yields an enumerable from each source element and its index.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="selector">Maps each element and its index to an enumerable collection.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TResult>(source, selector);
  #else
      // Reuse the indexed collection/result helper with a result selector that keeps only the inner element.
      Func<TSource, int, TResult, int, TResult> keepInner = (_, __, inner, ___) => inner;
      return SelectMany_<TSource, TResult, TResult>(source, selector, keepInner);
  #endif
  }
  /// <summary>
  /// Public entry point for the enumerable collection-selector / result-selector overload;
  /// delegates to <c>SelectMany_</c>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Maps each element to an enumerable collection.</param>
  /// <param name="resultSelector">Combines a source element with a collection element.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
      var combined = SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
      return combined;
  }
  /// <summary>
  /// Public entry point for the indexed enumerable collection-selector / result-selector
  /// overload; delegates to <c>SelectMany_</c>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Maps each element and its index to an enumerable collection.</param>
  /// <param name="resultSelector">Combines a source element, its index, a collection element and its index.</param>
  public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  {
      var combined = SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
      return combined;
  }
  /// <summary>
  /// Shared implementation for the enumerable collection-selector / result-selector overloads.
  /// For every source element the collection is enumerated to its end and each
  /// (source element, collection element) pair is passed through <paramref name="resultSelector"/>.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Maps each element to an enumerable collection.</param>
  /// <param name="resultSelector">Combines a source element with a collection element.</param>
  /// <returns>A sequence of the combined results.</returns>
  private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  #else
      return new AnonymousObservable<TResult>(observer =>
          source.Subscribe(
              x =>
              {
                  // Obtaining the enumerator is guarded together with the selector call: a null
                  // collection or a throwing GetEnumerator is reported through OnError instead of
                  // escaping from this OnNext handler.
                  IEnumerator<TCollection> e;
                  try
                  {
                      e = collectionSelector(x).GetEnumerator();
                  }
                  catch (Exception exception)
                  {
                      observer.OnError(exception);
                      return;
                  }

                  // using disposes the enumerator on every exit path (and tolerates null).
                  using (e)
                  {
                      while (true)
                      {
                          TResult current;
                          try
                          {
                              if (!e.MoveNext())
                                  break;

                              current = resultSelector(x, e.Current);
                          }
                          catch (Exception exception)
                          {
                              observer.OnError(exception);
                              return;
                          }

                          // Outside the try so that exceptions raised by the observer are not caught here.
                          observer.OnNext(current);
                      }
                  }
              },
              observer.OnError,
              observer.OnCompleted
          )
      );
  #endif
  }
  /// <summary>
  /// Shared implementation for the indexed enumerable collection-selector / result-selector
  /// overloads. For every source element the collection is enumerated to its end and each
  /// combination is passed through <paramref name="resultSelector"/> together with the
  /// zero-based index of the source element and of the collection element.
  /// </summary>
  /// <param name="source">Sequence whose elements are projected.</param>
  /// <param name="collectionSelector">Maps each element and its index to an enumerable collection.</param>
  /// <param name="resultSelector">Combines a source element, its index, a collection element and its index.</param>
  /// <returns>A sequence of the combined results.</returns>
  private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  {
  #if !NO_PERF
      return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  #else
      return new AnonymousObservable<TResult>(observer =>
      {
          var index = 0;

          return source.Subscribe(
              x =>
              {
                  // Capture this element's index once. The shared counter is advanced here, so
                  // reading it again later would yield the NEXT element's index; both selectors
                  // must see the same value.
                  int xIndex;
                  IEnumerator<TCollection> e;
                  try
                  {
                      // checked: overflow of the counter raises and is reported through OnError.
                      xIndex = checked(index++);

                      // A null collection or a throwing GetEnumerator is reported through OnError
                      // as well, instead of escaping from this OnNext handler.
                      e = collectionSelector(x, xIndex).GetEnumerator();
                  }
                  catch (Exception exception)
                  {
                      observer.OnError(exception);
                      return;
                  }

                  // using disposes the enumerator on every exit path (and tolerates null).
                  using (e)
                  {
                      var eIndex = 0;
                      while (true)
                      {
                          TResult current;
                          try
                          {
                              if (!e.MoveNext())
                                  break;

                              current = resultSelector(x, xIndex, e.Current, checked(eIndex++));
                          }
                          catch (Exception exception)
                          {
                              observer.OnError(exception);
                              return;
                          }

                          // Outside the try so that exceptions raised by the observer are not caught here.
                          observer.OnNext(current);
                      }
                  }
              },
              observer.OnError,
              observer.OnCompleted);
      });
  #endif
  }
  1041. #endregion
  1042. #region + Skip +
  /// <summary>
  /// Bypasses the first <paramref name="count"/> elements of <paramref name="source"/> and
  /// forwards the rest.
  /// </summary>
  /// <param name="source">Sequence to skip elements from.</param>
  /// <param name="count">Number of leading elements to drop.</param>
  public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, int count)
  {
  #if !NO_PERF
      // When the source is itself a Skip without a scheduler, let it fold the new count in
      // rather than stacking a second operator.
      var existing = source as Skip<TSource>;
      if (existing == null || existing._scheduler != null)
          return new Skip<TSource>(source, count);

      return existing.Omega(count);
  #else
      return new AnonymousObservable<TSource>(observer =>
      {
          var toSkip = count;

          return source.Subscribe(
              item =>
              {
                  if (toSkip > 0)
                  {
                      toSkip--;
                      return;
                  }

                  observer.OnNext(item);
              },
              observer.OnError,
              observer.OnCompleted);
      });
  #endif
  }
  1067. #endregion
  1068. #region + SkipWhile +
  /// <summary>
  /// Skips elements while <paramref name="predicate"/> holds, then forwards the remainder.
  /// </summary>
  /// <param name="source">Sequence to skip elements from.</param>
  /// <param name="predicate">Condition tested against each element.</param>
  public virtual IObservable<TSource> SkipWhile<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
  #if !NO_PERF
      return new SkipWhile<TSource>(source, predicate);
  #else
      // Adapt to the indexed helper; the index is ignored.
      Func<TSource, int, bool> ignoringIndex = (item, _) => predicate(item);
      return SkipWhile_(source, ignoringIndex);
  #endif
  }
  /// <summary>
  /// Skips elements while the indexed <paramref name="predicate"/> holds, then forwards the remainder.
  /// </summary>
  /// <param name="source">Sequence to skip elements from.</param>
  /// <param name="predicate">Condition tested against each element and its index.</param>
  public virtual IObservable<TSource> SkipWhile<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
  #if !NO_PERF
      return new SkipWhile<TSource>(source, predicate);
  #else
      var skipped = SkipWhile_(source, predicate);
      return skipped;
  #endif
  }
  1085. #if NO_PERF
  /// <summary>
  /// Shared implementation of SkipWhile: drops elements until the predicate first returns
  /// false, then forwards that element and every later one without consulting the predicate again.
  /// </summary>
  /// <param name="source">Sequence to skip elements from.</param>
  /// <param name="predicate">Condition tested against each element and its index.</param>
  private static IObservable<TSource> SkipWhile_<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
      return new AnonymousObservable<TSource>(observer =>
      {
          var forwarding = false;
          var position = 0;

          return source.Subscribe(
              item =>
              {
                  if (!forwarding)
                  {
                      try
                      {
                          // checked: overflow of the counter raises and is reported through OnError.
                          forwarding = !predicate(item, checked(position++));
                      }
                      catch (Exception exception)
                      {
                          observer.OnError(exception);
                          return;
                      }
                  }

                  if (forwarding)
                  {
                      observer.OnNext(item);
                  }
              },
              observer.OnError,
              observer.OnCompleted);
      });
  }
  1112. #endif
  1113. #endregion
  1114. #region + Take +
  /// <summary>
  /// Returns the first <paramref name="count"/> elements of <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to take elements from.</param>
  /// <param name="count">Number of elements to take; zero yields <c>Empty</c> without using the source.</param>
  public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, int count)
  {
      if (count != 0)
      {
          return Take_(source, count);
      }

      return Empty<TSource>();
  }
  /// <summary>
  /// Returns the first <paramref name="count"/> elements of <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence to take elements from.</param>
  /// <param name="count">Number of elements to take.</param>
  /// <param name="scheduler">Passed to <c>Empty</c> when <paramref name="count"/> is zero; not used otherwise.</param>
  public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
  {
      if (count != 0)
      {
          return Take_(source, count);
      }

      return Empty<TSource>(scheduler);
  }
  1127. #if !NO_PERF
  /// <summary>
  /// Shared Take implementation (performance build).
  /// </summary>
  /// <param name="source">Sequence to take elements from.</param>
  /// <param name="count">Number of elements to take.</param>
  private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, int count)
  {
      // When the source is itself a Take without a scheduler, let it fold the new count in
      // rather than stacking a second operator.
      var existing = source as Take<TSource>;
      if (existing == null || existing._scheduler != null)
          return new Take<TSource>(source, count);

      return existing.Omega(count);
  }
  1135. #else
  /// <summary>
  /// Shared Take implementation (non-performance build): forwards elements until
  /// <paramref name="count"/> of them have been delivered, then signals completion.
  /// </summary>
  /// <param name="source">Sequence to take elements from.</param>
  /// <param name="count">Number of elements to take.</param>
  private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, int count)
  {
      return new AnonymousObservable<TSource>(observer =>
      {
          var left = count;

          return source.Subscribe(
              item =>
              {
                  // Quota used up (or never positive): ignore the element.
                  if (left <= 0)
                      return;

                  // Decrement before forwarding, as the quota must already be updated when OnNext runs.
                  left--;
                  observer.OnNext(item);

                  if (left == 0)
                      observer.OnCompleted();
              },
              observer.OnError,
              observer.OnCompleted);
      });
  }
  1156. #endif
  1157. #endregion
  1158. #region + TakeWhile +
  /// <summary>
  /// Forwards elements while <paramref name="predicate"/> holds.
  /// </summary>
  /// <param name="source">Sequence to take elements from.</param>
  /// <param name="predicate">Condition tested against each element.</param>
  public virtual IObservable<TSource> TakeWhile<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
  #if !NO_PERF
      return new TakeWhile<TSource>(source, predicate);
  #else
      // Adapt to the indexed helper; the index is ignored.
      Func<TSource, int, bool> ignoringIndex = (item, _) => predicate(item);
      return TakeWhile_(source, ignoringIndex);
  #endif
  }
  /// <summary>
  /// Forwards elements while the indexed <paramref name="predicate"/> holds.
  /// </summary>
  /// <param name="source">Sequence to take elements from.</param>
  /// <param name="predicate">Condition tested against each element and its index.</param>
  public virtual IObservable<TSource> TakeWhile<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
  #if !NO_PERF
      return new TakeWhile<TSource>(source, predicate);
  #else
      var taken = TakeWhile_(source, predicate);
      return taken;
  #endif
  }
  1175. #if NO_PERF
  /// <summary>
  /// Shared implementation of TakeWhile: forwards elements while the predicate returns true;
  /// on the first false it signals completion and ignores every later element.
  /// </summary>
  /// <param name="source">Sequence to take elements from.</param>
  /// <param name="predicate">Condition tested against each element and its index.</param>
  private static IObservable<TSource> TakeWhile_<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
      return new AnonymousObservable<TSource>(observer =>
      {
          var taking = true;
          var position = 0;

          return source.Subscribe(
              item =>
              {
                  // The predicate already failed once: nothing more to do for later elements.
                  if (!taking)
                      return;

                  try
                  {
                      // checked: overflow of the counter raises and is reported through OnError.
                      taking = predicate(item, checked(position++));
                  }
                  catch (Exception exception)
                  {
                      observer.OnError(exception);
                      return;
                  }

                  if (!taking)
                  {
                      observer.OnCompleted();
                  }
                  else
                  {
                      observer.OnNext(item);
                  }
              },
              observer.OnError,
              observer.OnCompleted);
      });
  }
  1206. #endif
  1207. #endregion
  1208. #region + Where +
  /// <summary>
  /// Filters <paramref name="source"/> to the elements satisfying <paramref name="predicate"/>.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="predicate">Condition an element must satisfy to be forwarded.</param>
  public virtual IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
  {
  #if !NO_PERF
      // When the source is already a Where, let it combine the predicates instead of nesting operators.
      var existing = source as Where<TSource>;
      if (existing == null)
          return new Where<TSource>(source, predicate);

      return existing.Omega(predicate);
  #else
      // When the source is already a WhereObservable, let it combine the predicates instead of nesting operators.
      var existing = source as WhereObservable<TSource>;
      if (existing == null)
          return new WhereObservable<TSource>(source, predicate);

      return existing.Where(predicate);
  #endif
  }
  1223. #if NO_PERF
  /// <summary>
  /// Observable that forwards only those elements of a source that satisfy a predicate.
  /// </summary>
  class WhereObservable<TSource> : ObservableBase<TSource>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, bool> _predicate;

      public WhereObservable(IObservable<TSource> source, Func<TSource, bool> predicate)
      {
          _predicate = predicate;
          _source = source;
      }

      /// <summary>
      /// Subscribes to the wrapped source with a filtering observer in front of <paramref name="observer"/>.
      /// </summary>
      protected override IDisposable SubscribeCore(IObserver<TSource> observer)
      {
          var filtering = new Observer(observer, _predicate);
          return _source.Subscribe(filtering);
      }

      /// <summary>
      /// Builds a new filter over the same source that requires both the stored predicate and
      /// <paramref name="predicate"/> to hold; the stored predicate is evaluated first.
      /// </summary>
      public IObservable<TSource> Where(Func<TSource, bool> predicate)
      {
          Func<TSource, bool> both = item =>
          {
              // Short-circuit: the second predicate only runs when the first one passed.
              if (!_predicate(item))
                  return false;

              return predicate(item);
          };

          return new WhereObservable<TSource>(_source, both);
      }

      /// <summary>
      /// Observer that evaluates the predicate for each element and forwards the matching ones.
      /// </summary>
      class Observer : ObserverBase<TSource>
      {
          private readonly IObserver<TSource> _observer;
          private readonly Func<TSource, bool> _predicate;

          public Observer(IObserver<TSource> observer, Func<TSource, bool> predicate)
          {
              _predicate = predicate;
              _observer = observer;
          }

          /// <summary>
          /// Tests <paramref name="value"/>; a predicate exception is sent downstream as an error.
          /// </summary>
          protected override void OnNextCore(TSource value)
          {
              var matches = false;

              try
              {
                  matches = _predicate(value);
              }
              catch (Exception exception)
              {
                  _observer.OnError(exception);
                  return;
              }

              if (!matches)
                  return;

              // Forwarding happens outside the try so downstream exceptions are not caught here.
              _observer.OnNext(value);
          }

          /// <summary>Passes the error through to the downstream observer.</summary>
          protected override void OnErrorCore(Exception error)
          {
              _observer.OnError(error);
          }

          /// <summary>Passes completion through to the downstream observer.</summary>
          protected override void OnCompletedCore()
          {
              _observer.OnCompleted();
          }
      }
  }
  1275. #endif
  /// <summary>
  /// Filters <paramref name="source"/> using a predicate that also receives each element's
  /// zero-based position.
  /// </summary>
  /// <param name="source">Sequence to filter.</param>
  /// <param name="predicate">Condition tested against each element and its index.</param>
  public virtual IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
  {
  #if !NO_PERF
      return new Where<TSource>(source, predicate);
  #else
      // Defer gives every subscription its own counter.
      return Defer(() =>
      {
          var position = 0;
          return source.Where(item =>
          {
              // checked: overflow of the counter raises instead of wrapping.
              var current = checked(position++);
              return predicate(item, current);
          });
      });
  #endif
  }
  1288. #endregion
  1289. }
  1290. }