SelectMany.cs 68 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. internal static class SelectMany<TSource, TCollection, TResult>
  11. {
  // Producer for the SelectMany overload that maps every source element to an inner
  // observable sequence and passes each inner element, together with the source element
  // that produced it, through a result selector.
  internal sealed class ObservableSelector : Producer<TResult, ObservableSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;

      public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

      protected override IDisposable Run(_ sink) => sink.Run(_source);

      // Sink that observes the outer source and owns the subscriptions to all inner sequences.
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          // Guards every notification sent to the downstream observer.
          private readonly object _gate = new object();

          private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();

          // Contains the source subscription plus one entry per inner sequence that has not
          // completed yet, so Count == 1 means that only the source entry remains.
          private readonly CompositeDisposable _group = new CompositeDisposable();

          private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
          private readonly Func<TSource, TCollection, TResult> _resultSelector;

          // Written by the source's OnCompleted and read by inner observers, which can run on
          // other threads. Declared volatile so those readers do not act on a stale value.
          private volatile bool _isStopped;

          public _(ObservableSelector parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
              _group.Add(_sourceSubscription);
          }

          // Subscribes to the outer source and returns the group that tracks every subscription.
          public IDisposable Run(IObservable<TSource> source)
          {
              _isStopped = false;
              _sourceSubscription.Disposable = source.SubscribeSafe(this);
              return _group;
          }

          // Projects the source element to an inner sequence and subscribes to it.
          public void OnNext(TSource value)
          {
              IObservable<TCollection> collection;
              try
              {
                  collection = _collectionSelector(value);
              }
              catch (Exception ex)
              {
                  // A throwing selector terminates the whole sequence.
                  lock (_gate)
                  {
                      base._observer.OnError(ex);
                      base.Dispose();
                  }
                  return;
              }

              // Register the placeholder before subscribing so that an inner sequence which
              // completes during SubscribeSafe still finds its entry in the group.
              var innerSubscription = new SingleAssignmentDisposable();
              _group.Add(innerSubscription);
              innerSubscription.Disposable = collection.SubscribeSafe(new InnerObserver(this, value, innerSubscription));
          }

          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          // The source is done; the result completes now only if no inner sequence is still running.
          public void OnCompleted()
          {
              _isStopped = true;
              if (_group.Count == 1)
              {
                  //
                  // The source and an inner sequence can both observe _group.Count == 1 and
                  // race for the lock. Per the original author's note this cannot surface as
                  // a double OnCompleted, because Dispose silences the observer by swapping
                  // in a NopObserver<T>.
                  //
                  lock (_gate)
                  {
                      base._observer.OnCompleted();
                      base.Dispose();
                  }
              }
              else
              {
                  // Inner sequences are still active; only release the source subscription.
                  _sourceSubscription.Dispose();
              }
          }

          // Observer attached to a single inner sequence; remembers the source element that spawned it.
          private sealed class InnerObserver : IObserver<TCollection>
          {
              private readonly _ _parent;
              private readonly TSource _value;
              private readonly IDisposable _self;

              public InnerObserver(_ parent, TSource value, IDisposable self)
              {
                  _parent = parent;
                  _value = value;
                  _self = self;
              }

              public void OnNext(TCollection value)
              {
                  TResult res;
                  try
                  {
                      res = _parent._resultSelector(_value, value);
                  }
                  catch (Exception ex)
                  {
                      lock (_parent._gate)
                      {
                          _parent._observer.OnError(ex);
                          _parent.Dispose();
                      }
                      return;
                  }

                  lock (_parent._gate)
                  {
                      _parent._observer.OnNext(res);
                  }
              }

              public void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent._observer.OnError(error);
                      _parent.Dispose();
                  }
              }

              public void OnCompleted()
              {
                  // Drop this inner subscription from the group before checking whether it was the last one.
                  _parent._group.Remove(_self);
                  if (_parent._isStopped && _parent._group.Count == 1)
                  {
                      //
                      // Same race as in the sink's OnCompleted: several threads may get here,
                      // but per the original author's note Dispose swaps in a NopObserver<T>,
                      // so only the first OnCompleted is observed downstream.
                      //
                      lock (_parent._gate)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                  }
              }
          }
      }
  }
  // Producer for the index-aware SelectMany overload: the collection selector receives the
  // source element and its index, and the result selector additionally receives the inner
  // element and its index within that inner sequence.
  internal sealed class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelector;
      private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

      public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

      protected override IDisposable Run(_ sink) => sink.Run(_source);

      // Sink that observes the outer source and owns the subscriptions to all inner sequences.
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          // Guards every notification sent to the downstream observer.
          private readonly object _gate = new object();

          private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();

          // Contains the source subscription plus one entry per inner sequence that has not
          // completed yet, so Count == 1 means that only the source entry remains.
          private readonly CompositeDisposable _group = new CompositeDisposable();

          private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelector;
          private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

          // Written by the source's OnCompleted and read by inner observers, which can run on
          // other threads. Declared volatile so those readers do not act on a stale value.
          private volatile bool _isStopped;

          // Index handed to the collection selector for the next source element.
          private int _index;

          public _(ObservableSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
              _group.Add(_sourceSubscription);
          }

          // Subscribes to the outer source and returns the group that tracks every subscription.
          public IDisposable Run(IObservable<TSource> source)
          {
              _isStopped = false;
              _sourceSubscription.Disposable = source.SubscribeSafe(this);
              return _group;
          }

          // Projects the source element (with its index) to an inner sequence and subscribes to it.
          public void OnNext(TSource value)
          {
              // checked: an index overflow throws here, outside the try block below, so it is
              // not routed to the downstream observer's OnError.
              var index = checked(_index++);

              IObservable<TCollection> collection;
              try
              {
                  collection = _collectionSelector(value, index);
              }
              catch (Exception ex)
              {
                  // A throwing selector terminates the whole sequence.
                  lock (_gate)
                  {
                      base._observer.OnError(ex);
                      base.Dispose();
                  }
                  return;
              }

              // Register the placeholder before subscribing so that an inner sequence which
              // completes during SubscribeSafe still finds its entry in the group.
              var innerSubscription = new SingleAssignmentDisposable();
              _group.Add(innerSubscription);
              innerSubscription.Disposable = collection.SubscribeSafe(new InnerObserver(this, value, index, innerSubscription));
          }

          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          // The source is done; the result completes now only if no inner sequence is still running.
          public void OnCompleted()
          {
              _isStopped = true;
              if (_group.Count == 1)
              {
                  //
                  // The source and an inner sequence can both observe _group.Count == 1 and
                  // race for the lock. Per the original author's note this cannot surface as
                  // a double OnCompleted, because Dispose silences the observer by swapping
                  // in a NopObserver<T>.
                  //
                  lock (_gate)
                  {
                      base._observer.OnCompleted();
                      base.Dispose();
                  }
              }
              else
              {
                  // Inner sequences are still active; only release the source subscription.
                  _sourceSubscription.Dispose();
              }
          }

          // Observer attached to a single inner sequence; remembers the source element and
          // source index that spawned it and counts the inner elements it has seen.
          private sealed class InnerObserver : IObserver<TCollection>
          {
              private readonly _ _parent;
              private readonly TSource _value;
              private readonly int _valueIndex;
              private readonly IDisposable _self;

              // Index of the next element within this inner sequence.
              private int _index;

              public InnerObserver(_ parent, TSource value, int index, IDisposable self)
              {
                  _parent = parent;
                  _value = value;
                  _valueIndex = index;
                  _self = self;
              }

              public void OnNext(TCollection value)
              {
                  TResult res;
                  try
                  {
                      // The checked increment sits inside the try, so an overflow is reported
                      // through OnError just like a selector failure.
                      res = _parent._resultSelector(_value, _valueIndex, value, checked(_index++));
                  }
                  catch (Exception ex)
                  {
                      lock (_parent._gate)
                      {
                          _parent._observer.OnError(ex);
                          _parent.Dispose();
                      }
                      return;
                  }

                  lock (_parent._gate)
                  {
                      _parent._observer.OnNext(res);
                  }
              }

              public void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent._observer.OnError(error);
                      _parent.Dispose();
                  }
              }

              public void OnCompleted()
              {
                  // Drop this inner subscription from the group before checking whether it was the last one.
                  _parent._group.Remove(_self);
                  if (_parent._isStopped && _parent._group.Count == 1)
                  {
                      //
                      // Same race as in the sink's OnCompleted: several threads may get here,
                      // but per the original author's note Dispose swaps in a NopObserver<T>,
                      // so only the first OnCompleted is observed downstream.
                      //
                      lock (_parent._gate)
                      {
                          _parent._observer.OnCompleted();
                          _parent.Dispose();
                      }
                  }
              }
          }
      }
  }
  // Producer for the SelectMany overload whose collection selector yields an enumerable:
  // each source element is expanded synchronously and every enumerated item is combined
  // with the source element through the result selector.
  internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;

      public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      protected override IDisposable Run(_ sink)
      {
          return _source.SubscribeSafe(sink);
      }

      // Sink that enumerates the projected collection inside OnNext of the source.
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelector;
          private readonly Func<TSource, TCollection, TResult> _resultSelector;

          public _(EnumerableSelector parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          public void OnNext(TSource value)
          {
              // Step 1: obtain the collection for this element.
              IEnumerable<TCollection> items;
              try
              {
                  items = _collectionSelector(value);
              }
              catch (Exception exception)
              {
                  base._observer.OnError(exception);
                  base.Dispose();
                  return;
              }

              // Step 2: obtain its enumerator.
              IEnumerator<TCollection> enumerator;
              try
              {
                  enumerator = items.GetEnumerator();
              }
              catch (Exception exception)
              {
                  base._observer.OnError(exception);
                  base.Dispose();
                  return;
              }

              // Step 3: drain the enumerator. The using block disposes it (when non-null) on
              // every exit path, including the early return after an error.
              using (enumerator)
              {
                  while (true)
                  {
                      TResult projected;
                      try
                      {
                          if (!enumerator.MoveNext())
                          {
                              break;
                          }

                          projected = _resultSelector(value, enumerator.Current);
                      }
                      catch (Exception exception)
                      {
                          base._observer.OnError(exception);
                          base.Dispose();
                          return;
                      }

                      // Deliberately outside the try: exceptions thrown by the downstream
                      // observer are not converted into OnError.
                      base._observer.OnNext(projected);
                  }
              }
          }

          public void OnError(Exception error)
          {
              base._observer.OnError(error);
              base.Dispose();
          }

          public void OnCompleted()
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  // Producer for the index-aware SelectMany overload over enumerables: the collection
  // selector receives the source element and its index, and the result selector also
  // receives each enumerated item together with its position in the enumeration.
  internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelector;
      private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

      public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      protected override IDisposable Run(_ sink)
      {
          return _source.SubscribeSafe(sink);
      }

      // Sink that enumerates the projected collection inside OnNext of the source.
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelector;
          private readonly Func<TSource, int, TCollection, int, TResult> _resultSelector;

          // Index handed to the collection selector for the next source element.
          private int _index;

          public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          public void OnNext(TSource value)
          {
              // checked: an overflow of the source index throws here, outside any try block.
              var index = checked(_index++);

              // Step 1: obtain the collection for this element.
              IEnumerable<TCollection> items;
              try
              {
                  items = _collectionSelector(value, index);
              }
              catch (Exception exception)
              {
                  base._observer.OnError(exception);
                  base.Dispose();
                  return;
              }

              // Step 2: obtain its enumerator.
              IEnumerator<TCollection> enumerator;
              try
              {
                  enumerator = items.GetEnumerator();
              }
              catch (Exception exception)
              {
                  base._observer.OnError(exception);
                  base.Dispose();
                  return;
              }

              // Step 3: drain the enumerator. The using block disposes it (when non-null) on
              // every exit path, including the early return after an error.
              using (enumerator)
              {
                  var itemIndex = 0;
                  while (true)
                  {
                      TResult projected;
                      try
                      {
                          if (!enumerator.MoveNext())
                          {
                              break;
                          }

                          // The checked increment is inside the try, so an overflow is
                          // reported through OnError like any selector failure.
                          projected = _resultSelector(value, index, enumerator.Current, checked(itemIndex++));
                      }
                      catch (Exception exception)
                      {
                          base._observer.OnError(exception);
                          base.Dispose();
                          return;
                      }

                      // Deliberately outside the try: exceptions thrown by the downstream
                      // observer are not converted into OnError.
                      base._observer.OnNext(projected);
                  }
              }
          }

          public void OnError(Exception error)
          {
              base._observer.OnError(error);
              base.Dispose();
          }

          public void OnCompleted()
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  // Producer for the SelectMany overload whose collection selector starts a task per source
  // element; the task's result is combined with the source element by the result selector.
  internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelector;
      private readonly Func<TSource, TCollection, TResult> _resultSelector;

      public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      protected override IDisposable Run(_ sink)
      {
          return sink.Run(_source);
      }

      // Sink that launches one task per source element and forwards each task's outcome.
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          // Guards every notification sent to the downstream observer.
          private readonly object _gate = new object();

          // Supplies the token passed to every task started by the collection selector.
          private readonly CancellationDisposable _cancel = new CancellationDisposable();

          private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelector;
          private readonly Func<TSource, TCollection, TResult> _resultSelector;

          // Outstanding work: 1 for the source itself plus 1 per started task. The result
          // sequence completes when this drops to zero (see OnCompleted).
          private volatile int _count;

          public _(TaskSelector parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          public IDisposable Run(IObservable<TSource> source)
          {
              _count = 1;

              var subscription = source.SubscribeSafe(this);
              return StableCompositeDisposable.Create(subscription, _cancel);
          }

          public void OnNext(TSource value)
          {
              Task<TCollection> pending;
              try
              {
                  // Count the task before it is started.
                  Interlocked.Increment(ref _count);
                  pending = _collectionSelector(value, _cancel.Token);
              }
              catch (Exception ex)
              {
                  lock (_gate)
                  {
                      base._observer.OnError(ex);
                      base.Dispose();
                  }
                  return;
              }

              if (!pending.IsCompleted)
              {
                  AttachContinuation(value, pending);
                  return;
              }

              // Already finished: handle the outcome inline without allocating a closure.
              OnCompletedTask(value, pending);
          }

          private void AttachContinuation(TSource value, Task<TCollection> task)
          {
              //
              // Kept in its own method so the closure below is only allocated for tasks
              // that did not complete synchronously.
              //
              task.ContinueWith(t => OnCompletedTask(value, t));
          }

          // Translates the final state of a task into downstream notifications.
          private void OnCompletedTask(TSource value, Task<TCollection> task)
          {
              var status = task.Status;

              if (status == TaskStatus.RanToCompletion)
              {
                  TResult res;
                  try
                  {
                      res = _resultSelector(value, task.Result);
                  }
                  catch (Exception ex)
                  {
                      lock (_gate)
                      {
                          base._observer.OnError(ex);
                          base.Dispose();
                      }
                      return;
                  }

                  lock (_gate)
                  {
                      base._observer.OnNext(res);
                  }

                  // Releases this task's share of the outstanding-work counter.
                  OnCompleted();
              }
              else if (status == TaskStatus.Faulted)
              {
                  lock (_gate)
                  {
                      base._observer.OnError(task.Exception.InnerException);
                      base.Dispose();
                  }
              }
              else if (status == TaskStatus.Canceled && !_cancel.IsDisposed)
              {
                  // Only reported while this sink's own cancellation source has not been disposed.
                  lock (_gate)
                  {
                      base._observer.OnError(new TaskCanceledException(task));
                      base.Dispose();
                  }
              }
          }

          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          // Called once by the source and once per successfully completed task; whichever
          // call brings the counter to zero completes the downstream observer.
          public void OnCompleted()
          {
              if (Interlocked.Decrement(ref _count) != 0)
              {
                  return;
              }

              lock (_gate)
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }
      }
  }
  // Producer for the index-aware, task-based SelectMany overload: both selectors also
  // receive the index of the source element.
  internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelector;
      private readonly Func<TSource, int, TCollection, TResult> _resultSelector;

      public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
      {
          _source = source;
          _collectionSelector = collectionSelector;
          _resultSelector = resultSelector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      protected override IDisposable Run(_ sink)
      {
          return sink.Run(_source);
      }

      // Sink that launches one task per source element and forwards each task's outcome.
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          // Guards every notification sent to the downstream observer.
          private readonly object _gate = new object();

          // Supplies the token passed to every task started by the collection selector.
          private readonly CancellationDisposable _cancel = new CancellationDisposable();

          private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelector;
          private readonly Func<TSource, int, TCollection, TResult> _resultSelector;

          // Outstanding work: 1 for the source itself plus 1 per started task. The result
          // sequence completes when this drops to zero (see OnCompleted).
          private volatile int _count;

          // Index handed to the selectors for the next source element.
          private int _index;

          public _(TaskSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _collectionSelector = parent._collectionSelector;
              _resultSelector = parent._resultSelector;
          }

          public IDisposable Run(IObservable<TSource> source)
          {
              _count = 1;

              var subscription = source.SubscribeSafe(this);
              return StableCompositeDisposable.Create(subscription, _cancel);
          }

          public void OnNext(TSource value)
          {
              // checked: an overflow of the source index throws here, outside the try block.
              var index = checked(_index++);

              Task<TCollection> pending;
              try
              {
                  // Count the task before it is started.
                  Interlocked.Increment(ref _count);
                  pending = _collectionSelector(value, index, _cancel.Token);
              }
              catch (Exception ex)
              {
                  lock (_gate)
                  {
                      base._observer.OnError(ex);
                      base.Dispose();
                  }
                  return;
              }

              if (!pending.IsCompleted)
              {
                  AttachContinuation(value, index, pending);
                  return;
              }

              // Already finished: handle the outcome inline without allocating a closure.
              OnCompletedTask(value, index, pending);
          }

          private void AttachContinuation(TSource value, int index, Task<TCollection> task)
          {
              //
              // Kept in its own method so the closure below is only allocated for tasks
              // that did not complete synchronously.
              //
              task.ContinueWith(t => OnCompletedTask(value, index, t));
          }

          // Translates the final state of a task into downstream notifications.
          private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
          {
              var status = task.Status;

              if (status == TaskStatus.RanToCompletion)
              {
                  TResult res;
                  try
                  {
                      res = _resultSelector(value, index, task.Result);
                  }
                  catch (Exception ex)
                  {
                      lock (_gate)
                      {
                          base._observer.OnError(ex);
                          base.Dispose();
                      }
                      return;
                  }

                  lock (_gate)
                  {
                      base._observer.OnNext(res);
                  }

                  // Releases this task's share of the outstanding-work counter.
                  OnCompleted();
              }
              else if (status == TaskStatus.Faulted)
              {
                  lock (_gate)
                  {
                      base._observer.OnError(task.Exception.InnerException);
                      base.Dispose();
                  }
              }
              else if (status == TaskStatus.Canceled && !_cancel.IsDisposed)
              {
                  // Only reported while this sink's own cancellation source has not been disposed.
                  lock (_gate)
                  {
                      base._observer.OnError(new TaskCanceledException(task));
                      base.Dispose();
                  }
              }
          }

          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          // Called once by the source and once per successfully completed task; whichever
          // call brings the counter to zero completes the downstream observer.
          public void OnCompleted()
          {
              if (Interlocked.Decrement(ref _count) != 0)
              {
                  return;
              }

              lock (_gate)
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }
      }
  }
  754. }
  755. internal static class SelectMany<TSource, TResult>
  756. {
  757. internal class ObservableSelector : Producer<TResult, ObservableSelector._>
  758. {
  759. protected readonly IObservable<TSource> _source;
  760. protected readonly Func<TSource, IObservable<TResult>> _selector;
  // Stores the outer sequence and the projection that yields one inner sequence per element.
  public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
  {
      this._selector = selector;
      this._source = source;
  }
  // Builds the sink for one subscription from this producer, the downstream observer
  // and the cancel disposable.
  protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
  {
      return new _(this, observer, cancel);
  }
  // Starts the sink against the stored source sequence and returns the sink's disposable.
  protected override IDisposable Run(_ sink)
  {
      return sink.Run(_source);
  }
  768. internal class _ : Sink<TResult>, IObserver<TSource>
  769. {
  770. protected readonly object _gate = new object();
  771. private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
  772. private readonly CompositeDisposable _group = new CompositeDisposable();
  773. private readonly Func<TSource, IObservable<TResult>> _selector;
  // Copies the selector from the producer and registers the (still empty) source
  // subscription as the first member of the subscription group.
  public _(ObservableSelector parent, IObserver<TResult> observer, IDisposable cancel)
      : base(observer, cancel)
  {
      this._selector = parent._selector;
      this._group.Add(this._sourceSubscription);
  }
  780. private bool _isStopped;
  // Subscribes this sink to the outer source and hands back the group holding all subscriptions.
  public IDisposable Run(IObservable<TSource> source)
  {
      _isStopped = false;

      var outerSubscription = source.SubscribeSafe(this);
      _sourceSubscription.Disposable = outerSubscription;

      return _group;
  }
  // Projects the source element to an inner sequence and subscribes to it; a throwing
  // selector terminates the result sequence with that exception.
  public void OnNext(TSource value)
  {
      IObservable<TResult> projected;
      try
      {
          projected = _selector(value);
      }
      catch (Exception ex)
      {
          lock (_gate)
          {
              base._observer.OnError(ex);
              base.Dispose();
          }

          return;
      }

      SubscribeInner(projected);
  }
  // Forwards an error from the outer source downstream and disposes the sink, all under the gate.
  public virtual void OnError(Exception error)
  {
      lock (_gate)
      {
          var downstream = base._observer;
          downstream.OnError(error);
          base.Dispose();
      }
  }
  // Completion of the outer source is handled entirely by Final.
  public virtual void OnCompleted() => Final();
  817. protected void Final()
  818. {
  819. _isStopped = true;
  820. if (_group.Count == 1)
  821. {
  822. //
  823. // Notice there can be a race between OnCompleted of the source and any
  824. // of the inner sequences, where both see _group.Count == 1, and one is
  825. // waiting for the lock. There won't be a double OnCompleted observation
  826. // though, because the call to Dispose silences the observer by swapping
  827. // in a NopObserver<T>.
  828. //
  829. lock (_gate)
  830. {
  831. base._observer.OnCompleted();
  832. base.Dispose();
  833. }
  834. }
  835. else
  836. {
  837. _sourceSubscription.Dispose();
  838. }
  839. }
  840. protected void SubscribeInner(IObservable<TResult> inner)
  841. {
  842. var innerSubscription = new SingleAssignmentDisposable();
  843. _group.Add(innerSubscription);
  844. innerSubscription.Disposable = inner.SubscribeSafe(new InnerObserver(this, innerSubscription));
  845. }
  846. private sealed class InnerObserver : IObserver<TResult>
  847. {
  848. private readonly _ _parent;
  849. private readonly IDisposable _self;
  850. public InnerObserver(_ parent, IDisposable self)
  851. {
  852. _parent = parent;
  853. _self = self;
  854. }
  855. public void OnNext(TResult value)
  856. {
  857. lock (_parent._gate)
  858. _parent._observer.OnNext(value);
  859. }
  860. public void OnError(Exception error)
  861. {
  862. lock (_parent._gate)
  863. {
  864. _parent._observer.OnError(error);
  865. _parent.Dispose();
  866. }
  867. }
  868. public void OnCompleted()
  869. {
  870. _parent._group.Remove(_self);
  871. if (_parent._isStopped && _parent._group.Count == 1)
  872. {
  873. //
  874. // Notice there can be a race between OnCompleted of the source and any
  875. // of the inner sequences, where both see _group.Count == 1, and one is
  876. // waiting for the lock. There won't be a double OnCompleted observation
  877. // though, because the call to Dispose silences the observer by swapping
  878. // in a NopObserver<T>.
  879. //
  880. lock (_parent._gate)
  881. {
  882. _parent._observer.OnCompleted();
  883. _parent.Dispose();
  884. }
  885. }
  886. }
  887. }
  888. }
  889. }
  /// <summary>
  /// Variant of <c>ObservableSelector</c> that can additionally project the source's error
  /// and completion notifications to inner sequences through optional selectors.
  /// </summary>
  internal sealed class ObservableSelectors : ObservableSelector
  {
      private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
      private readonly Func<IObservable<TResult>> _selectorOnCompleted;

      public ObservableSelectors(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
          : base(source, selector)
      {
          _selectorOnError = selectorOnError;
          _selectorOnCompleted = selectorOnCompleted;
      }

      protected override ObservableSelector._ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      /// <summary>
      /// Sink that overrides the source's terminal notifications: when the matching selector
      /// is present, its result is subscribed as one more inner sequence.
      /// </summary>
      new internal sealed class _ : ObservableSelector._
      {
          private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
          private readonly Func<IObservable<TResult>> _selectorOnCompleted;

          public _(ObservableSelectors parent, IObserver<TResult> observer, IDisposable cancel)
              : base(parent, observer, cancel)
          {
              _selectorOnError = parent._selectorOnError;
              _selectorOnCompleted = parent._selectorOnCompleted;
          }

          /// <summary>
          /// Without an error selector the error is handled by the base sink. Otherwise the
          /// error is projected to an inner sequence, which is subscribed before the source
          /// is marked as finished.
          /// </summary>
          public override void OnError(Exception error)
          {
              if (_selectorOnError == null)
              {
                  base.OnError(error);
                  return;
              }

              IObservable<TResult> recovery;

              try
              {
                  recovery = _selectorOnError(error);
              }
              catch (Exception failure)
              {
                  Abort(failure);
                  return;
              }

              SubscribeInner(recovery);
              Final();
          }

          /// <summary>
          /// Subscribes the sequence produced by the completion selector (when one was
          /// supplied) and then marks the source as finished.
          /// </summary>
          public override void OnCompleted()
          {
              if (_selectorOnCompleted != null)
              {
                  IObservable<TResult> epilogue;

                  try
                  {
                      epilogue = _selectorOnCompleted();
                  }
                  catch (Exception failure)
                  {
                      Abort(failure);
                      return;
                  }

                  SubscribeInner(epilogue);
              }

              Final();
          }

          // A selector threw: report that exception downstream and dispose, under the gate
          // inherited from the base sink.
          private void Abort(Exception failure)
          {
              lock (_gate)
              {
                  base._observer.OnError(failure);
                  base.Dispose();
              }
          }
      }
  }
  /// <summary>
  /// Producer that maps every source element, together with its zero-based index, to an
  /// inner observable sequence and forwards the elements of all inner sequences to one
  /// downstream observer.
  /// </summary>
  internal class ObservableSelectorIndexed : Producer<TResult, ObservableSelectorIndexed._>
  {
      protected readonly IObservable<TSource> _source;
      protected readonly Func<TSource, int, IObservable<TResult>> _selector;

      public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      protected override IDisposable Run(_ sink)
      {
          return sink.Run(_source);
      }

      /// <summary>
      /// Sink that observes the source, subscribes to each projected inner sequence and
      /// relays inner notifications to the downstream observer while holding <c>_gate</c>.
      /// </summary>
      internal class _ : Sink<TResult>, IObserver<TSource>
      {
          // Every call made on the downstream observer happens while this lock is held.
          private readonly object _gate = new object();

          private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();

          // Contains the source subscription plus one entry per inner subscription that has
          // not completed yet; a count of 1 therefore means "no inner sequence registered".
          private readonly CompositeDisposable _group = new CompositeDisposable();

          protected readonly Func<TSource, int, IObservable<TResult>> _selector;

          // Set by Final(); inner observers read it to decide whether they are the last one.
          private bool _isStopped;

          // Index handed to the selector for the next source element.
          private int _index;

          public _(ObservableSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _selector = parent._selector;
              _group.Add(_sourceSubscription);
          }

          /// <summary>
          /// Subscribes this sink to <paramref name="source"/> and returns the group holding
          /// the source subscription and all inner subscriptions.
          /// </summary>
          public IDisposable Run(IObservable<TSource> source)
          {
              _isStopped = false;

              var subscription = source.SubscribeSafe(this);
              _sourceSubscription.Disposable = subscription;

              return _group;
          }

          /// <summary>
          /// Projects <paramref name="value"/> and its index to an inner sequence and
          /// subscribes to it. An exception from the selector, or from the checked index
          /// increment, is forwarded as an error and ends the sink.
          /// </summary>
          public void OnNext(TSource value)
          {
              IObservable<TResult> projected;

              try
              {
                  projected = _selector(value, checked(_index++));
              }
              catch (Exception failure)
              {
                  SignalError(failure);
                  return;
              }

              SubscribeInner(projected);
          }

          public virtual void OnError(Exception error)
          {
              SignalError(error);
          }

          public virtual void OnCompleted()
          {
              Final();
          }

          /// <summary>
          /// Marks the source as finished. Completes downstream right away when no inner
          /// subscription is registered; otherwise only the source subscription is disposed.
          /// </summary>
          protected void Final()
          {
              _isStopped = true;

              if (_group.Count != 1)
              {
                  _sourceSubscription.Dispose();
                  return;
              }

              //
              // The source's OnCompleted can race with the OnCompleted of an inner sequence:
              // both may observe _group.Count == 1 while one of them waits for the lock.
              // No double OnCompleted is observed though, because the call to Dispose
              // silences the observer by swapping in a NopObserver<T>.
              //
              SignalCompleted();
          }

          /// <summary>
          /// Registers a new entry in the group and subscribes an inner observer to
          /// <paramref name="inner"/>, storing the resulting subscription in that entry.
          /// </summary>
          protected void SubscribeInner(IObservable<TResult> inner)
          {
              var handle = new SingleAssignmentDisposable();
              _group.Add(handle);

              var relay = new InnerObserver(this, handle);
              handle.Disposable = inner.SubscribeSafe(relay);
          }

          // Forwards an error downstream and disposes the sink, all under the gate.
          private void SignalError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          // Forwards completion downstream and disposes the sink, all under the gate.
          private void SignalCompleted()
          {
              lock (_gate)
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }

          /// <summary>
          /// Observer attached to one inner sequence; <c>_self</c> is that sequence's entry
          /// in the parent's group.
          /// </summary>
          private sealed class InnerObserver : IObserver<TResult>
          {
              private readonly _ _parent;
              private readonly IDisposable _self;

              public InnerObserver(_ parent, IDisposable self)
              {
                  _parent = parent;
                  _self = self;
              }

              public void OnNext(TResult value)
              {
                  lock (_parent._gate)
                  {
                      _parent._observer.OnNext(value);
                  }
              }

              public void OnError(Exception error)
              {
                  lock (_parent._gate)
                  {
                      _parent._observer.OnError(error);
                      _parent.Dispose();
                  }
              }

              public void OnCompleted()
              {
                  var sink = _parent;

                  // Deregister first so the count check below reflects this completion.
                  sink._group.Remove(_self);

                  if (!sink._isStopped || sink._group.Count != 1)
                  {
                      return;
                  }

                  //
                  // The source's OnCompleted can race with the OnCompleted of an inner
                  // sequence: both may observe _group.Count == 1 while one of them waits
                  // for the lock. No double OnCompleted is observed though, because the
                  // call to Dispose silences the observer by swapping in a NopObserver<T>.
                  //
                  lock (sink._gate)
                  {
                      sink._observer.OnCompleted();
                      sink.Dispose();
                  }
              }
          }
      }
  }
  /// <summary>
  /// Variant of <c>ObservableSelectorIndexed</c> that can additionally project the source's
  /// error and completion notifications to inner sequences through optional selectors.
  /// </summary>
  internal sealed class ObservableSelectorsIndexed : ObservableSelectorIndexed
  {
      private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
      private readonly Func<IObservable<TResult>> _selectorOnCompleted;

      public ObservableSelectorsIndexed(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
          : base(source, selector)
      {
          _selectorOnError = selectorOnError;
          _selectorOnCompleted = selectorOnCompleted;
      }

      protected override ObservableSelectorIndexed._ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);

      /// <summary>
      /// Sink that overrides the source's terminal notifications: when the matching selector
      /// is present, its result is subscribed as one more inner sequence.
      /// </summary>
      /// <remarks>
      /// This sink deliberately declares no gate, group or source subscription of its own.
      /// The base sink's gate is private, so a separate lock object here would not exclude
      /// the inner observers, which lock on the base sink's gate. Selector failures are
      /// therefore reported through <c>base.OnError</c>, which forwards the error and
      /// disposes the sink under the base sink's gate.
      /// </remarks>
      new internal sealed class _ : ObservableSelectorIndexed._
      {
          private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
          private readonly Func<IObservable<TResult>> _selectorOnCompleted;

          public _(ObservableSelectorsIndexed parent, IObserver<TResult> observer, IDisposable cancel)
              : base(parent, observer, cancel)
          {
              _selectorOnError = parent._selectorOnError;
              _selectorOnCompleted = parent._selectorOnCompleted;
          }

          /// <summary>
          /// Without an error selector the error is handled by the base sink. Otherwise the
          /// error is projected to an inner sequence, which is subscribed before the source
          /// is marked as finished.
          /// </summary>
          public override void OnError(Exception error)
          {
              if (_selectorOnError != null)
              {
                  var inner = default(IObservable<TResult>);

                  try
                  {
                      inner = _selectorOnError(error);
                  }
                  catch (Exception ex)
                  {
                      // Non-virtual call into the base implementation: forwards the
                      // selector's exception and disposes while holding the base's gate.
                      base.OnError(ex);
                      return;
                  }

                  SubscribeInner(inner);
                  Final();
              }
              else
              {
                  base.OnError(error);
              }
          }

          /// <summary>
          /// Subscribes the sequence produced by the completion selector (when one was
          /// supplied) and then marks the source as finished.
          /// </summary>
          public override void OnCompleted()
          {
              if (_selectorOnCompleted != null)
              {
                  var inner = default(IObservable<TResult>);

                  try
                  {
                      inner = _selectorOnCompleted();
                  }
                  catch (Exception ex)
                  {
                      // Same as above: report the failure under the base's gate.
                      base.OnError(ex);
                      return;
                  }

                  SubscribeInner(inner);
              }

              Final();
          }
      }
  }
  /// <summary>
  /// Producer that maps every source element to an enumerable sequence and forwards the
  /// items of each enumerable to the downstream observer.
  /// </summary>
  internal sealed class EnumerableSelector : Producer<TResult, EnumerableSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, IEnumerable<TResult>> _selector;

      public EnumerableSelector(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      protected override IDisposable Run(_ sink)
      {
          return _source.SubscribeSafe(sink);
      }

      /// <summary>
      /// Sink that enumerates the projection of each source element inside OnNext.
      /// </summary>
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          private readonly Func<TSource, IEnumerable<TResult>> _selector;

          public _(EnumerableSelector parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _selector = parent._selector;
          }

          /// <summary>
          /// Projects <paramref name="value"/> to an enumerable and pushes each of its items
          /// downstream. Any exception raised by the selector, by GetEnumerator, or while
          /// advancing/reading the enumerator is forwarded as an error and stops the sink.
          /// </summary>
          public void OnNext(TSource value)
          {
              IEnumerator<TResult> enumerator;

              try
              {
                  // Selector and GetEnumerator share one handler: either failure is
                  // reported downstream in the same way.
                  enumerator = _selector(value).GetEnumerator();
              }
              catch (Exception exception)
              {
                  OnError(exception);
                  return;
              }

              // The enumerator is disposed on every exit path, including the early return.
              using (enumerator)
              {
                  while (true)
                  {
                      var moved = false;
                      var item = default(TResult);

                      try
                      {
                          moved = enumerator.MoveNext();
                          if (moved)
                          {
                              item = enumerator.Current;
                          }
                      }
                      catch (Exception exception)
                      {
                          OnError(exception);
                          return;
                      }

                      if (!moved)
                      {
                          break;
                      }

                      // Deliberately outside the try: exceptions thrown by the downstream
                      // observer are not converted into OnError notifications.
                      base._observer.OnNext(item);
                  }
              }
          }

          public void OnError(Exception error)
          {
              base._observer.OnError(error);
              base.Dispose();
          }

          public void OnCompleted()
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  /// <summary>
  /// Producer that maps every source element, together with its zero-based index, to an
  /// enumerable sequence and forwards the items of each enumerable to the downstream observer.
  /// </summary>
  internal sealed class EnumerableSelectorIndexed : Producer<TResult, EnumerableSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

      public EnumerableSelectorIndexed(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      protected override IDisposable Run(_ sink)
      {
          return _source.SubscribeSafe(sink);
      }

      /// <summary>
      /// Sink that enumerates the projection of each source element inside OnNext.
      /// </summary>
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          private readonly Func<TSource, int, IEnumerable<TResult>> _selector;

          // Index handed to the selector for the next source element.
          private int _index;

          public _(EnumerableSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _selector = parent._selector;
          }

          /// <summary>
          /// Projects <paramref name="value"/> and its index to an enumerable and pushes each
          /// of its items downstream. Any exception raised by the selector, the checked index
          /// increment, GetEnumerator, or while advancing/reading the enumerator is forwarded
          /// as an error and stops the sink.
          /// </summary>
          public void OnNext(TSource value)
          {
              IEnumerator<TResult> enumerator;

              try
              {
                  // Selector and GetEnumerator share one handler: either failure is
                  // reported downstream in the same way.
                  enumerator = _selector(value, checked(_index++)).GetEnumerator();
              }
              catch (Exception exception)
              {
                  OnError(exception);
                  return;
              }

              // The enumerator is disposed on every exit path, including the early return.
              using (enumerator)
              {
                  while (true)
                  {
                      var moved = false;
                      var item = default(TResult);

                      try
                      {
                          moved = enumerator.MoveNext();
                          if (moved)
                          {
                              item = enumerator.Current;
                          }
                      }
                      catch (Exception exception)
                      {
                          OnError(exception);
                          return;
                      }

                      if (!moved)
                      {
                          break;
                      }

                      // Deliberately outside the try: exceptions thrown by the downstream
                      // observer are not converted into OnError notifications.
                      base._observer.OnNext(item);
                  }
              }
          }

          public void OnError(Exception error)
          {
              base._observer.OnError(error);
              base.Dispose();
          }

          public void OnCompleted()
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  /// <summary>
  /// Producer that maps every source element to a task and forwards each task's result to
  /// the downstream observer.
  /// </summary>
  internal sealed class TaskSelector : Producer<TResult, TaskSelector._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

      public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      protected override IDisposable Run(_ sink)
      {
          return sink.Run(_source);
      }

      /// <summary>
      /// Sink that starts one task per source element and relays task outcomes downstream
      /// while holding <c>_gate</c>.
      /// </summary>
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          // Every call made on the downstream observer happens while this lock is held.
          private readonly object _gate = new object();

          // Supplies the token passed to the selector; returned from Run as part of the
          // subscription.
          private readonly CancellationDisposable _cancel = new CancellationDisposable();

          private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;

          // Starts at 1 in Run, is incremented for every source element and decremented by
          // each call to OnCompleted (made by the source and after every successful task).
          // Reaching zero completes the downstream observer.
          private volatile int _count;

          public _(TaskSelector parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _selector = parent._selector;
          }

          /// <summary>
          /// Subscribes this sink to <paramref name="source"/> and returns a disposable
          /// combining that subscription with the cancellation disposable.
          /// </summary>
          public IDisposable Run(IObservable<TSource> source)
          {
              _count = 1;

              var subscription = source.SubscribeSafe(this);
              return StableCompositeDisposable.Create(subscription, _cancel);
          }

          /// <summary>
          /// Starts the task for <paramref name="value"/>. An already completed task is
          /// handled inline; otherwise a continuation handles it. An exception thrown by
          /// the selector is forwarded as an error and ends the sink.
          /// </summary>
          public void OnNext(TSource value)
          {
              Task<TResult> pending;

              try
              {
                  Interlocked.Increment(ref _count);
                  pending = _selector(value, _cancel.Token);
              }
              catch (Exception ex)
              {
                  OnError(ex);
                  return;
              }

              if (!pending.IsCompleted)
              {
                  pending.ContinueWith(OnCompletedTask);
                  return;
              }

              OnCompletedTask(pending);
          }

          // Translates the final state of a task into observer notifications.
          private void OnCompletedTask(Task<TResult> task)
          {
              var status = task.Status;

              if (status == TaskStatus.RanToCompletion)
              {
                  lock (_gate)
                  {
                      base._observer.OnNext(task.Result);
                  }

                  // Releases this task's share of the pending count.
                  OnCompleted();
              }
              else if (status == TaskStatus.Faulted)
              {
                  OnError(task.Exception.InnerException);
              }
              else if (status == TaskStatus.Canceled && !_cancel.IsDisposed)
              {
                  // Cancellation is only reported while the cancellation disposable has
                  // not been disposed.
                  OnError(new TaskCanceledException(task));
              }
          }

          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          public void OnCompleted()
          {
              if (Interlocked.Decrement(ref _count) != 0)
              {
                  return;
              }

              lock (_gate)
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }
      }
  }
  /// <summary>
  /// Producer that maps every source element, together with its zero-based index, to a task
  /// and forwards each task's result to the downstream observer.
  /// </summary>
  internal sealed class TaskSelectorIndexed : Producer<TResult, TaskSelectorIndexed._>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

      public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
      {
          _source = source;
          _selector = selector;
      }

      protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel)
      {
          return new _(this, observer, cancel);
      }

      protected override IDisposable Run(_ sink)
      {
          return sink.Run(_source);
      }

      /// <summary>
      /// Sink that starts one task per source element and relays task outcomes downstream
      /// while holding <c>_gate</c>.
      /// </summary>
      internal sealed class _ : Sink<TResult>, IObserver<TSource>
      {
          // Every call made on the downstream observer happens while this lock is held.
          private readonly object _gate = new object();

          // Supplies the token passed to the selector; returned from Run as part of the
          // subscription.
          private readonly CancellationDisposable _cancel = new CancellationDisposable();

          private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;

          // Starts at 1 in Run, is incremented for every source element and decremented by
          // each call to OnCompleted (made by the source and after every successful task).
          // Reaching zero completes the downstream observer.
          private volatile int _count;

          // Index handed to the selector for the next source element.
          private int _index;

          public _(TaskSelectorIndexed parent, IObserver<TResult> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _selector = parent._selector;
          }

          /// <summary>
          /// Subscribes this sink to <paramref name="source"/> and returns a disposable
          /// combining that subscription with the cancellation disposable.
          /// </summary>
          public IDisposable Run(IObservable<TSource> source)
          {
              _count = 1;

              var subscription = source.SubscribeSafe(this);
              return StableCompositeDisposable.Create(subscription, _cancel);
          }

          /// <summary>
          /// Starts the task for <paramref name="value"/> and its index. An already
          /// completed task is handled inline; otherwise a continuation handles it. An
          /// exception thrown by the selector or by the checked index increment is
          /// forwarded as an error and ends the sink.
          /// </summary>
          public void OnNext(TSource value)
          {
              Task<TResult> pending;

              try
              {
                  Interlocked.Increment(ref _count);
                  pending = _selector(value, checked(_index++), _cancel.Token);
              }
              catch (Exception ex)
              {
                  OnError(ex);
                  return;
              }

              if (!pending.IsCompleted)
              {
                  pending.ContinueWith(OnCompletedTask);
                  return;
              }

              OnCompletedTask(pending);
          }

          // Translates the final state of a task into observer notifications.
          private void OnCompletedTask(Task<TResult> task)
          {
              var status = task.Status;

              if (status == TaskStatus.RanToCompletion)
              {
                  lock (_gate)
                  {
                      base._observer.OnNext(task.Result);
                  }

                  // Releases this task's share of the pending count.
                  OnCompleted();
              }
              else if (status == TaskStatus.Faulted)
              {
                  OnError(task.Exception.InnerException);
              }
              else if (status == TaskStatus.Canceled && !_cancel.IsDisposed)
              {
                  // Cancellation is only reported while the cancellation disposable has
                  // not been disposed.
                  OnError(new TaskCanceledException(task));
              }
          }

          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          public void OnCompleted()
          {
              if (Interlocked.Decrement(ref _count) != 0)
              {
                  return;
              }

              lock (_gate)
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }
      }
  }
  1556. }
  1557. }