1
0

SelectMany.cs 57 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System.Collections.Generic;
  6. using System.Reactive.Disposables;
  7. #if !NO_TPL
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. #endif
  11. namespace System.Reactive.Linq.ObservableImpl
  12. {
  13. class SelectMany<TSource, TCollection, TResult> : Producer<TResult>
  14. {
  15. private readonly IObservable<TSource> _source;
  16. private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
  17. private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorI;
  18. private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE;
  19. private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEI;
  20. private readonly Func<TSource, TCollection, TResult> _resultSelector;
  21. private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorI;
  22. public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  23. {
  24. _source = source;
  25. _collectionSelector = collectionSelector;
  26. _resultSelector = resultSelector;
  27. }
  28. public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  29. {
  30. _source = source;
  31. _collectionSelectorI = collectionSelector;
  32. _resultSelectorI = resultSelector;
  33. }
  34. public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  35. {
  36. _source = source;
  37. _collectionSelectorE = collectionSelector;
  38. _resultSelector = resultSelector;
  39. }
  40. public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  41. {
  42. _source = source;
  43. _collectionSelectorEI = collectionSelector;
  44. _resultSelectorI = resultSelector;
  45. }
  46. #if !NO_TPL
  47. private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT;
  48. private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelectorTI;
  49. private readonly Func<TSource, int, TCollection, TResult> _resultSelectorTI;
  50. public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  51. {
  52. _source = source;
  53. _collectionSelectorT = collectionSelector;
  54. _resultSelector = resultSelector;
  55. }
  56. public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
  57. {
  58. _source = source;
  59. _collectionSelectorTI = collectionSelector;
  60. _resultSelectorTI = resultSelector;
  61. }
  62. #endif
  63. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  64. {
  65. if (_collectionSelector != null)
  66. {
  67. var sink = new _(this, observer, cancel);
  68. setSink(sink);
  69. return sink.Run();
  70. }
  71. else if (_collectionSelectorI != null)
  72. {
  73. var sink = new IndexSelectorImpl(this, observer, cancel);
  74. setSink(sink);
  75. return sink.Run();
  76. }
  77. #if !NO_TPL
  78. else if (_collectionSelectorT != null)
  79. {
  80. var sink = new SelectManyImpl(this, observer, cancel);
  81. setSink(sink);
  82. return sink.Run();
  83. }
  84. else if (_collectionSelectorTI != null)
  85. {
  86. var sink = new Sigma(this, observer, cancel);
  87. setSink(sink);
  88. return sink.Run();
  89. }
  90. #endif
  91. else if (_collectionSelectorE != null)
  92. {
  93. var sink = new NoSelectorImpl(this, observer, cancel);
  94. setSink(sink);
  95. return _source.SubscribeSafe(sink);
  96. }
  97. else
  98. {
  99. var sink = new Omega(this, observer, cancel);
  100. setSink(sink);
  101. return _source.SubscribeSafe(sink);
  102. }
  103. }
  104. class _ : Sink<TResult>, IObserver<TSource>
  105. {
  106. private readonly SelectMany<TSource, TCollection, TResult> _parent;
  107. public _(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  108. : base(observer, cancel)
  109. {
  110. _parent = parent;
  111. }
  112. private object _gate;
  113. private bool _isStopped;
  114. private CompositeDisposable _group;
  115. private SingleAssignmentDisposable _sourceSubscription;
  116. public IDisposable Run()
  117. {
  118. _gate = new object();
  119. _isStopped = false;
  120. _group = new CompositeDisposable();
  121. _sourceSubscription = new SingleAssignmentDisposable();
  122. _group.Add(_sourceSubscription);
  123. _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
  124. return _group;
  125. }
  126. public void OnNext(TSource value)
  127. {
  128. var collection = default(IObservable<TCollection>);
  129. try
  130. {
  131. collection = _parent._collectionSelector(value);
  132. }
  133. catch (Exception ex)
  134. {
  135. lock (_gate)
  136. {
  137. base._observer.OnError(ex);
  138. base.Dispose();
  139. }
  140. return;
  141. }
  142. var innerSubscription = new SingleAssignmentDisposable();
  143. _group.Add(innerSubscription);
  144. innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, innerSubscription));
  145. }
  146. public void OnError(Exception error)
  147. {
  148. lock (_gate)
  149. {
  150. base._observer.OnError(error);
  151. base.Dispose();
  152. }
  153. }
  154. public void OnCompleted()
  155. {
  156. _isStopped = true;
  157. if (_group.Count == 1)
  158. {
  159. //
  160. // Notice there can be a race between OnCompleted of the source and any
  161. // of the inner sequences, where both see _group.Count == 1, and one is
  162. // waiting for the lock. There won't be a double OnCompleted observation
  163. // though, because the call to Dispose silences the observer by swapping
  164. // in a NopObserver<T>.
  165. //
  166. lock (_gate)
  167. {
  168. base._observer.OnCompleted();
  169. base.Dispose();
  170. }
  171. }
  172. else
  173. {
  174. _sourceSubscription.Dispose();
  175. }
  176. }
  177. class Iter : IObserver<TCollection>
  178. {
  179. private readonly _ _parent;
  180. private readonly TSource _value;
  181. private readonly IDisposable _self;
  182. public Iter(_ parent, TSource value, IDisposable self)
  183. {
  184. _parent = parent;
  185. _value = value;
  186. _self = self;
  187. }
  188. public void OnNext(TCollection value)
  189. {
  190. var res = default(TResult);
  191. try
  192. {
  193. res = _parent._parent._resultSelector(_value, value);
  194. }
  195. catch (Exception ex)
  196. {
  197. lock (_parent._gate)
  198. {
  199. _parent._observer.OnError(ex);
  200. _parent.Dispose();
  201. }
  202. return;
  203. }
  204. lock (_parent._gate)
  205. _parent._observer.OnNext(res);
  206. }
  207. public void OnError(Exception error)
  208. {
  209. lock (_parent._gate)
  210. {
  211. _parent._observer.OnError(error);
  212. _parent.Dispose();
  213. }
  214. }
  215. public void OnCompleted()
  216. {
  217. _parent._group.Remove(_self);
  218. if (_parent._isStopped && _parent._group.Count == 1)
  219. {
  220. //
  221. // Notice there can be a race between OnCompleted of the source and any
  222. // of the inner sequences, where both see _group.Count == 1, and one is
  223. // waiting for the lock. There won't be a double OnCompleted observation
  224. // though, because the call to Dispose silences the observer by swapping
  225. // in a NopObserver<T>.
  226. //
  227. lock (_parent._gate)
  228. {
  229. _parent._observer.OnCompleted();
  230. _parent.Dispose();
  231. }
  232. }
  233. }
  234. }
  235. }
  236. class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
  237. {
  238. private readonly SelectMany<TSource, TCollection, TResult> _parent;
  239. public IndexSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  240. : base(observer, cancel)
  241. {
  242. _parent = parent;
  243. }
  244. private object _gate;
  245. private bool _isStopped;
  246. private CompositeDisposable _group;
  247. private SingleAssignmentDisposable _sourceSubscription;
  248. private int _index;
  249. public IDisposable Run()
  250. {
  251. _gate = new object();
  252. _isStopped = false;
  253. _group = new CompositeDisposable();
  254. _sourceSubscription = new SingleAssignmentDisposable();
  255. _group.Add(_sourceSubscription);
  256. _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
  257. return _group;
  258. }
  259. public void OnNext(TSource value)
  260. {
  261. var index = checked(_index++);
  262. var collection = default(IObservable<TCollection>);
  263. try
  264. {
  265. collection = _parent._collectionSelectorI(value, index);
  266. }
  267. catch (Exception ex)
  268. {
  269. lock (_gate)
  270. {
  271. base._observer.OnError(ex);
  272. base.Dispose();
  273. }
  274. return;
  275. }
  276. var innerSubscription = new SingleAssignmentDisposable();
  277. _group.Add(innerSubscription);
  278. innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, index, innerSubscription));
  279. }
  280. public void OnError(Exception error)
  281. {
  282. lock (_gate)
  283. {
  284. base._observer.OnError(error);
  285. base.Dispose();
  286. }
  287. }
  288. public void OnCompleted()
  289. {
  290. _isStopped = true;
  291. if (_group.Count == 1)
  292. {
  293. //
  294. // Notice there can be a race between OnCompleted of the source and any
  295. // of the inner sequences, where both see _group.Count == 1, and one is
  296. // waiting for the lock. There won't be a double OnCompleted observation
  297. // though, because the call to Dispose silences the observer by swapping
  298. // in a NopObserver<T>.
  299. //
  300. lock (_gate)
  301. {
  302. base._observer.OnCompleted();
  303. base.Dispose();
  304. }
  305. }
  306. else
  307. {
  308. _sourceSubscription.Dispose();
  309. }
  310. }
  311. class Iter : IObserver<TCollection>
  312. {
  313. private readonly IndexSelectorImpl _parent;
  314. private readonly TSource _value;
  315. private readonly int _valueIndex;
  316. private readonly IDisposable _self;
  317. public Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self)
  318. {
  319. _parent = parent;
  320. _value = value;
  321. _valueIndex = index;
  322. _self = self;
  323. }
  324. private int _index;
  325. public void OnNext(TCollection value)
  326. {
  327. var res = default(TResult);
  328. try
  329. {
  330. res = _parent._parent._resultSelectorI(_value, _valueIndex, value, checked(_index++));
  331. }
  332. catch (Exception ex)
  333. {
  334. lock (_parent._gate)
  335. {
  336. _parent._observer.OnError(ex);
  337. _parent.Dispose();
  338. }
  339. return;
  340. }
  341. lock (_parent._gate)
  342. _parent._observer.OnNext(res);
  343. }
  344. public void OnError(Exception error)
  345. {
  346. lock (_parent._gate)
  347. {
  348. _parent._observer.OnError(error);
  349. _parent.Dispose();
  350. }
  351. }
  352. public void OnCompleted()
  353. {
  354. _parent._group.Remove(_self);
  355. if (_parent._isStopped && _parent._group.Count == 1)
  356. {
  357. //
  358. // Notice there can be a race between OnCompleted of the source and any
  359. // of the inner sequences, where both see _group.Count == 1, and one is
  360. // waiting for the lock. There won't be a double OnCompleted observation
  361. // though, because the call to Dispose silences the observer by swapping
  362. // in a NopObserver<T>.
  363. //
  364. lock (_parent._gate)
  365. {
  366. _parent._observer.OnCompleted();
  367. _parent.Dispose();
  368. }
  369. }
  370. }
  371. }
  372. }
  373. class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
  374. {
  375. private readonly SelectMany<TSource, TCollection, TResult> _parent;
  376. public NoSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  377. : base(observer, cancel)
  378. {
  379. _parent = parent;
  380. }
  381. public void OnNext(TSource value)
  382. {
  383. var xs = default(IEnumerable<TCollection>);
  384. try
  385. {
  386. xs = _parent._collectionSelectorE(value);
  387. }
  388. catch (Exception exception)
  389. {
  390. base._observer.OnError(exception);
  391. base.Dispose();
  392. return;
  393. }
  394. var e = default(IEnumerator<TCollection>);
  395. try
  396. {
  397. e = xs.GetEnumerator();
  398. }
  399. catch (Exception exception)
  400. {
  401. base._observer.OnError(exception);
  402. base.Dispose();
  403. return;
  404. }
  405. try
  406. {
  407. var hasNext = true;
  408. while (hasNext)
  409. {
  410. hasNext = false;
  411. var current = default(TResult);
  412. try
  413. {
  414. hasNext = e.MoveNext();
  415. if (hasNext)
  416. current = _parent._resultSelector(value, e.Current);
  417. }
  418. catch (Exception exception)
  419. {
  420. base._observer.OnError(exception);
  421. base.Dispose();
  422. return;
  423. }
  424. if (hasNext)
  425. base._observer.OnNext(current);
  426. }
  427. }
  428. finally
  429. {
  430. if (e != null)
  431. e.Dispose();
  432. }
  433. }
  434. public void OnError(Exception error)
  435. {
  436. base._observer.OnError(error);
  437. base.Dispose();
  438. }
  439. public void OnCompleted()
  440. {
  441. base._observer.OnCompleted();
  442. base.Dispose();
  443. }
  444. }
  445. class Omega : Sink<TResult>, IObserver<TSource>
  446. {
  447. private readonly SelectMany<TSource, TCollection, TResult> _parent;
  448. public Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  449. : base(observer, cancel)
  450. {
  451. _parent = parent;
  452. }
  453. private int _index;
  454. public void OnNext(TSource value)
  455. {
  456. var index = checked(_index++);
  457. var xs = default(IEnumerable<TCollection>);
  458. try
  459. {
  460. xs = _parent._collectionSelectorEI(value, index);
  461. }
  462. catch (Exception exception)
  463. {
  464. base._observer.OnError(exception);
  465. base.Dispose();
  466. return;
  467. }
  468. var e = default(IEnumerator<TCollection>);
  469. try
  470. {
  471. e = xs.GetEnumerator();
  472. }
  473. catch (Exception exception)
  474. {
  475. base._observer.OnError(exception);
  476. base.Dispose();
  477. return;
  478. }
  479. try
  480. {
  481. var eIndex = 0;
  482. var hasNext = true;
  483. while (hasNext)
  484. {
  485. hasNext = false;
  486. var current = default(TResult);
  487. try
  488. {
  489. hasNext = e.MoveNext();
  490. if (hasNext)
  491. current = _parent._resultSelectorI(value, index, e.Current, checked(eIndex++));
  492. }
  493. catch (Exception exception)
  494. {
  495. base._observer.OnError(exception);
  496. base.Dispose();
  497. return;
  498. }
  499. if (hasNext)
  500. base._observer.OnNext(current);
  501. }
  502. }
  503. finally
  504. {
  505. if (e != null)
  506. e.Dispose();
  507. }
  508. }
  509. public void OnError(Exception error)
  510. {
  511. base._observer.OnError(error);
  512. base.Dispose();
  513. }
  514. public void OnCompleted()
  515. {
  516. base._observer.OnCompleted();
  517. base.Dispose();
  518. }
  519. }
  520. #if !NO_TPL
  521. #pragma warning disable 0420
  522. class SelectManyImpl : Sink<TResult>, IObserver<TSource>
  523. {
  524. private readonly SelectMany<TSource, TCollection, TResult> _parent;
  525. public SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  526. : base(observer, cancel)
  527. {
  528. _parent = parent;
  529. }
  530. private object _gate;
  531. private CancellationDisposable _cancel;
  532. private volatile int _count;
  533. public IDisposable Run()
  534. {
  535. _gate = new object();
  536. _cancel = new CancellationDisposable();
  537. _count = 1;
  538. return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel);
  539. }
  540. public void OnNext(TSource value)
  541. {
  542. var task = default(Task<TCollection>);
  543. try
  544. {
  545. Interlocked.Increment(ref _count);
  546. task = _parent._collectionSelectorT(value, _cancel.Token);
  547. }
  548. catch (Exception ex)
  549. {
  550. lock (_gate)
  551. {
  552. base._observer.OnError(ex);
  553. base.Dispose();
  554. }
  555. return;
  556. }
  557. if (task.IsCompleted)
  558. {
  559. OnCompletedTask(value, task);
  560. }
  561. else
  562. {
  563. AttachContinuation(value, task);
  564. }
  565. }
  566. private void AttachContinuation(TSource value, Task<TCollection> task)
  567. {
  568. //
  569. // Separate method to avoid closure in synchronous completion case.
  570. //
  571. task.ContinueWith(t => OnCompletedTask(value, t));
  572. }
  573. private void OnCompletedTask(TSource value, Task<TCollection> task)
  574. {
  575. switch (task.Status)
  576. {
  577. case TaskStatus.RanToCompletion:
  578. {
  579. var res = default(TResult);
  580. try
  581. {
  582. res = _parent._resultSelector(value, task.Result);
  583. }
  584. catch (Exception ex)
  585. {
  586. lock (_gate)
  587. {
  588. base._observer.OnError(ex);
  589. base.Dispose();
  590. }
  591. return;
  592. }
  593. lock (_gate)
  594. base._observer.OnNext(res);
  595. OnCompleted();
  596. }
  597. break;
  598. case TaskStatus.Faulted:
  599. {
  600. lock (_gate)
  601. {
  602. base._observer.OnError(task.Exception.InnerException);
  603. base.Dispose();
  604. }
  605. }
  606. break;
  607. case TaskStatus.Canceled:
  608. {
  609. if (!_cancel.IsDisposed)
  610. {
  611. lock (_gate)
  612. {
  613. base._observer.OnError(new TaskCanceledException(task));
  614. base.Dispose();
  615. }
  616. }
  617. }
  618. break;
  619. }
  620. }
  621. public void OnError(Exception error)
  622. {
  623. lock (_gate)
  624. {
  625. base._observer.OnError(error);
  626. base.Dispose();
  627. }
  628. }
  629. public void OnCompleted()
  630. {
  631. if (Interlocked.Decrement(ref _count) == 0)
  632. {
  633. lock (_gate)
  634. {
  635. base._observer.OnCompleted();
  636. base.Dispose();
  637. }
  638. }
  639. }
  640. }
  641. class Sigma : Sink<TResult>, IObserver<TSource>
  642. {
  643. private readonly SelectMany<TSource, TCollection, TResult> _parent;
  644. public Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  645. : base(observer, cancel)
  646. {
  647. _parent = parent;
  648. }
  649. private object _gate;
  650. private CancellationDisposable _cancel;
  651. private volatile int _count;
  652. private int _index;
  653. public IDisposable Run()
  654. {
  655. _gate = new object();
  656. _cancel = new CancellationDisposable();
  657. _count = 1;
  658. return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel);
  659. }
  660. public void OnNext(TSource value)
  661. {
  662. var index = checked(_index++);
  663. var task = default(Task<TCollection>);
  664. try
  665. {
  666. Interlocked.Increment(ref _count);
  667. task = _parent._collectionSelectorTI(value, index, _cancel.Token);
  668. }
  669. catch (Exception ex)
  670. {
  671. lock (_gate)
  672. {
  673. base._observer.OnError(ex);
  674. base.Dispose();
  675. }
  676. return;
  677. }
  678. if (task.IsCompleted)
  679. {
  680. OnCompletedTask(value, index, task);
  681. }
  682. else
  683. {
  684. AttachContinuation(value, index, task);
  685. }
  686. }
  687. private void AttachContinuation(TSource value, int index, Task<TCollection> task)
  688. {
  689. //
  690. // Separate method to avoid closure in synchronous completion case.
  691. //
  692. task.ContinueWith(t => OnCompletedTask(value, index, t));
  693. }
  694. private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
  695. {
  696. switch (task.Status)
  697. {
  698. case TaskStatus.RanToCompletion:
  699. {
  700. var res = default(TResult);
  701. try
  702. {
  703. res = _parent._resultSelectorTI(value, index, task.Result);
  704. }
  705. catch (Exception ex)
  706. {
  707. lock (_gate)
  708. {
  709. base._observer.OnError(ex);
  710. base.Dispose();
  711. }
  712. return;
  713. }
  714. lock (_gate)
  715. base._observer.OnNext(res);
  716. OnCompleted();
  717. }
  718. break;
  719. case TaskStatus.Faulted:
  720. {
  721. lock (_gate)
  722. {
  723. base._observer.OnError(task.Exception.InnerException);
  724. base.Dispose();
  725. }
  726. }
  727. break;
  728. case TaskStatus.Canceled:
  729. {
  730. if (!_cancel.IsDisposed)
  731. {
  732. lock (_gate)
  733. {
  734. base._observer.OnError(new TaskCanceledException(task));
  735. base.Dispose();
  736. }
  737. }
  738. }
  739. break;
  740. }
  741. }
  742. public void OnError(Exception error)
  743. {
  744. lock (_gate)
  745. {
  746. base._observer.OnError(error);
  747. base.Dispose();
  748. }
  749. }
  750. public void OnCompleted()
  751. {
  752. if (Interlocked.Decrement(ref _count) == 0)
  753. {
  754. lock (_gate)
  755. {
  756. base._observer.OnCompleted();
  757. base.Dispose();
  758. }
  759. }
  760. }
  761. }
  762. #pragma warning restore 0420
  763. #endif
  764. }
  765. class SelectMany<TSource, TResult> : Producer<TResult>
  766. {
  767. private readonly IObservable<TSource> _source;
  768. private readonly Func<TSource, IObservable<TResult>> _selector;
  769. private readonly Func<TSource, int, IObservable<TResult>> _selectorI;
  770. private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
  771. private readonly Func<IObservable<TResult>> _selectorOnCompleted;
  772. private readonly Func<TSource, IEnumerable<TResult>> _selectorE;
  773. private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEI;
  774. public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
  775. {
  776. _source = source;
  777. _selector = selector;
  778. }
  779. public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
  780. {
  781. _source = source;
  782. _selectorI = selector;
  783. }
  784. public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
  785. {
  786. _source = source;
  787. _selector = selector;
  788. _selectorOnError = selectorOnError;
  789. _selectorOnCompleted = selectorOnCompleted;
  790. }
  791. public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
  792. {
  793. _source = source;
  794. _selectorI = selector;
  795. _selectorOnError = selectorOnError;
  796. _selectorOnCompleted = selectorOnCompleted;
  797. }
  798. public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
  799. {
  800. _source = source;
  801. _selectorE = selector;
  802. }
  803. public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
  804. {
  805. _source = source;
  806. _selectorEI = selector;
  807. }
  808. #if !NO_TPL
  809. private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT;
  810. private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selectorTI;
  811. public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
  812. {
  813. _source = source;
  814. _selectorT = selector;
  815. }
  816. public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
  817. {
  818. _source = source;
  819. _selectorTI = selector;
  820. }
  821. #endif
  822. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  823. {
  824. if (_selector != null)
  825. {
  826. var sink = new _(this, observer, cancel);
  827. setSink(sink);
  828. return sink.Run();
  829. }
  830. else if (_selectorI != null)
  831. {
  832. var sink = new IndexSelectorImpl(this, observer, cancel);
  833. setSink(sink);
  834. return sink.Run();
  835. }
  836. #if !NO_TPL
  837. else if (_selectorT != null)
  838. {
  839. var sink = new SelectManyImpl(this, observer, cancel);
  840. setSink(sink);
  841. return sink.Run();
  842. }
  843. else if (_selectorTI != null)
  844. {
  845. var sink = new Sigma(this, observer, cancel);
  846. setSink(sink);
  847. return sink.Run();
  848. }
  849. #endif
  850. else if (_selectorE != null)
  851. {
  852. var sink = new NoSelectorImpl(this, observer, cancel);
  853. setSink(sink);
  854. return _source.SubscribeSafe(sink);
  855. }
  856. else
  857. {
  858. var sink = new Omega(this, observer, cancel);
  859. setSink(sink);
  860. return _source.SubscribeSafe(sink);
  861. }
  862. }
  863. class _ : Sink<TResult>, IObserver<TSource>
  864. {
  865. private readonly SelectMany<TSource, TResult> _parent;
  866. public _(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  867. : base(observer, cancel)
  868. {
  869. _parent = parent;
  870. }
  871. private object _gate;
  872. private bool _isStopped;
  873. private CompositeDisposable _group;
  874. private SingleAssignmentDisposable _sourceSubscription;
  875. public IDisposable Run()
  876. {
  877. _gate = new object();
  878. _isStopped = false;
  879. _group = new CompositeDisposable();
  880. _sourceSubscription = new SingleAssignmentDisposable();
  881. _group.Add(_sourceSubscription);
  882. _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
  883. return _group;
  884. }
  885. public void OnNext(TSource value)
  886. {
  887. var inner = default(IObservable<TResult>);
  888. try
  889. {
  890. inner = _parent._selector(value);
  891. }
  892. catch (Exception ex)
  893. {
  894. lock (_gate)
  895. {
  896. base._observer.OnError(ex);
  897. base.Dispose();
  898. }
  899. return;
  900. }
  901. SubscribeInner(inner);
  902. }
  903. public void OnError(Exception error)
  904. {
  905. if (_parent._selectorOnError != null)
  906. {
  907. var inner = default(IObservable<TResult>);
  908. try
  909. {
  910. inner = _parent._selectorOnError(error);
  911. }
  912. catch (Exception ex)
  913. {
  914. lock (_gate)
  915. {
  916. base._observer.OnError(ex);
  917. base.Dispose();
  918. }
  919. return;
  920. }
  921. SubscribeInner(inner);
  922. Final();
  923. }
  924. else
  925. {
  926. lock (_gate)
  927. {
  928. base._observer.OnError(error);
  929. base.Dispose();
  930. }
  931. }
  932. }
  933. public void OnCompleted()
  934. {
  935. if (_parent._selectorOnCompleted != null)
  936. {
  937. var inner = default(IObservable<TResult>);
  938. try
  939. {
  940. inner = _parent._selectorOnCompleted();
  941. }
  942. catch (Exception ex)
  943. {
  944. lock (_gate)
  945. {
  946. base._observer.OnError(ex);
  947. base.Dispose();
  948. }
  949. return;
  950. }
  951. SubscribeInner(inner);
  952. }
  953. Final();
  954. }
  955. private void Final()
  956. {
  957. _isStopped = true;
  958. if (_group.Count == 1)
  959. {
  960. //
  961. // Notice there can be a race between OnCompleted of the source and any
  962. // of the inner sequences, where both see _group.Count == 1, and one is
  963. // waiting for the lock. There won't be a double OnCompleted observation
  964. // though, because the call to Dispose silences the observer by swapping
  965. // in a NopObserver<T>.
  966. //
  967. lock (_gate)
  968. {
  969. base._observer.OnCompleted();
  970. base.Dispose();
  971. }
  972. }
  973. else
  974. {
  975. _sourceSubscription.Dispose();
  976. }
  977. }
  978. private void SubscribeInner(IObservable<TResult> inner)
  979. {
  980. var innerSubscription = new SingleAssignmentDisposable();
  981. _group.Add(innerSubscription);
  982. innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
  983. }
  984. class Iter : IObserver<TResult>
  985. {
  986. private readonly _ _parent;
  987. private readonly IDisposable _self;
  988. public Iter(_ parent, IDisposable self)
  989. {
  990. _parent = parent;
  991. _self = self;
  992. }
  993. public void OnNext(TResult value)
  994. {
  995. lock (_parent._gate)
  996. _parent._observer.OnNext(value);
  997. }
  998. public void OnError(Exception error)
  999. {
  1000. lock (_parent._gate)
  1001. {
  1002. _parent._observer.OnError(error);
  1003. _parent.Dispose();
  1004. }
  1005. }
  1006. public void OnCompleted()
  1007. {
  1008. _parent._group.Remove(_self);
  1009. if (_parent._isStopped && _parent._group.Count == 1)
  1010. {
  1011. //
  1012. // Notice there can be a race between OnCompleted of the source and any
  1013. // of the inner sequences, where both see _group.Count == 1, and one is
  1014. // waiting for the lock. There won't be a double OnCompleted observation
  1015. // though, because the call to Dispose silences the observer by swapping
  1016. // in a NopObserver<T>.
  1017. //
  1018. lock (_parent._gate)
  1019. {
  1020. _parent._observer.OnCompleted();
  1021. _parent.Dispose();
  1022. }
  1023. }
  1024. }
  1025. }
  1026. }
  1027. class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
  1028. {
  1029. private readonly SelectMany<TSource, TResult> _parent;
  1030. public IndexSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  1031. : base(observer, cancel)
  1032. {
  1033. _parent = parent;
  1034. }
  1035. private object _gate;
  1036. private bool _isStopped;
  1037. private CompositeDisposable _group;
  1038. private SingleAssignmentDisposable _sourceSubscription;
  1039. private int _index;
  1040. public IDisposable Run()
  1041. {
  1042. _gate = new object();
  1043. _isStopped = false;
  1044. _group = new CompositeDisposable();
  1045. _sourceSubscription = new SingleAssignmentDisposable();
  1046. _group.Add(_sourceSubscription);
  1047. _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
  1048. return _group;
  1049. }
  1050. public void OnNext(TSource value)
  1051. {
  1052. var inner = default(IObservable<TResult>);
  1053. try
  1054. {
  1055. inner = _parent._selectorI(value, checked(_index++));
  1056. }
  1057. catch (Exception ex)
  1058. {
  1059. lock (_gate)
  1060. {
  1061. base._observer.OnError(ex);
  1062. base.Dispose();
  1063. }
  1064. return;
  1065. }
  1066. SubscribeInner(inner);
  1067. }
  1068. public void OnError(Exception error)
  1069. {
  1070. if (_parent._selectorOnError != null)
  1071. {
  1072. var inner = default(IObservable<TResult>);
  1073. try
  1074. {
  1075. inner = _parent._selectorOnError(error);
  1076. }
  1077. catch (Exception ex)
  1078. {
  1079. lock (_gate)
  1080. {
  1081. base._observer.OnError(ex);
  1082. base.Dispose();
  1083. }
  1084. return;
  1085. }
  1086. SubscribeInner(inner);
  1087. Final();
  1088. }
  1089. else
  1090. {
  1091. lock (_gate)
  1092. {
  1093. base._observer.OnError(error);
  1094. base.Dispose();
  1095. }
  1096. }
  1097. }
  1098. public void OnCompleted()
  1099. {
  1100. if (_parent._selectorOnCompleted != null)
  1101. {
  1102. var inner = default(IObservable<TResult>);
  1103. try
  1104. {
  1105. inner = _parent._selectorOnCompleted();
  1106. }
  1107. catch (Exception ex)
  1108. {
  1109. lock (_gate)
  1110. {
  1111. base._observer.OnError(ex);
  1112. base.Dispose();
  1113. }
  1114. return;
  1115. }
  1116. SubscribeInner(inner);
  1117. }
  1118. Final();
  1119. }
  1120. private void Final()
  1121. {
  1122. _isStopped = true;
  1123. if (_group.Count == 1)
  1124. {
  1125. //
  1126. // Notice there can be a race between OnCompleted of the source and any
  1127. // of the inner sequences, where both see _group.Count == 1, and one is
  1128. // waiting for the lock. There won't be a double OnCompleted observation
  1129. // though, because the call to Dispose silences the observer by swapping
  1130. // in a NopObserver<T>.
  1131. //
  1132. lock (_gate)
  1133. {
  1134. base._observer.OnCompleted();
  1135. base.Dispose();
  1136. }
  1137. }
  1138. else
  1139. {
  1140. _sourceSubscription.Dispose();
  1141. }
  1142. }
  1143. private void SubscribeInner(IObservable<TResult> inner)
  1144. {
  1145. var innerSubscription = new SingleAssignmentDisposable();
  1146. _group.Add(innerSubscription);
  1147. innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
  1148. }
  1149. class Iter : IObserver<TResult>
  1150. {
  1151. private readonly IndexSelectorImpl _parent;
  1152. private readonly IDisposable _self;
  1153. public Iter(IndexSelectorImpl parent, IDisposable self)
  1154. {
  1155. _parent = parent;
  1156. _self = self;
  1157. }
  1158. public void OnNext(TResult value)
  1159. {
  1160. lock (_parent._gate)
  1161. _parent._observer.OnNext(value);
  1162. }
  1163. public void OnError(Exception error)
  1164. {
  1165. lock (_parent._gate)
  1166. {
  1167. _parent._observer.OnError(error);
  1168. _parent.Dispose();
  1169. }
  1170. }
  1171. public void OnCompleted()
  1172. {
  1173. _parent._group.Remove(_self);
  1174. if (_parent._isStopped && _parent._group.Count == 1)
  1175. {
  1176. //
  1177. // Notice there can be a race between OnCompleted of the source and any
  1178. // of the inner sequences, where both see _group.Count == 1, and one is
  1179. // waiting for the lock. There won't be a double OnCompleted observation
  1180. // though, because the call to Dispose silences the observer by swapping
  1181. // in a NopObserver<T>.
  1182. //
  1183. lock (_parent._gate)
  1184. {
  1185. _parent._observer.OnCompleted();
  1186. _parent.Dispose();
  1187. }
  1188. }
  1189. }
  1190. }
  1191. }
  1192. class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
  1193. {
  1194. private readonly SelectMany<TSource, TResult> _parent;
  1195. public NoSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  1196. : base(observer, cancel)
  1197. {
  1198. _parent = parent;
  1199. }
  1200. public void OnNext(TSource value)
  1201. {
  1202. var xs = default(IEnumerable<TResult>);
  1203. try
  1204. {
  1205. xs = _parent._selectorE(value);
  1206. }
  1207. catch (Exception exception)
  1208. {
  1209. base._observer.OnError(exception);
  1210. base.Dispose();
  1211. return;
  1212. }
  1213. var e = default(IEnumerator<TResult>);
  1214. try
  1215. {
  1216. e = xs.GetEnumerator();
  1217. }
  1218. catch (Exception exception)
  1219. {
  1220. base._observer.OnError(exception);
  1221. base.Dispose();
  1222. return;
  1223. }
  1224. try
  1225. {
  1226. var hasNext = true;
  1227. while (hasNext)
  1228. {
  1229. hasNext = false;
  1230. var current = default(TResult);
  1231. try
  1232. {
  1233. hasNext = e.MoveNext();
  1234. if (hasNext)
  1235. current = e.Current;
  1236. }
  1237. catch (Exception exception)
  1238. {
  1239. base._observer.OnError(exception);
  1240. base.Dispose();
  1241. return;
  1242. }
  1243. if (hasNext)
  1244. base._observer.OnNext(current);
  1245. }
  1246. }
  1247. finally
  1248. {
  1249. if (e != null)
  1250. e.Dispose();
  1251. }
  1252. }
  1253. public void OnError(Exception error)
  1254. {
  1255. base._observer.OnError(error);
  1256. base.Dispose();
  1257. }
  1258. public void OnCompleted()
  1259. {
  1260. base._observer.OnCompleted();
  1261. base.Dispose();
  1262. }
  1263. }
  1264. class Omega : Sink<TResult>, IObserver<TSource>
  1265. {
  1266. private readonly SelectMany<TSource, TResult> _parent;
  1267. public Omega(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  1268. : base(observer, cancel)
  1269. {
  1270. _parent = parent;
  1271. }
  1272. private int _index;
  1273. public void OnNext(TSource value)
  1274. {
  1275. var xs = default(IEnumerable<TResult>);
  1276. try
  1277. {
  1278. xs = _parent._selectorEI(value, checked(_index++));
  1279. }
  1280. catch (Exception exception)
  1281. {
  1282. base._observer.OnError(exception);
  1283. base.Dispose();
  1284. return;
  1285. }
  1286. var e = default(IEnumerator<TResult>);
  1287. try
  1288. {
  1289. e = xs.GetEnumerator();
  1290. }
  1291. catch (Exception exception)
  1292. {
  1293. base._observer.OnError(exception);
  1294. base.Dispose();
  1295. return;
  1296. }
  1297. try
  1298. {
  1299. var hasNext = true;
  1300. while (hasNext)
  1301. {
  1302. hasNext = false;
  1303. var current = default(TResult);
  1304. try
  1305. {
  1306. hasNext = e.MoveNext();
  1307. if (hasNext)
  1308. current = e.Current;
  1309. }
  1310. catch (Exception exception)
  1311. {
  1312. base._observer.OnError(exception);
  1313. base.Dispose();
  1314. return;
  1315. }
  1316. if (hasNext)
  1317. base._observer.OnNext(current);
  1318. }
  1319. }
  1320. finally
  1321. {
  1322. if (e != null)
  1323. e.Dispose();
  1324. }
  1325. }
  1326. public void OnError(Exception error)
  1327. {
  1328. base._observer.OnError(error);
  1329. base.Dispose();
  1330. }
  1331. public void OnCompleted()
  1332. {
  1333. base._observer.OnCompleted();
  1334. base.Dispose();
  1335. }
  1336. }
  1337. #if !NO_TPL
  1338. #pragma warning disable 0420
  1339. class SelectManyImpl : Sink<TResult>, IObserver<TSource>
  1340. {
  1341. private readonly SelectMany<TSource, TResult> _parent;
  1342. public SelectManyImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  1343. : base(observer, cancel)
  1344. {
  1345. _parent = parent;
  1346. }
  1347. private object _gate;
  1348. private CancellationDisposable _cancel;
  1349. private volatile int _count;
  1350. public IDisposable Run()
  1351. {
  1352. _gate = new object();
  1353. _cancel = new CancellationDisposable();
  1354. _count = 1;
  1355. return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel);
  1356. }
  1357. public void OnNext(TSource value)
  1358. {
  1359. var task = default(Task<TResult>);
  1360. try
  1361. {
  1362. Interlocked.Increment(ref _count);
  1363. task = _parent._selectorT(value, _cancel.Token);
  1364. }
  1365. catch (Exception ex)
  1366. {
  1367. lock (_gate)
  1368. {
  1369. base._observer.OnError(ex);
  1370. base.Dispose();
  1371. }
  1372. return;
  1373. }
  1374. if (task.IsCompleted)
  1375. {
  1376. OnCompletedTask(task);
  1377. }
  1378. else
  1379. {
  1380. task.ContinueWith(OnCompletedTask);
  1381. }
  1382. }
  1383. private void OnCompletedTask(Task<TResult> task)
  1384. {
  1385. switch (task.Status)
  1386. {
  1387. case TaskStatus.RanToCompletion:
  1388. {
  1389. lock (_gate)
  1390. base._observer.OnNext(task.Result);
  1391. OnCompleted();
  1392. }
  1393. break;
  1394. case TaskStatus.Faulted:
  1395. {
  1396. lock (_gate)
  1397. {
  1398. base._observer.OnError(task.Exception.InnerException);
  1399. base.Dispose();
  1400. }
  1401. }
  1402. break;
  1403. case TaskStatus.Canceled:
  1404. {
  1405. if (!_cancel.IsDisposed)
  1406. {
  1407. lock (_gate)
  1408. {
  1409. base._observer.OnError(new TaskCanceledException(task));
  1410. base.Dispose();
  1411. }
  1412. }
  1413. }
  1414. break;
  1415. }
  1416. }
  1417. public void OnError(Exception error)
  1418. {
  1419. lock (_gate)
  1420. {
  1421. base._observer.OnError(error);
  1422. base.Dispose();
  1423. }
  1424. }
  1425. public void OnCompleted()
  1426. {
  1427. if (Interlocked.Decrement(ref _count) == 0)
  1428. {
  1429. lock (_gate)
  1430. {
  1431. base._observer.OnCompleted();
  1432. base.Dispose();
  1433. }
  1434. }
  1435. }
  1436. }
  1437. class Sigma : Sink<TResult>, IObserver<TSource>
  1438. {
  1439. private readonly SelectMany<TSource, TResult> _parent;
  1440. public Sigma(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  1441. : base(observer, cancel)
  1442. {
  1443. _parent = parent;
  1444. }
  1445. private object _gate;
  1446. private CancellationDisposable _cancel;
  1447. private volatile int _count;
  1448. private int _index;
  1449. public IDisposable Run()
  1450. {
  1451. _gate = new object();
  1452. _cancel = new CancellationDisposable();
  1453. _count = 1;
  1454. return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel);
  1455. }
  1456. public void OnNext(TSource value)
  1457. {
  1458. var task = default(Task<TResult>);
  1459. try
  1460. {
  1461. Interlocked.Increment(ref _count);
  1462. task = _parent._selectorTI(value, checked(_index++), _cancel.Token);
  1463. }
  1464. catch (Exception ex)
  1465. {
  1466. lock (_gate)
  1467. {
  1468. base._observer.OnError(ex);
  1469. base.Dispose();
  1470. }
  1471. return;
  1472. }
  1473. if (task.IsCompleted)
  1474. {
  1475. OnCompletedTask(task);
  1476. }
  1477. else
  1478. {
  1479. task.ContinueWith(OnCompletedTask);
  1480. }
  1481. }
  1482. private void OnCompletedTask(Task<TResult> task)
  1483. {
  1484. switch (task.Status)
  1485. {
  1486. case TaskStatus.RanToCompletion:
  1487. {
  1488. lock (_gate)
  1489. base._observer.OnNext(task.Result);
  1490. OnCompleted();
  1491. }
  1492. break;
  1493. case TaskStatus.Faulted:
  1494. {
  1495. lock (_gate)
  1496. {
  1497. base._observer.OnError(task.Exception.InnerException);
  1498. base.Dispose();
  1499. }
  1500. }
  1501. break;
  1502. case TaskStatus.Canceled:
  1503. {
  1504. if (!_cancel.IsDisposed)
  1505. {
  1506. lock (_gate)
  1507. {
  1508. base._observer.OnError(new TaskCanceledException(task));
  1509. base.Dispose();
  1510. }
  1511. }
  1512. }
  1513. break;
  1514. }
  1515. }
  1516. public void OnError(Exception error)
  1517. {
  1518. lock (_gate)
  1519. {
  1520. base._observer.OnError(error);
  1521. base.Dispose();
  1522. }
  1523. }
  1524. public void OnCompleted()
  1525. {
  1526. if (Interlocked.Decrement(ref _count) == 0)
  1527. {
  1528. lock (_gate)
  1529. {
  1530. base._observer.OnCompleted();
  1531. base.Dispose();
  1532. }
  1533. }
  1534. }
  1535. }
  1536. #pragma warning restore 0420
  1537. #endif
  1538. }
  1539. }
  1540. #endif