1
0

SelectMany.cs 34 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Reactive;
  6. using System.Reactive.Disposables;
  7. #if !NO_TPL
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. #endif
  11. namespace System.Reactive.Linq.Observαble
  12. {
  13. class SelectMany<TSource, TCollection, TResult> : Producer<TResult>
  14. {
  15. private readonly IObservable<TSource> _source;
  16. private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
  17. private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorWithIndex;
  18. private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE;
  19. private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEWithIndex;
  20. private readonly Func<TSource, TCollection, TResult> _resultSelector;
  21. private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorWithIndex;
  22. public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  23. {
  24. _source = source;
  25. _collectionSelector = collectionSelector;
  26. _resultSelector = resultSelector;
  27. }
  28. public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  29. {
  30. _source = source;
  31. _collectionSelectorWithIndex = collectionSelector;
  32. _resultSelectorWithIndex = resultSelector;
  33. }
  34. public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  35. {
  36. _source = source;
  37. _collectionSelectorE = collectionSelector;
  38. _resultSelector = resultSelector;
  39. }
  40. public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
  41. {
  42. _source = source;
  43. _collectionSelectorEWithIndex = collectionSelector;
  44. _resultSelectorWithIndex = resultSelector;
  45. }
  46. #if !NO_TPL
  47. private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT;
  48. public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  49. {
  50. _source = source;
  51. _collectionSelectorT = collectionSelector;
  52. _resultSelector = resultSelector;
  53. }
  54. #endif
  55. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  56. {
  57. if (_collectionSelector != null || _collectionSelectorWithIndex != null)
  58. {
  59. var sink = new _(this, observer, cancel);
  60. setSink(sink);
  61. return sink.Run();
  62. }
  63. #if !NO_TPL
  64. else if (_collectionSelectorT != null)
  65. {
  66. var sink = new τ(this, observer, cancel);
  67. setSink(sink);
  68. return sink.Run();
  69. }
  70. #endif
  71. else
  72. {
  73. var sink = new ε(this, observer, cancel);
  74. setSink(sink);
  75. return _source.SubscribeSafe(sink);
  76. }
  77. }
  78. class _ : Sink<TResult>, IObserver<TSource>
  79. {
  80. private readonly SelectMany<TSource, TCollection, TResult> _parent;
  81. public _(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  82. : base(observer, cancel)
  83. {
  84. _parent = parent;
  85. _indexInSource = -1;
  86. }
  87. private object _gate;
  88. private bool _isStopped;
  89. private CompositeDisposable _group;
  90. private SingleAssignmentDisposable _sourceSubscription;
  91. private int _indexInSource;
  92. public IDisposable Run()
  93. {
  94. _gate = new object();
  95. _isStopped = false;
  96. _group = new CompositeDisposable();
  97. _sourceSubscription = new SingleAssignmentDisposable();
  98. _group.Add(_sourceSubscription);
  99. _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
  100. return _group;
  101. }
  102. public void OnNext(TSource value)
  103. {
  104. var collection = default(IObservable<TCollection>);
  105. try
  106. {
  107. if (_parent._collectionSelector != null)
  108. collection = _parent._collectionSelector(value);
  109. else
  110. {
  111. checked { _indexInSource++; }
  112. collection = _parent._collectionSelectorWithIndex(value, _indexInSource);
  113. }
  114. }
  115. catch (Exception ex)
  116. {
  117. lock (_gate)
  118. {
  119. base._observer.OnError(ex);
  120. base.Dispose();
  121. }
  122. return;
  123. }
  124. var innerSubscription = new SingleAssignmentDisposable();
  125. _group.Add(innerSubscription);
  126. innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription, _indexInSource));
  127. }
  128. public void OnError(Exception error)
  129. {
  130. lock (_gate)
  131. {
  132. base._observer.OnError(error);
  133. base.Dispose();
  134. }
  135. }
  136. public void OnCompleted()
  137. {
  138. _isStopped = true;
  139. if (_group.Count == 1)
  140. {
  141. //
  142. // Notice there can be a race between OnCompleted of the source and any
  143. // of the inner sequences, where both see _group.Count == 1, and one is
  144. // waiting for the lock. There won't be a double OnCompleted observation
  145. // though, because the call to Dispose silences the observer by swapping
  146. // in a NopObserver<T>.
  147. //
  148. lock (_gate)
  149. {
  150. base._observer.OnCompleted();
  151. base.Dispose();
  152. }
  153. }
  154. else
  155. {
  156. _sourceSubscription.Dispose();
  157. }
  158. }
  159. class ι : IObserver<TCollection>
  160. {
  161. private readonly _ _parent;
  162. private readonly TSource _value;
  163. private readonly IDisposable _self;
  164. private int _indexInSource;
  165. private int _indexInIntermediate = -1;
  166. public ι(_ parent, TSource value, IDisposable self, int indexInSource)
  167. {
  168. _parent = parent;
  169. _value = value;
  170. _self = self;
  171. _indexInSource = indexInSource;
  172. _indexInIntermediate = -1;
  173. }
  174. public void OnNext(TCollection value)
  175. {
  176. var res = default(TResult);
  177. try
  178. {
  179. if (_parent._parent._resultSelector != null)
  180. res = _parent._parent._resultSelector(_value, value);
  181. else
  182. {
  183. checked { _indexInIntermediate++; }
  184. res = _parent._parent._resultSelectorWithIndex(_value, _indexInSource, value, _indexInIntermediate);
  185. }
  186. }
  187. catch (Exception ex)
  188. {
  189. lock (_parent._gate)
  190. {
  191. _parent._observer.OnError(ex);
  192. _parent.Dispose();
  193. }
  194. return;
  195. }
  196. lock (_parent._gate)
  197. _parent._observer.OnNext(res);
  198. }
  199. public void OnError(Exception error)
  200. {
  201. lock (_parent._gate)
  202. {
  203. _parent._observer.OnError(error);
  204. _parent.Dispose();
  205. }
  206. }
  207. public void OnCompleted()
  208. {
  209. _parent._group.Remove(_self);
  210. if (_parent._isStopped && _parent._group.Count == 1)
  211. {
  212. //
  213. // Notice there can be a race between OnCompleted of the source and any
  214. // of the inner sequences, where both see _group.Count == 1, and one is
  215. // waiting for the lock. There won't be a double OnCompleted observation
  216. // though, because the call to Dispose silences the observer by swapping
  217. // in a NopObserver<T>.
  218. //
  219. lock (_parent._gate)
  220. {
  221. _parent._observer.OnCompleted();
  222. _parent.Dispose();
  223. }
  224. }
  225. }
  226. }
  227. }
  228. class ε : Sink<TResult>, IObserver<TSource>
  229. {
  230. private readonly SelectMany<TSource, TCollection, TResult> _parent;
  231. private int _indexInSource; // The "Weird SelectMany" requires indices in the original collection as well as an intermediate collection
  232. public ε(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  233. : base(observer, cancel)
  234. {
  235. _parent = parent;
  236. _indexInSource = -1;
  237. }
  238. public void OnNext(TSource value)
  239. {
  240. var xs = default(IEnumerable<TCollection>);
  241. try
  242. {
  243. if (_parent._collectionSelectorE != null)
  244. xs = _parent._collectionSelectorE(value);
  245. else
  246. {
  247. checked { _indexInSource++; }
  248. xs = _parent._collectionSelectorEWithIndex(value, _indexInSource);
  249. }
  250. }
  251. catch (Exception exception)
  252. {
  253. base._observer.OnError(exception);
  254. base.Dispose();
  255. return;
  256. }
  257. var e = default(IEnumerator<TCollection>);
  258. try
  259. {
  260. e = xs.GetEnumerator();
  261. }
  262. catch (Exception exception)
  263. {
  264. base._observer.OnError(exception);
  265. base.Dispose();
  266. return;
  267. }
  268. try
  269. {
  270. int indexInIntermediate = -1;
  271. var hasNext = true;
  272. while (hasNext)
  273. {
  274. hasNext = false;
  275. var current = default(TResult);
  276. try
  277. {
  278. hasNext = e.MoveNext();
  279. if (hasNext)
  280. {
  281. if (_parent._resultSelector != null)
  282. current = _parent._resultSelector(value, e.Current);
  283. else
  284. {
  285. checked { indexInIntermediate++; }
  286. current = _parent._resultSelectorWithIndex(value, _indexInSource, e.Current, indexInIntermediate);
  287. }
  288. }
  289. }
  290. catch (Exception exception)
  291. {
  292. base._observer.OnError(exception);
  293. base.Dispose();
  294. return;
  295. }
  296. if (hasNext)
  297. base._observer.OnNext(current);
  298. }
  299. }
  300. finally
  301. {
  302. if (e != null)
  303. e.Dispose();
  304. }
  305. }
  306. public void OnError(Exception error)
  307. {
  308. base._observer.OnError(error);
  309. base.Dispose();
  310. }
  311. public void OnCompleted()
  312. {
  313. base._observer.OnCompleted();
  314. base.Dispose();
  315. }
  316. }
  317. #if !NO_TPL
  318. #pragma warning disable 0420
  319. class τ : Sink<TResult>, IObserver<TSource>
  320. {
  321. private readonly SelectMany<TSource, TCollection, TResult> _parent;
  322. public τ(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  323. : base(observer, cancel)
  324. {
  325. _parent = parent;
  326. }
  327. private object _gate;
  328. private CancellationDisposable _cancel;
  329. private volatile int _count;
  330. public IDisposable Run()
  331. {
  332. _gate = new object();
  333. _cancel = new CancellationDisposable();
  334. _count = 1;
  335. return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
  336. }
  337. public void OnNext(TSource value)
  338. {
  339. var task = default(Task<TCollection>);
  340. try
  341. {
  342. Interlocked.Increment(ref _count);
  343. task = _parent._collectionSelectorT(value, _cancel.Token);
  344. }
  345. catch (Exception ex)
  346. {
  347. lock (_gate)
  348. {
  349. base._observer.OnError(ex);
  350. base.Dispose();
  351. }
  352. return;
  353. }
  354. if (task.IsCompleted)
  355. {
  356. OnCompletedTask(value, task);
  357. }
  358. else
  359. {
  360. AttachContinuation(value, task);
  361. }
  362. }
  363. private void AttachContinuation(TSource value, Task<TCollection> task)
  364. {
  365. //
  366. // Separate method to avoid closure in synchronous completion case.
  367. //
  368. task.ContinueWith(t => OnCompletedTask(value, t));
  369. }
  370. private void OnCompletedTask(TSource value, Task<TCollection> task)
  371. {
  372. switch (task.Status)
  373. {
  374. case TaskStatus.RanToCompletion:
  375. {
  376. var res = default(TResult);
  377. try
  378. {
  379. res = _parent._resultSelector(value, task.Result);
  380. }
  381. catch (Exception ex)
  382. {
  383. lock (_gate)
  384. {
  385. base._observer.OnError(ex);
  386. base.Dispose();
  387. }
  388. return;
  389. }
  390. lock (_gate)
  391. base._observer.OnNext(res);
  392. OnCompleted();
  393. }
  394. break;
  395. case TaskStatus.Faulted:
  396. {
  397. lock (_gate)
  398. {
  399. base._observer.OnError(task.Exception.InnerException);
  400. base.Dispose();
  401. }
  402. }
  403. break;
  404. case TaskStatus.Canceled:
  405. {
  406. if (!_cancel.IsDisposed)
  407. {
  408. lock (_gate)
  409. {
  410. base._observer.OnError(new TaskCanceledException(task));
  411. base.Dispose();
  412. }
  413. }
  414. }
  415. break;
  416. }
  417. }
  418. public void OnError(Exception error)
  419. {
  420. lock (_gate)
  421. {
  422. base._observer.OnError(error);
  423. base.Dispose();
  424. }
  425. }
  426. public void OnCompleted()
  427. {
  428. if (Interlocked.Decrement(ref _count) == 0)
  429. {
  430. lock (_gate)
  431. {
  432. base._observer.OnCompleted();
  433. base.Dispose();
  434. }
  435. }
  436. }
  437. }
  438. #pragma warning restore 0420
  439. #endif
  440. }
  441. class SelectMany<TSource, TResult> : Producer<TResult>
  442. {
  443. private readonly IObservable<TSource> _source;
  444. private readonly Func<TSource, IObservable<TResult>> _selector;
  445. private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
  446. private readonly Func<IObservable<TResult>> _selectorOnCompleted;
  447. private readonly Func<TSource, int, IObservable<TResult>> _selectorWithIndex;
  448. private readonly Func<Exception, int, IObservable<TResult>> _selectorWithIndexOnError;
  449. private readonly Func<int, IObservable<TResult>> _selectorWithIndexOnCompleted;
  450. private readonly Func<TSource, IEnumerable<TResult>> _selectorE;
  451. private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEWithIndex;
  452. public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
  453. {
  454. _source = source;
  455. _selector = selector;
  456. }
  457. public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
  458. {
  459. _source = source;
  460. _selector = selector;
  461. _selectorOnError = selectorOnError;
  462. _selectorOnCompleted = selectorOnCompleted;
  463. }
  464. public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
  465. {
  466. _source = source;
  467. _selectorWithIndex = selector;
  468. }
  469. public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, int, IObservable<TResult>> selectorOnError, Func<int, IObservable<TResult>> selectorOnCompleted)
  470. {
  471. _source = source;
  472. _selectorWithIndex = selector;
  473. _selectorWithIndexOnError = selectorOnError;
  474. _selectorWithIndexOnCompleted = selectorOnCompleted;
  475. }
  476. public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
  477. {
  478. _source = source;
  479. _selectorE = selector;
  480. }
  481. public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
  482. {
  483. _source = source;
  484. _selectorEWithIndex = selector;
  485. }
  486. #if !NO_TPL
  487. private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT;
  488. public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
  489. {
  490. _source = source;
  491. _selectorT = selector;
  492. }
  493. #endif
  494. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  495. {
  496. if (_selector != null || _selectorWithIndex != null)
  497. {
  498. var sink = new _(this, observer, cancel);
  499. setSink(sink);
  500. return sink.Run();
  501. }
  502. #if !NO_TPL
  503. else if (_selectorT != null)
  504. {
  505. var sink = new τ(this, observer, cancel);
  506. setSink(sink);
  507. return sink.Run();
  508. }
  509. #endif
  510. else
  511. {
  512. var sink = new ε(this, observer, cancel);
  513. setSink(sink);
  514. return _source.SubscribeSafe(sink);
  515. }
  516. }
  517. class _ : Sink<TResult>, IObserver<TSource>
  518. {
  519. private readonly SelectMany<TSource, TResult> _parent;
  520. public _(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  521. : base(observer, cancel)
  522. {
  523. _parent = parent;
  524. _index = -1;
  525. }
  526. private object _gate;
  527. private bool _isStopped;
  528. private CompositeDisposable _group;
  529. private SingleAssignmentDisposable _sourceSubscription;
  530. private int _index;
  531. public IDisposable Run()
  532. {
  533. _gate = new object();
  534. _isStopped = false;
  535. _group = new CompositeDisposable();
  536. _sourceSubscription = new SingleAssignmentDisposable();
  537. _group.Add(_sourceSubscription);
  538. _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
  539. return _group;
  540. }
  541. public void OnNext(TSource value)
  542. {
  543. var inner = default(IObservable<TResult>);
  544. try
  545. {
  546. if (_parent._selector != null)
  547. inner = _parent._selector(value);
  548. else
  549. {
  550. checked { _index++; }
  551. inner = _parent._selectorWithIndex(value, _index);
  552. }
  553. }
  554. catch (Exception ex)
  555. {
  556. lock (_gate)
  557. {
  558. base._observer.OnError(ex);
  559. base.Dispose();
  560. }
  561. return;
  562. }
  563. SubscribeInner(inner);
  564. }
  565. public void OnError(Exception error)
  566. {
  567. if (_parent._selectorOnError != null)
  568. {
  569. var inner = default(IObservable<TResult>);
  570. try
  571. {
  572. if (_parent._selectorOnError != null)
  573. inner = _parent._selectorOnError(error);
  574. else
  575. {
  576. checked { _index++; }
  577. inner = _parent._selectorWithIndexOnError(error, _index);
  578. }
  579. }
  580. catch (Exception ex)
  581. {
  582. lock (_gate)
  583. {
  584. base._observer.OnError(ex);
  585. base.Dispose();
  586. }
  587. return;
  588. }
  589. SubscribeInner(inner);
  590. Final();
  591. }
  592. else
  593. {
  594. lock (_gate)
  595. {
  596. base._observer.OnError(error);
  597. base.Dispose();
  598. }
  599. }
  600. }
  601. public void OnCompleted()
  602. {
  603. if (_parent._selectorOnCompleted != null)
  604. {
  605. var inner = default(IObservable<TResult>);
  606. try
  607. {
  608. if (_parent._selectorOnCompleted != null)
  609. inner = _parent._selectorOnCompleted();
  610. else
  611. inner = _parent._selectorWithIndexOnCompleted(_index);
  612. }
  613. catch (Exception ex)
  614. {
  615. lock (_gate)
  616. {
  617. base._observer.OnError(ex);
  618. base.Dispose();
  619. }
  620. return;
  621. }
  622. SubscribeInner(inner);
  623. }
  624. Final();
  625. }
  626. private void Final()
  627. {
  628. _isStopped = true;
  629. if (_group.Count == 1)
  630. {
  631. //
  632. // Notice there can be a race between OnCompleted of the source and any
  633. // of the inner sequences, where both see _group.Count == 1, and one is
  634. // waiting for the lock. There won't be a double OnCompleted observation
  635. // though, because the call to Dispose silences the observer by swapping
  636. // in a NopObserver<T>.
  637. //
  638. lock (_gate)
  639. {
  640. base._observer.OnCompleted();
  641. base.Dispose();
  642. }
  643. }
  644. else
  645. {
  646. _sourceSubscription.Dispose();
  647. }
  648. }
  649. private void SubscribeInner(IObservable<TResult> inner)
  650. {
  651. var innerSubscription = new SingleAssignmentDisposable();
  652. _group.Add(innerSubscription);
  653. innerSubscription.Disposable = inner.SubscribeSafe(new ι(this, innerSubscription));
  654. }
  655. class ι : IObserver<TResult>
  656. {
  657. private readonly _ _parent;
  658. private readonly IDisposable _self;
  659. public ι(_ parent, IDisposable self)
  660. {
  661. _parent = parent;
  662. _self = self;
  663. }
  664. public void OnNext(TResult value)
  665. {
  666. lock (_parent._gate)
  667. _parent._observer.OnNext(value);
  668. }
  669. public void OnError(Exception error)
  670. {
  671. lock (_parent._gate)
  672. {
  673. _parent._observer.OnError(error);
  674. _parent.Dispose();
  675. }
  676. }
  677. public void OnCompleted()
  678. {
  679. _parent._group.Remove(_self);
  680. if (_parent._isStopped && _parent._group.Count == 1)
  681. {
  682. //
  683. // Notice there can be a race between OnCompleted of the source and any
  684. // of the inner sequences, where both see _group.Count == 1, and one is
  685. // waiting for the lock. There won't be a double OnCompleted observation
  686. // though, because the call to Dispose silences the observer by swapping
  687. // in a NopObserver<T>.
  688. //
  689. lock (_parent._gate)
  690. {
  691. _parent._observer.OnCompleted();
  692. _parent.Dispose();
  693. }
  694. }
  695. }
  696. }
  697. }
  698. class ε : Sink<TResult>, IObserver<TSource>
  699. {
  700. private readonly SelectMany<TSource, TResult> _parent;
  701. private int _index;
  702. public ε(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  703. : base(observer, cancel)
  704. {
  705. _parent = parent;
  706. _index = -1;
  707. }
  708. public void OnNext(TSource value)
  709. {
  710. var xs = default(IEnumerable<TResult>);
  711. try
  712. {
  713. if (_parent._selectorE != null)
  714. xs = _parent._selectorE(value);
  715. else
  716. {
  717. checked { _index++; }
  718. xs = _parent._selectorEWithIndex(value, _index);
  719. }
  720. }
  721. catch (Exception exception)
  722. {
  723. base._observer.OnError(exception);
  724. base.Dispose();
  725. return;
  726. }
  727. var e = default(IEnumerator<TResult>);
  728. try
  729. {
  730. e = xs.GetEnumerator();
  731. }
  732. catch (Exception exception)
  733. {
  734. base._observer.OnError(exception);
  735. base.Dispose();
  736. return;
  737. }
  738. try
  739. {
  740. var hasNext = true;
  741. while (hasNext)
  742. {
  743. hasNext = false;
  744. var current = default(TResult);
  745. try
  746. {
  747. hasNext = e.MoveNext();
  748. if (hasNext)
  749. current = e.Current;
  750. }
  751. catch (Exception exception)
  752. {
  753. base._observer.OnError(exception);
  754. base.Dispose();
  755. return;
  756. }
  757. if (hasNext)
  758. base._observer.OnNext(current);
  759. }
  760. }
  761. finally
  762. {
  763. if (e != null)
  764. e.Dispose();
  765. }
  766. }
  767. public void OnError(Exception error)
  768. {
  769. base._observer.OnError(error);
  770. base.Dispose();
  771. }
  772. public void OnCompleted()
  773. {
  774. base._observer.OnCompleted();
  775. base.Dispose();
  776. }
  777. }
  778. #if !NO_TPL
  779. #pragma warning disable 0420
  780. class τ : Sink<TResult>, IObserver<TSource>
  781. {
  782. private readonly SelectMany<TSource, TResult> _parent;
  783. public τ(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  784. : base(observer, cancel)
  785. {
  786. _parent = parent;
  787. }
  788. private object _gate;
  789. private CancellationDisposable _cancel;
  790. private volatile int _count;
  791. public IDisposable Run()
  792. {
  793. _gate = new object();
  794. _cancel = new CancellationDisposable();
  795. _count = 1;
  796. return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
  797. }
  798. public void OnNext(TSource value)
  799. {
  800. var task = default(Task<TResult>);
  801. try
  802. {
  803. Interlocked.Increment(ref _count);
  804. task = _parent._selectorT(value, _cancel.Token);
  805. }
  806. catch (Exception ex)
  807. {
  808. lock (_gate)
  809. {
  810. base._observer.OnError(ex);
  811. base.Dispose();
  812. }
  813. return;
  814. }
  815. if (task.IsCompleted)
  816. {
  817. OnCompletedTask(task);
  818. }
  819. else
  820. {
  821. task.ContinueWith(OnCompletedTask);
  822. }
  823. }
  824. private void OnCompletedTask(Task<TResult> task)
  825. {
  826. switch (task.Status)
  827. {
  828. case TaskStatus.RanToCompletion:
  829. {
  830. lock (_gate)
  831. base._observer.OnNext(task.Result);
  832. OnCompleted();
  833. }
  834. break;
  835. case TaskStatus.Faulted:
  836. {
  837. lock (_gate)
  838. {
  839. base._observer.OnError(task.Exception.InnerException);
  840. base.Dispose();
  841. }
  842. }
  843. break;
  844. case TaskStatus.Canceled:
  845. {
  846. if (!_cancel.IsDisposed)
  847. {
  848. lock (_gate)
  849. {
  850. base._observer.OnError(new TaskCanceledException(task));
  851. base.Dispose();
  852. }
  853. }
  854. }
  855. break;
  856. }
  857. }
  858. public void OnError(Exception error)
  859. {
  860. lock (_gate)
  861. {
  862. base._observer.OnError(error);
  863. base.Dispose();
  864. }
  865. }
  866. public void OnCompleted()
  867. {
  868. if (Interlocked.Decrement(ref _count) == 0)
  869. {
  870. lock (_gate)
  871. {
  872. base._observer.OnCompleted();
  873. base.Dispose();
  874. }
  875. }
  876. }
  877. }
  878. #pragma warning restore 0420
  879. #endif
  880. }
  881. }
  882. #endif