1
0

SelectMany.cs 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. internal static class SelectMany<TSource, TCollection, TResult>
  11. {
  /// <summary>
  /// SelectMany producer whose collection selector turns each source element into an
  /// <see cref="IObservable{T}"/>. Every element of such an inner sequence is paired with
  /// the source element that produced it and projected through the result selector.
  /// </summary>
  internal sealed class ObservableSelector : Producer<TResult, ObservableSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;

      public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink observing the source sequence. It creates one <see cref="InnerObserver"/> per
      /// source element and tracks them in <c>_group</c>.
      /// </summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          // Every notification sent downstream is forwarded while holding this lock.
          private readonly object _gate = new object();

          // Inner observers that have been added and have not yet removed themselves.
          private readonly CompositeDisposable _group = new CompositeDisposable();

          private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
          private readonly Func<TSource, TCollection, TResult> _resultSelector;

          // Set once the source sequence has signalled OnCompleted.
          private volatile bool _isStopped;

          public _(ObservableSelector parent, IObserver<TResult> observer)
              : base(observer)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          /// <summary>
          /// Maps <paramref name="value"/> to an inner sequence and subscribes to it.
          /// An exception thrown by the collection selector is forwarded as an error.
          /// </summary>
          public override void OnNext(TSource value)
          {
              IObservable<TCollection> inner;

              try
              {
                  inner = _collectionSelector(value);
              }
              catch (Exception selectorFailure)
              {
                  EmitErrorLocked(selectorFailure);
                  return;
              }

              // Register the observer in the group before subscribing.
              var child = new InnerObserver(this, value);
              _group.Add(child);
              child.SetResource(inner.SubscribeSafe(child));
          }

          public override void OnError(Exception error)
          {
              EmitErrorLocked(error);
          }

          /// <summary>
          /// Marks the source as finished. Completion is forwarded right away only when no
          /// inner observer is registered; otherwise just the upstream is disposed.
          /// </summary>
          public override void OnCompleted()
          {
              _isStopped = true;

              if (_group.Count != 0)
              {
                  DisposeUpstream();
                  return;
              }

              //
              // This path can race with InnerObserver.OnCompleted: both may observe an
              // empty group and both may forward completion. Per the original author's
              // note no double OnCompleted is observed, because the call to Dispose
              // silences the observer by swapping in a NopObserver<T> (that mechanism is
              // implemented outside this file).
              //
              EmitCompletedLocked();
          }

          protected override void Dispose(bool disposing)
          {
              base.Dispose(disposing);

              if (disposing)
              {
                  _group.Dispose();
              }
          }

          // Forwards an error downstream under the gate.
          private void EmitErrorLocked(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }

          // Forwards completion downstream under the gate.
          private void EmitCompletedLocked()
          {
              lock (_gate)
              {
                  ForwardOnCompleted();
              }
          }

          /// <summary>
          /// Observer of a single inner sequence; remembers the source element that
          /// produced the sequence so it can be handed to the result selector.
          /// </summary>
          private sealed class InnerObserver : SafeObserver<TCollection>
          {
              private readonly _ _parent;
              private readonly TSource _value;

              public InnerObserver(_ parent, TSource value)
              {
                  _parent = parent;
                  _value = value;
              }

              public override void OnNext(TCollection value)
              {
                  TResult projected;

                  try
                  {
                      projected = _parent._resultSelector(_value, value);
                  }
                  catch (Exception selectorFailure)
                  {
                      _parent.EmitErrorLocked(selectorFailure);
                      return;
                  }

                  lock (_parent._gate)
                  {
                      _parent.ForwardOnNext(projected);
                  }
              }

              public override void OnError(Exception error)
              {
                  _parent.EmitErrorLocked(error);
              }

              /// <summary>
              /// Removes this observer from the group and, when the source has already
              /// stopped and the group is empty, forwards completion.
              /// </summary>
              public override void OnCompleted()
              {
                  _parent._group.Remove(this);

                  if (!_parent._isStopped || _parent._group.Count != 0)
                  {
                      return;
                  }

                  //
                  // May race with the sink's own OnCompleted (see the note there); a
                  // duplicate completion is expected to be swallowed after disposal.
                  //
                  _parent.EmitCompletedLocked();
              }
          }
      }
  }
  /// <summary>
  /// Index-aware SelectMany producer over inner observables. The collection selector
  /// receives the source element and its zero-based position; the result selector receives
  /// the source element, its position, the inner element and the inner element's position.
  /// </summary>
  internal sealed class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelector;
      private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

      public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink observing the source sequence; numbers the source elements and creates one
      /// <see cref="InnerObserver"/> per element.
      /// </summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          // Every notification sent downstream is forwarded while holding this lock.
          private readonly object _gate = new object();

          // Inner observers that have been added and have not yet removed themselves.
          private readonly CompositeDisposable _group = new CompositeDisposable();

          private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelector;
          private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

          // Set once the source sequence has signalled OnCompleted.
          private volatile bool _isStopped;

          // Position that will be assigned to the next source element.
          private int _index;

          public _(ObservableSelectorIndexed parent, IObserver<TResult> observer)
              : base(observer)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          /// <summary>
          /// Maps <paramref name="value"/> and its position to an inner sequence and
          /// subscribes to it. An exception thrown by the collection selector is forwarded
          /// as an error.
          /// </summary>
          public override void OnNext(TSource value)
          {
              // The checked increment sits outside the try block, so an overflow
              // propagates to the caller instead of being forwarded downstream.
              var sourceIndex = checked(_index++);

              IObservable<TCollection> inner;

              try
              {
                  inner = _collectionSelector(value, sourceIndex);
              }
              catch (Exception selectorFailure)
              {
                  EmitErrorLocked(selectorFailure);
                  return;
              }

              // Register the observer in the group before subscribing.
              var child = new InnerObserver(this, value, sourceIndex);
              _group.Add(child);
              child.SetResource(inner.SubscribeSafe(child));
          }

          public override void OnError(Exception error)
          {
              EmitErrorLocked(error);
          }

          /// <summary>
          /// Marks the source as finished. Completion is forwarded right away only when no
          /// inner observer is registered; otherwise just the upstream is disposed.
          /// </summary>
          public override void OnCompleted()
          {
              _isStopped = true;

              if (_group.Count != 0)
              {
                  DisposeUpstream();
                  return;
              }

              //
              // This path can race with InnerObserver.OnCompleted: both may observe an
              // empty group and both may forward completion. Per the original author's
              // note no double OnCompleted is observed, because the call to Dispose
              // silences the observer by swapping in a NopObserver<T> (that mechanism is
              // implemented outside this file).
              //
              EmitCompletedLocked();
          }

          protected override void Dispose(bool disposing)
          {
              base.Dispose(disposing);

              if (disposing)
              {
                  _group.Dispose();
              }
          }

          // Forwards an error downstream under the gate.
          private void EmitErrorLocked(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }

          // Forwards completion downstream under the gate.
          private void EmitCompletedLocked()
          {
              lock (_gate)
              {
                  ForwardOnCompleted();
              }
          }

          /// <summary>
          /// Observer of a single inner sequence; remembers the originating source element
          /// and its position, and numbers the inner elements it receives.
          /// </summary>
          private sealed class InnerObserver : SafeObserver<TCollection>
          {
              private readonly _ _parent;
              private readonly TSource _value;
              private readonly int _valueIndex;

              // Position that will be assigned to the next inner element.
              private int _index;

              public InnerObserver(_ parent, TSource value, int index)
              {
                  _parent = parent;
                  _value = value;
                  _valueIndex = index;
              }

              public override void OnNext(TCollection value)
              {
                  TResult projected;

                  try
                  {
                      // The checked increment is inside the try block, so an overflow
                      // here is forwarded downstream like a selector failure.
                      var innerIndex = checked(_index++);
                      projected = _parent._resultSelector(_value, _valueIndex, value, innerIndex);
                  }
                  catch (Exception selectorFailure)
                  {
                      _parent.EmitErrorLocked(selectorFailure);
                      return;
                  }

                  lock (_parent._gate)
                  {
                      _parent.ForwardOnNext(projected);
                  }
              }

              public override void OnError(Exception error)
              {
                  _parent.EmitErrorLocked(error);
              }

              /// <summary>
              /// Removes this observer from the group and, when the source has already
              /// stopped and the group is empty, forwards completion.
              /// </summary>
              public override void OnCompleted()
              {
                  _parent._group.Remove(this);

                  if (!_parent._isStopped || _parent._group.Count != 0)
                  {
                      return;
                  }

                  //
                  // May race with the sink's own OnCompleted (see the note there); a
                  // duplicate completion is expected to be swallowed after disposal.
                  //
                  _parent.EmitCompletedLocked();
              }
          }
      }
  }
  /// <summary>
  /// SelectMany producer whose collection selector turns each source element into an
  /// <see cref="IEnumerable{T}"/>. The enumerable is walked inside the source's OnNext
  /// call and every element is projected through the result selector.
  /// </summary>
  internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;

      public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink that expands each source element into the projected elements of its
      /// enumerable. Only OnNext is overridden here.
      /// </summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelector;
          private readonly Func<TSource, TCollection, TResult> _resultSelector;

          public _(EnumerableSelector parent, IObserver<TResult> observer)
              : base(observer)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          /// <summary>
          /// Enumerates the collection selected for <paramref name="value"/> and forwards
          /// each projected element. Any exception raised while selecting the collection,
          /// obtaining its enumerator, advancing it or projecting an element is forwarded
          /// as an error and stops the enumeration.
          /// </summary>
          public override void OnNext(TSource value)
          {
              IEnumerator<TCollection> enumerator;

              try
              {
                  // Both steps report failures the same way, so they share one handler.
                  var items = _collectionSelector(value);
                  enumerator = items.GetEnumerator();
              }
              catch (Exception failure)
              {
                  ForwardOnError(failure);
                  return;
              }

              using (enumerator)
              {
                  while (true)
                  {
                      TResult projected;

                      try
                      {
                          if (!enumerator.MoveNext())
                          {
                              break;
                          }

                          projected = _resultSelector(value, enumerator.Current);
                      }
                      catch (Exception failure)
                      {
                          ForwardOnError(failure);
                          return;
                      }

                      // Forwarding happens outside the try block, so exceptions thrown
                      // while forwarding are not caught here.
                      ForwardOnNext(projected);
                  }
              }
          }
      }
  }
  /// <summary>
  /// Index-aware SelectMany producer over enumerables. The collection selector receives
  /// the source element and its zero-based position; the result selector receives the
  /// source element, its position, the collection element and that element's position
  /// within its enumerable.
  /// </summary>
  internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

      public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink that numbers the source elements and expands each one into the projected
      /// elements of its enumerable. Only OnNext is overridden here.
      /// </summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelector;
          private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

          // Position that will be assigned to the next source element.
          private int _index;

          public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer)
              : base(observer)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          /// <summary>
          /// Enumerates the collection selected for <paramref name="value"/> and forwards
          /// each projected element. Any exception raised while selecting the collection,
          /// obtaining its enumerator, advancing it or projecting an element is forwarded
          /// as an error and stops the enumeration.
          /// </summary>
          public override void OnNext(TSource value)
          {
              // The checked increment sits outside the try block, so an overflow
              // propagates to the caller instead of being forwarded downstream.
              var sourceIndex = checked(_index++);

              IEnumerator<TCollection> enumerator;

              try
              {
                  // Both steps report failures the same way, so they share one handler.
                  var items = _collectionSelector(value, sourceIndex);
                  enumerator = items.GetEnumerator();
              }
              catch (Exception failure)
              {
                  ForwardOnError(failure);
                  return;
              }

              using (enumerator)
              {
                  // Position of the next element within this enumerable.
                  var itemIndex = 0;

                  while (true)
                  {
                      TResult projected;

                      try
                      {
                          if (!enumerator.MoveNext())
                          {
                              break;
                          }

                          projected = _resultSelector(value, sourceIndex, enumerator.Current, checked(itemIndex++));
                      }
                      catch (Exception failure)
                      {
                          ForwardOnError(failure);
                          return;
                      }

                      // Forwarding happens outside the try block, so exceptions thrown
                      // while forwarding are not caught here.
                      ForwardOnNext(projected);
                  }
              }
          }
      }
  }
  /// <summary>
  /// SelectMany producer whose collection selector starts a <see cref="Task{TResult}"/>
  /// for each source element. The task's result is combined with the source element
  /// through the result selector and forwarded downstream.
  /// </summary>
  internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;

      public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink that launches one task per source element and counts outstanding work to
      /// decide when completion can be forwarded.
      /// </summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          // Every notification sent downstream is forwarded while holding this lock.
          private readonly object _gate = new object();

          // Its token is handed to the selector and to continuations; cancelled on dispose.
          private readonly CancellationTokenSource _cancel = new CancellationTokenSource();

          private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelector;
          private readonly Func<TSource, TCollection, TResult> _resultSelector;

          // Outstanding work: starts at 1 (set in Run), is incremented by each OnNext and
          // decremented by each OnCompleted call. Completion is forwarded when it hits 0.
          private volatile int _count;

          public _(TaskSelector parent, IObserver<TResult> observer)
              : base(observer)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          public override void Run(IObservable<TSource> source)
          {
              // Initialise the counter before handing off to the base Run.
              _count = 1;

              base.Run(source);
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  _cancel.Cancel();
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Starts the task for <paramref name="value"/>. A task that is already complete
          /// is handled inline; otherwise a continuation is attached. An exception thrown
          /// by the collection selector is forwarded as an error.
          /// </summary>
          public override void OnNext(TSource value)
          {
              Task<TCollection> pending;

              try
              {
                  Interlocked.Increment(ref _count);
                  pending = _collectionSelector(value, _cancel.Token);
              }
              catch (Exception selectorFailure)
              {
                  EmitErrorLocked(selectorFailure);
                  return;
              }

              if (!pending.IsCompleted)
              {
                  AttachContinuation(value, pending);
                  return;
              }

              OnCompletedTask(value, pending);
          }

          private void AttachContinuation(TSource value, Task<TCollection> task)
          {
              //
              // Kept as its own method so the closure below is only allocated when the
              // task did not complete synchronously.
              //
              task.ContinueWith(finished => OnCompletedTask(value, finished), _cancel.Token);
          }

          /// <summary>
          /// Translates the final state of <paramref name="task"/> into a notification:
          /// a projected value followed by a counter release, or an error.
          /// </summary>
          private void OnCompletedTask(TSource value, Task<TCollection> task)
          {
              var status = task.Status;

              if (status == TaskStatus.RanToCompletion)
              {
                  TResult projected;

                  try
                  {
                      projected = _resultSelector(value, task.Result);
                  }
                  catch (Exception selectorFailure)
                  {
                      EmitErrorLocked(selectorFailure);
                      return;
                  }

                  lock (_gate)
                  {
                      ForwardOnNext(projected);
                  }

                  // Releases the count taken for this task in OnNext.
                  OnCompleted();
              }
              else if (status == TaskStatus.Faulted)
              {
                  lock (_gate)
                  {
                      ForwardOnError(task.Exception.InnerException);
                  }
              }
              else if (status == TaskStatus.Canceled && !_cancel.IsCancellationRequested)
              {
                  // Cancellation is reported only when this sink's own token has not
                  // been cancelled.
                  lock (_gate)
                  {
                      ForwardOnError(new TaskCanceledException(task));
                  }
              }
          }

          public override void OnError(Exception error)
          {
              EmitErrorLocked(error);
          }

          /// <summary>
          /// Releases one unit of outstanding work and forwards completion when none is
          /// left. Also invoked from <see cref="OnCompletedTask"/> after a task's value
          /// has been forwarded.
          /// </summary>
          public override void OnCompleted()
          {
              if (Interlocked.Decrement(ref _count) != 0)
              {
                  return;
              }

              lock (_gate)
              {
                  ForwardOnCompleted();
              }
          }

          // Forwards an error downstream under the gate.
          private void EmitErrorLocked(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }
      }
  }
  /// <summary>
  /// Index-aware SelectMany producer over tasks. The collection selector receives the
  /// source element, its zero-based position and a cancellation token; the result
  /// selector receives the source element, its position and the task's result.
  /// </summary>
  internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelector;
      private readonly Func<TSource, int, TCollection, TResult> _resultSelector;

      public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink that numbers the source elements, launches one task per element and counts
      /// outstanding work to decide when completion can be forwarded.
      /// </summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          // Every notification sent downstream is forwarded while holding this lock.
          private readonly object _gate = new object();

          // Its token is handed to the selector and to continuations; cancelled on dispose.
          private readonly CancellationTokenSource _cancel = new CancellationTokenSource();

          private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelector;
          private readonly Func<TSource, int, TCollection, TResult> _resultSelector;

          // Outstanding work: starts at 1 (set in Run), is incremented by each OnNext and
          // decremented by each OnCompleted call. Completion is forwarded when it hits 0.
          private volatile int _count;

          // Position that will be assigned to the next source element.
          private int _index;

          public _(TaskSelectorIndexed parent, IObserver<TResult> observer)
              : base(observer)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          public override void Run(IObservable<TSource> source)
          {
              // Initialise the counter before handing off to the base Run.
              _count = 1;

              base.Run(source);
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  _cancel.Cancel();
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Starts the task for <paramref name="value"/>. A task that is already complete
          /// is handled inline; otherwise a continuation is attached. An exception thrown
          /// by the collection selector is forwarded as an error.
          /// </summary>
          public override void OnNext(TSource value)
          {
              // The checked increment sits outside the try block, so an overflow
              // propagates to the caller instead of being forwarded downstream.
              var sourceIndex = checked(_index++);

              Task<TCollection> pending;

              try
              {
                  Interlocked.Increment(ref _count);
                  pending = _collectionSelector(value, sourceIndex, _cancel.Token);
              }
              catch (Exception selectorFailure)
              {
                  EmitErrorLocked(selectorFailure);
                  return;
              }

              if (!pending.IsCompleted)
              {
                  AttachContinuation(value, sourceIndex, pending);
                  return;
              }

              OnCompletedTask(value, sourceIndex, pending);
          }

          private void AttachContinuation(TSource value, int index, Task<TCollection> task)
          {
              //
              // Kept as its own method so the closure below is only allocated when the
              // task did not complete synchronously.
              //
              task.ContinueWith(finished => OnCompletedTask(value, index, finished), _cancel.Token);
          }

          /// <summary>
          /// Translates the final state of <paramref name="task"/> into a notification:
          /// a projected value followed by a counter release, or an error.
          /// </summary>
          private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
          {
              var status = task.Status;

              if (status == TaskStatus.RanToCompletion)
              {
                  TResult projected;

                  try
                  {
                      projected = _resultSelector(value, index, task.Result);
                  }
                  catch (Exception selectorFailure)
                  {
                      EmitErrorLocked(selectorFailure);
                      return;
                  }

                  lock (_gate)
                  {
                      ForwardOnNext(projected);
                  }

                  // Releases the count taken for this task in OnNext.
                  OnCompleted();
              }
              else if (status == TaskStatus.Faulted)
              {
                  lock (_gate)
                  {
                      ForwardOnError(task.Exception.InnerException);
                  }
              }
              else if (status == TaskStatus.Canceled && !_cancel.IsCancellationRequested)
              {
                  // Cancellation is reported only when this sink's own token has not
                  // been cancelled.
                  lock (_gate)
                  {
                      ForwardOnError(new TaskCanceledException(task));
                  }
              }
          }

          public override void OnError(Exception error)
          {
              EmitErrorLocked(error);
          }

          /// <summary>
          /// Releases one unit of outstanding work and forwards completion when none is
          /// left. Also invoked from <see cref="OnCompletedTask"/> after a task's value
          /// has been forwarded.
          /// </summary>
          public override void OnCompleted()
          {
              if (Interlocked.Decrement(ref _count) != 0)
              {
                  return;
              }

              lock (_gate)
              {
                  ForwardOnCompleted();
              }
          }

          // Forwards an error downstream under the gate.
          private void EmitErrorLocked(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }
      }
  }
  720. }
  721. internal static class SelectMany<TSource, TResult>
  722. {
  /// <summary>
  /// SelectMany producer without a result selector: each source element is mapped to an
  /// inner <see cref="IObservable{T}"/> whose elements are forwarded downstream as-is.
  /// The class and its sink are left unsealed and expose protected members so that
  /// derived producers in this file can reuse them.
  /// </summary>
  internal class ObservableSelector : Producer<TResult, ObservableSelector._>
  {
      protected readonly IObservable<TSource> _source;
      protected readonly Func<TSource, IObservable<TResult>> _selector;

      public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run(_source);
      }

      /// <summary>
      /// Sink observing the source sequence. It subscribes an <see cref="InnerObserver"/>
      /// to every selected inner sequence and tracks those observers in <c>_group</c>.
      /// </summary>
      internal class _ : Sink<TSource, TResult>
      {
          // Every notification sent downstream is forwarded while holding this lock.
          protected readonly object _gate = new object();

          private readonly Func<TSource, IObservable<TResult>> _selector;

          // Inner observers that have been added and have not yet removed themselves.
          private readonly CompositeDisposable _group = new CompositeDisposable();

          // Set by Final() once no further source elements are expected.
          private volatile bool _isStopped;

          public _(ObservableSelector parent, IObserver<TResult> observer)
              : base(observer)
          {
              _selector = parent._selector;
          }

          /// <summary>
          /// Maps <paramref name="value"/> to an inner sequence and subscribes to it.
          /// An exception thrown by the selector is forwarded as an error.
          /// </summary>
          public override void OnNext(TSource value)
          {
              IObservable<TResult> selected;

              try
              {
                  selected = _selector(value);
              }
              catch (Exception selectorFailure)
              {
                  lock (_gate)
                  {
                      ForwardOnError(selectorFailure);
                  }

                  return;
              }

              SubscribeInner(selected);
          }

          public override void OnError(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }

          public override void OnCompleted()
          {
              Final();
          }

          protected override void Dispose(bool disposing)
          {
              base.Dispose(disposing);

              if (disposing)
              {
                  _group.Dispose();
              }
          }

          /// <summary>
          /// Marks the source as finished. Completion is forwarded right away only when no
          /// inner observer is registered; otherwise just the upstream is disposed.
          /// </summary>
          protected void Final()
          {
              _isStopped = true;

              if (_group.Count != 0)
              {
                  DisposeUpstream();
                  return;
              }

              //
              // This path can race with InnerObserver.OnCompleted: both may observe an
              // empty group and both may forward completion. Per the original author's
              // note no double OnCompleted is observed, because the call to Dispose
              // silences the observer by swapping in a NopObserver<T> (that mechanism is
              // implemented outside this file).
              //
              lock (_gate)
              {
                  ForwardOnCompleted();
              }
          }

          /// <summary>
          /// Creates an observer for <paramref name="inner"/>, registers it in the group
          /// and then subscribes it.
          /// </summary>
          protected void SubscribeInner(IObservable<TResult> inner)
          {
              var child = new InnerObserver(this);
              _group.Add(child);
              child.SetResource(inner.SubscribeSafe(child));
          }

          /// <summary>
          /// Observer of a single inner sequence; relays its notifications to the parent
          /// sink under the parent's gate.
          /// </summary>
          private sealed class InnerObserver : SafeObserver<TResult>
          {
              private readonly _ _parent;

              public InnerObserver(_ parent)
              {
                  _parent = parent;
              }

              public override void OnNext(TResult value)
              {
                  lock (_parent._gate)
                  {
                      _parent.ForwardOnNext(value);
                  }
              }

              public override void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent.ForwardOnError(error);
                  }
              }

              /// <summary>
              /// Removes this observer from the group and, when the source has already
              /// stopped and the group is empty, forwards completion.
              /// </summary>
              public override void OnCompleted()
              {
                  _parent._group.Remove(this);

                  if (!_parent._isStopped || _parent._group.Count != 0)
                  {
                      return;
                  }

                  //
                  // May race with Final() (see the note there); a duplicate completion
                  // is expected to be swallowed after disposal.
                  //
                  lock (_parent._gate)
                  {
                      _parent.ForwardOnCompleted();
                  }
              }
          }
      }
  }
  /// <summary>
  /// Extension of <see cref="ObservableSelector"/> that can additionally map the source's
  /// error and completion notifications to inner sequences. Either extra selector may be
  /// null, in which case the corresponding notification is handled without it.
  /// </summary>
  internal sealed class ObservableSelectors : ObservableSelector
  {
      private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
      private readonly Func<IObservable<TResult>> _selectorOnCompleted;

      public ObservableSelectors(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
          : base(source, selector)
      {
          _selectorOnError = selectorOnError;
          _selectorOnCompleted = selectorOnCompleted;
      }

      protected override ObservableSelector._ CreateSink(IObserver<TResult> observer)
      {
          return new _(this, observer);
      }

      /// <summary>
      /// Sink that reuses the base sink's element handling and overrides how the source's
      /// terminal notifications are processed.
      /// </summary>
      internal new sealed class _ : ObservableSelector._
      {
          private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
          private readonly Func<IObservable<TResult>> _selectorOnCompleted;

          public _(ObservableSelectors parent, IObserver<TResult> observer)
              : base(parent, observer)
          {
              _selectorOnError = parent._selectorOnError;
              _selectorOnCompleted = parent._selectorOnCompleted;
          }

          /// <summary>
          /// Without an error selector the error goes to the base implementation. With
          /// one, the error is mapped to an inner sequence, that sequence is subscribed,
          /// and the source is then treated as finished. An exception thrown by the
          /// selector is forwarded as an error.
          /// </summary>
          public override void OnError(Exception error)
          {
              if (_selectorOnError == null)
              {
                  base.OnError(error);
                  return;
              }

              IObservable<TResult> continuation;

              try
              {
                  continuation = _selectorOnError(error);
              }
              catch (Exception selectorFailure)
              {
                  lock (_gate)
                  {
                      ForwardOnError(selectorFailure);
                  }

                  return;
              }

              SubscribeInner(continuation);
              Final();
          }

          /// <summary>
          /// When a completion selector is present, subscribes to the sequence it returns
          /// before treating the source as finished. An exception thrown by the selector
          /// is forwarded as an error, and Final() is not called in that case.
          /// </summary>
          public override void OnCompleted()
          {
              if (_selectorOnCompleted != null)
              {
                  IObservable<TResult> continuation;

                  try
                  {
                      continuation = _selectorOnCompleted();
                  }
                  catch (Exception selectorFailure)
                  {
                      lock (_gate)
                      {
                          ForwardOnError(selectorFailure);
                      }

                      return;
                  }

                  SubscribeInner(continuation);
              }

              Final();
          }
      }
  }
  /// <summary>
  /// Producer whose sink projects each source element, together with its zero-based
  /// position, into an inner sequence and forwards the notifications of all inner
  /// sequences to the downstream observer.
  /// </summary>
  internal class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
  {
      protected readonly IObservable<TSource> _source;
      protected readonly Func<TSource, int, IObservable<TResult>> _selector;

      public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

      protected override void Run(_ sink) => sink.Run(_source);

      /// <summary>
      /// Sink that subscribes to one inner sequence per source element and relays
      /// their notifications while holding <see cref="_gate"/>.
      /// </summary>
      internal class _ : Sink<TSource, TResult>
      {
          // The one lock that serializes every notification forwarded downstream.
          // It is protected (not private) so that derived sinks take this same lock
          // instead of declaring a gate of their own.
          protected readonly object _gate = new object();

          // Holds the observers of the inner sequences that are still running.
          private readonly CompositeDisposable _group = new CompositeDisposable();

          protected readonly Func<TSource, int, IObservable<TResult>> _selector;

          // Position of the next source element handed to the selector.
          private int _index;

          // Set once the outer sequence has finished (see Final).
          private volatile bool _isStopped;

          public _(ObservableSelectorIndexed parent, IObserver<TResult> observer)
              : base(observer)
          {
              _selector = parent._selector;
          }

          /// <summary>
          /// Projects the element and its index into an inner sequence and subscribes
          /// to it. A throwing selector (or an index overflow, which is checked)
          /// terminates the result sequence with that exception.
          /// </summary>
          public override void OnNext(TSource value)
          {
              var inner = default(IObservable<TResult>);

              try
              {
                  inner = _selector(value, checked(_index++));
              }
              catch (Exception ex)
              {
                  lock (_gate)
                  {
                      ForwardOnError(ex);
                  }

                  return;
              }

              SubscribeInner(inner);
          }

          /// <summary>Forwards a source error downstream under the gate.</summary>
          public override void OnError(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }

          /// <summary>Marks the outer sequence as finished via <see cref="Final"/>.</summary>
          public override void OnCompleted()
          {
              Final();
          }

          /// <summary>Disposes the group of inner observers after the base resources.</summary>
          protected override void Dispose(bool disposing)
          {
              base.Dispose(disposing);

              if (disposing)
              {
                  _group.Dispose();
              }
          }

          /// <summary>
          /// Records that the outer sequence has stopped. Forwards completion when no
          /// inner observers remain; otherwise only disposes the upstream subscription.
          /// </summary>
          protected void Final()
          {
              _isStopped = true;

              if (_group.Count == 0)
              {
                  //
                  // Notice there can be a race between OnCompleted of the source and any
                  // of the inner sequences, where both see _group.Count == 1, and one is
                  // waiting for the lock. There won't be a double OnCompleted observation
                  // though, because the call to Dispose silences the observer by swapping
                  // in a NopObserver<T>.
                  //
                  lock (_gate)
                  {
                      ForwardOnCompleted();
                  }
              }
              else
              {
                  DisposeUpstream();
              }
          }

          /// <summary>
          /// Subscribes to an inner sequence through a new <see cref="InnerObserver"/>
          /// that is added to the group before the subscription is made.
          /// </summary>
          protected void SubscribeInner(IObservable<TResult> inner)
          {
              var innerObserver = new InnerObserver(this);

              _group.Add(innerObserver);
              innerObserver.SetResource(inner.SubscribeSafe(innerObserver));
          }

          /// <summary>
          /// Observer attached to one inner sequence; relays its notifications to the
          /// parent sink under the parent's gate.
          /// </summary>
          private sealed class InnerObserver : SafeObserver<TResult>
          {
              private readonly _ _parent;

              public InnerObserver(_ parent)
              {
                  _parent = parent;
              }

              public override void OnNext(TResult value)
              {
                  lock (_parent._gate)
                  {
                      _parent.ForwardOnNext(value);
                  }
              }

              public override void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent.ForwardOnError(error);
                  }
              }

              public override void OnCompleted()
              {
                  _parent._group.Remove(this);

                  if (_parent._isStopped && _parent._group.Count == 0)
                  {
                      //
                      // Notice there can be a race between OnCompleted of the source and any
                      // of the inner sequences, where both see _group.Count == 1, and one is
                      // waiting for the lock. There won't be a double OnCompleted observation
                      // though, because the call to Dispose silences the observer by swapping
                      // in a NopObserver<T>.
                      //
                      lock (_parent._gate)
                      {
                          _parent.ForwardOnCompleted();
                      }
                  }
              }
          }
      }
  }

  /// <summary>
  /// Extension of <see cref="ObservableSelectorIndexed"/> whose sink can also turn the
  /// source's error and completion notifications into inner sequences.
  /// </summary>
  internal sealed class ObservableSelectorsIndexed : ObservableSelectorIndexed
  {
      private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
      private readonly Func<IObservable<TResult>> _selectorOnCompleted;

      public ObservableSelectorsIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
          : base(source, selector)
      {
          _selectorOnError = selectorOnError;
          _selectorOnCompleted = selectorOnCompleted;
      }

      protected override ObservableSelectorIndexed._ CreateSink(IObserver<TResult> observer) => new _(this, observer);

      /// <summary>
      /// Sink that subscribes to the sequence returned by the error or completion
      /// selector before finishing the outer sequence.
      /// </summary>
      internal new sealed class _ : ObservableSelectorIndexed._
      {
          // No gate is declared here on purpose: the inherited _gate is the lock the
          // inner observers use, so selector failures must be forwarded under it too.
          private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
          private readonly Func<IObservable<TResult>> _selectorOnCompleted;

          public _(ObservableSelectorsIndexed parent, IObserver<TResult> observer)
              : base(parent, observer)
          {
              _selectorOnError = parent._selectorOnError;
              _selectorOnCompleted = parent._selectorOnCompleted;
          }

          /// <summary>
          /// Projects the source error into an inner sequence when an error selector
          /// is present; otherwise defers to the base implementation.
          /// </summary>
          public override void OnError(Exception error)
          {
              if (_selectorOnError != null)
              {
                  var inner = default(IObservable<TResult>);

                  try
                  {
                      inner = _selectorOnError(error);
                  }
                  catch (Exception ex)
                  {
                      lock (_gate)
                      {
                          ForwardOnError(ex);
                      }

                      return;
                  }

                  SubscribeInner(inner);
                  Final();
              }
              else
              {
                  base.OnError(error);
              }
          }

          /// <summary>
          /// Projects the source completion into an inner sequence when a completion
          /// selector is present, then runs the final-completion logic.
          /// </summary>
          public override void OnCompleted()
          {
              if (_selectorOnCompleted != null)
              {
                  var inner = default(IObservable<TResult>);

                  try
                  {
                      inner = _selectorOnCompleted();
                  }
                  catch (Exception ex)
                  {
                      lock (_gate)
                      {
                          ForwardOnError(ex);
                      }

                      return;
                  }

                  SubscribeInner(inner);
              }

              Final();
          }
      }
  }
  /// <summary>
  /// Producer whose sink projects each source element into an enumerable and
  /// forwards every element of that enumerable downstream.
  /// </summary>
  internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, IEnumerable<TResult>> _selector;

      public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

      protected override void Run(_ sink) => sink.Run(_source);

      /// <summary>Sink that enumerates the projection of each source element.</summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          private readonly Func<TSource, IEnumerable<TResult>> _selector;

          public _(EnumerableSelector parent, IObserver<TResult> observer)
              : base(observer)
          {
              _selector = parent._selector;
          }

          /// <summary>
          /// Runs the selector on <paramref name="value"/> and forwards each element of
          /// the resulting enumerable. Any exception thrown by the selector, by
          /// GetEnumerator, or while advancing the enumerator is forwarded as an error
          /// and ends processing of this element.
          /// </summary>
          public override void OnNext(TSource value)
          {
              IEnumerator<TResult> enumerator;
              try
              {
                  // Selector and GetEnumerator failures are reported the same way.
                  enumerator = _selector(value).GetEnumerator();
              }
              catch (Exception failure)
              {
                  ForwardOnError(failure);
                  return;
              }

              using (enumerator)
              {
                  while (true)
                  {
                      bool advanced;
                      var item = default(TResult);

                      try
                      {
                          advanced = enumerator.MoveNext();
                          if (advanced)
                          {
                              item = enumerator.Current;
                          }
                      }
                      catch (Exception failure)
                      {
                          ForwardOnError(failure);
                          return;
                      }

                      if (!advanced)
                      {
                          break;
                      }

                      // Forwarded outside the try so downstream exceptions are not caught here.
                      ForwardOnNext(item);
                  }
              }
          }
      }
  }
  /// <summary>
  /// Producer whose sink projects each source element, together with its zero-based
  /// position, into an enumerable and forwards every element of that enumerable.
  /// </summary>
  internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

      public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

      protected override void Run(_ sink) => sink.Run(_source);

      /// <summary>Sink that enumerates the indexed projection of each source element.</summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

          // Position of the next source element handed to the selector.
          private int _index;

          public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer)
              : base(observer)
          {
              _selector = parent._selector;
          }

          /// <summary>
          /// Runs the selector on <paramref name="value"/> and its index, then forwards
          /// each element of the resulting enumerable. Any exception thrown by the
          /// selector, by the checked index increment, by GetEnumerator, or while
          /// advancing the enumerator is forwarded as an error and ends processing of
          /// this element.
          /// </summary>
          public override void OnNext(TSource value)
          {
              IEnumerator<TResult> enumerator;
              try
              {
                  // Selector and GetEnumerator failures are reported the same way.
                  enumerator = _selector(value, checked(_index++)).GetEnumerator();
              }
              catch (Exception failure)
              {
                  ForwardOnError(failure);
                  return;
              }

              using (enumerator)
              {
                  while (true)
                  {
                      bool advanced;
                      var item = default(TResult);

                      try
                      {
                          advanced = enumerator.MoveNext();
                          if (advanced)
                          {
                              item = enumerator.Current;
                          }
                      }
                      catch (Exception failure)
                      {
                          ForwardOnError(failure);
                          return;
                      }

                      if (!advanced)
                      {
                          break;
                      }

                      // Forwarded outside the try so downstream exceptions are not caught here.
                      ForwardOnNext(item);
                  }
              }
          }
      }
  }
  /// <summary>
  /// Producer whose sink projects each source element into a task and forwards the
  /// task's result downstream once the task has run to completion.
  /// </summary>
  internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

      public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

      protected override void Run(_ sink) => sink.Run(_source);

      /// <summary>
      /// Sink that starts one task per source element and serializes all downstream
      /// notifications with a gate.
      /// </summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          private readonly object _gate = new object();
          private readonly CancellationTokenSource _cts = new CancellationTokenSource();
          private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

          // Outstanding work: set to 1 in Run, incremented per element in OnNext and
          // decremented in OnCompleted; completion is forwarded when it reaches zero.
          private volatile int _count;

          public _(TaskSelector parent, IObserver<TResult> observer)
              : base(observer)
          {
              _selector = parent._selector;
          }

          public override void Run(IObservable<TSource> source)
          {
              _count = 1;

              base.Run(source);
          }

          /// <summary>Cancels the token handed to the selector before disposing the base.</summary>
          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  _cts.Cancel();
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Invokes the selector for <paramref name="value"/>. An already completed
          /// task is handled inline; otherwise a continuation handles it later.
          /// </summary>
          public override void OnNext(TSource value)
          {
              Task<TResult> pending;
              try
              {
                  Interlocked.Increment(ref _count);
                  pending = _selector(value, _cts.Token);
              }
              catch (Exception failure)
              {
                  lock (_gate)
                  {
                      ForwardOnError(failure);
                  }

                  return;
              }

              if (!pending.IsCompleted)
              {
                  // The sink is passed as state, so the lambda captures nothing.
                  pending.ContinueWith((antecedent, state) => ((_)state).OnCompletedTask(antecedent), this, _cts.Token);
                  return;
              }

              OnCompletedTask(pending);
          }

          /// <summary>Translates the final state of a task into downstream notifications.</summary>
          private void OnCompletedTask(Task<TResult> task)
          {
              var status = task.Status;

              if (status == TaskStatus.RanToCompletion)
              {
                  lock (_gate)
                  {
                      ForwardOnNext(task.Result);
                  }

                  // Releases the count taken for this task in OnNext.
                  OnCompleted();
              }
              else if (status == TaskStatus.Faulted)
              {
                  lock (_gate)
                  {
                      ForwardOnError(task.Exception.InnerException);
                  }
              }
              else if (status == TaskStatus.Canceled && !_cts.IsCancellationRequested)
              {
                  // Only a cancellation not requested through this sink's own token is an error.
                  lock (_gate)
                  {
                      ForwardOnError(new TaskCanceledException(task));
                  }
              }
          }

          public override void OnError(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }

          /// <summary>
          /// Decrements the outstanding-work count and forwards completion when it
          /// reaches zero. Called for the source and for each task that ran to completion.
          /// </summary>
          public override void OnCompleted()
          {
              var remaining = Interlocked.Decrement(ref _count);
              if (remaining != 0)
              {
                  return;
              }

              lock (_gate)
              {
                  ForwardOnCompleted();
              }
          }
      }
  }
  /// <summary>
  /// Producer whose sink projects each source element, together with its zero-based
  /// position, into a task and forwards the task's result downstream once the task
  /// has run to completion.
  /// </summary>
  internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

      public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

      protected override void Run(_ sink) => sink.Run(_source);

      /// <summary>
      /// Sink that starts one task per source element and serializes all downstream
      /// notifications with a gate.
      /// </summary>
      internal sealed class _ : Sink<TSource, TResult>
      {
          private readonly object _gate = new object();
          private readonly CancellationTokenSource _cts = new CancellationTokenSource();
          private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

          // Outstanding work: set to 1 in Run, incremented per element in OnNext and
          // decremented in OnCompleted; completion is forwarded when it reaches zero.
          private volatile int _count;

          // Position of the next source element handed to the selector.
          private int _index;

          public _(TaskSelectorIndexed parent, IObserver<TResult> observer)
              : base(observer)
          {
              _selector = parent._selector;
          }

          public override void Run(IObservable<TSource> source)
          {
              _count = 1;

              base.Run(source);
          }

          /// <summary>Cancels the token handed to the selector before disposing the base.</summary>
          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  _cts.Cancel();
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Invokes the selector for <paramref name="value"/> and its index. An already
          /// completed task is handled inline; otherwise a continuation handles it later.
          /// </summary>
          public override void OnNext(TSource value)
          {
              Task<TResult> pending;
              try
              {
                  Interlocked.Increment(ref _count);
                  pending = _selector(value, checked(_index++), _cts.Token);
              }
              catch (Exception failure)
              {
                  lock (_gate)
                  {
                      ForwardOnError(failure);
                  }

                  return;
              }

              if (!pending.IsCompleted)
              {
                  // The sink is passed as state, so the lambda captures nothing.
                  pending.ContinueWith((antecedent, state) => ((_)state).OnCompletedTask(antecedent), this, _cts.Token);
                  return;
              }

              OnCompletedTask(pending);
          }

          /// <summary>Translates the final state of a task into downstream notifications.</summary>
          private void OnCompletedTask(Task<TResult> task)
          {
              var status = task.Status;

              if (status == TaskStatus.RanToCompletion)
              {
                  lock (_gate)
                  {
                      ForwardOnNext(task.Result);
                  }

                  // Releases the count taken for this task in OnNext.
                  OnCompleted();
              }
              else if (status == TaskStatus.Faulted)
              {
                  lock (_gate)
                  {
                      ForwardOnError(task.Exception.InnerException);
                  }
              }
              else if (status == TaskStatus.Canceled && !_cts.IsCancellationRequested)
              {
                  // Only a cancellation not requested through this sink's own token is an error.
                  lock (_gate)
                  {
                      ForwardOnError(new TaskCanceledException(task));
                  }
              }
          }

          public override void OnError(Exception error)
          {
              lock (_gate)
              {
                  ForwardOnError(error);
              }
          }

          /// <summary>
          /// Decrements the outstanding-work count and forwards completion when it
          /// reaches zero. Called for the source and for each task that ran to completion.
          /// </summary>
          public override void OnCompleted()
          {
              var remaining = Interlocked.Decrement(ref _count);
              if (remaining != 0)
              {
                  return;
              }

              lock (_gate)
              {
                  ForwardOnCompleted();
              }
          }
      }
  }
  1485. }
  1486. }