// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #if !NO_PERF using System.Collections.Generic; using System.Reactive.Disposables; #if !NO_TPL using System.Threading; using System.Threading.Tasks; #endif namespace System.Reactive.Linq.ObservableImpl { class SelectMany : Producer { private readonly IObservable _source; private readonly Func> _collectionSelector; private readonly Func> _collectionSelectorI; private readonly Func> _collectionSelectorE; private readonly Func> _collectionSelectorEI; private readonly Func _resultSelector; private readonly Func _resultSelectorI; public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; _collectionSelector = collectionSelector; _resultSelector = resultSelector; } public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; _collectionSelectorI = collectionSelector; _resultSelectorI = resultSelector; } public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; _collectionSelectorE = collectionSelector; _resultSelector = resultSelector; } public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; _collectionSelectorEI = collectionSelector; _resultSelectorI = resultSelector; } #if !NO_TPL private readonly Func> _collectionSelectorT; private readonly Func> _collectionSelectorTI; private readonly Func _resultSelectorTI; public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; _collectionSelectorT = collectionSelector; _resultSelector = resultSelector; } public SelectMany(IObservable source, Func> collectionSelector, Func resultSelector) { _source = source; _collectionSelectorTI = collectionSelector; _resultSelectorTI = resultSelector; } #endif protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { if (_collectionSelector != null) { var sink = new _(this, observer, cancel); setSink(sink); return sink.Run(); } else if (_collectionSelectorI != null) { var sink = new IndexSelectorImpl(this, observer, cancel); setSink(sink); return sink.Run(); } #if !NO_TPL else if (_collectionSelectorT != null) { var sink = new SelectManyImpl(this, observer, cancel); setSink(sink); return sink.Run(); } else if (_collectionSelectorTI != null) { var sink = new Sigma(this, observer, cancel); setSink(sink); return sink.Run(); } #endif else if (_collectionSelectorE != null) { var sink = new NoSelectorImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } else { var sink = new Omega(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } } class _ : Sink, IObserver { private readonly SelectMany _parent; public _(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; public IDisposable Run() { _gate = new object(); _isStopped = false; _group = new CompositeDisposable(); _sourceSubscription = new SingleAssignmentDisposable(); _group.Add(_sourceSubscription); _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); return _group; } public void OnNext(TSource value) { var collection = default(IObservable); try { collection = _parent._collectionSelector(value); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, innerSubscription)); } public void OnError(Exception error) { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } public void OnCompleted() { _isStopped = true; if (_group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } else { _sourceSubscription.Dispose(); } } class Iter : IObserver { private readonly _ _parent; private readonly TSource _value; private readonly IDisposable _self; public Iter(_ parent, TSource value, IDisposable self) { _parent = parent; _value = value; _self = self; } public void OnNext(TCollection value) { var res = default(TResult); try { res = _parent._parent._resultSelector(_value, value); } catch (Exception ex) { lock (_parent._gate) { _parent._observer.OnError(ex); _parent.Dispose(); } return; } lock (_parent._gate) _parent._observer.OnNext(res); } public void OnError(Exception error) { lock (_parent._gate) { _parent._observer.OnError(error); _parent.Dispose(); } } public void OnCompleted() { _parent._group.Remove(_self); if (_parent._isStopped && _parent._group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_parent._gate) { _parent._observer.OnCompleted(); _parent.Dispose(); } } } } } class IndexSelectorImpl : Sink, IObserver { private readonly SelectMany _parent; public IndexSelectorImpl(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; private int _index; public IDisposable Run() { _gate = new object(); _isStopped = false; _group = new CompositeDisposable(); _sourceSubscription = new SingleAssignmentDisposable(); _group.Add(_sourceSubscription); _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); return _group; } public void OnNext(TSource value) { var index = checked(_index++); var collection = default(IObservable); try { collection = _parent._collectionSelectorI(value, index); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, index, innerSubscription)); } public void OnError(Exception error) { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } public void OnCompleted() { _isStopped = true; if (_group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } else { _sourceSubscription.Dispose(); } } class Iter : IObserver { private readonly IndexSelectorImpl _parent; private readonly TSource _value; private readonly int _valueIndex; private readonly IDisposable _self; public Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self) { _parent = parent; _value = value; _valueIndex = index; _self = self; } private int _index; public void OnNext(TCollection value) { var res = default(TResult); try { res = _parent._parent._resultSelectorI(_value, _valueIndex, value, checked(_index++)); } catch (Exception ex) { lock (_parent._gate) { _parent._observer.OnError(ex); _parent.Dispose(); } return; } lock (_parent._gate) _parent._observer.OnNext(res); } public void OnError(Exception error) { lock (_parent._gate) { _parent._observer.OnError(error); _parent.Dispose(); } } public void OnCompleted() { _parent._group.Remove(_self); if (_parent._isStopped && _parent._group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_parent._gate) { _parent._observer.OnCompleted(); _parent.Dispose(); } } } } } class NoSelectorImpl : Sink, IObserver { private readonly SelectMany _parent; public NoSelectorImpl(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } public void OnNext(TSource value) { var xs = default(IEnumerable); try { xs = _parent._collectionSelectorE(value); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } var e = default(IEnumerator); try { e = xs.GetEnumerator(); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } try { var hasNext = true; while (hasNext) { hasNext = false; var current = default(TResult); try { hasNext = e.MoveNext(); if (hasNext) current = _parent._resultSelector(value, e.Current); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } if (hasNext) base._observer.OnNext(current); } } finally { if (e != null) e.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { base._observer.OnCompleted(); base.Dispose(); } } class Omega : Sink, IObserver { private readonly SelectMany _parent; public Omega(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private int _index; public void OnNext(TSource value) { var index = checked(_index++); var xs = default(IEnumerable); try { xs = _parent._collectionSelectorEI(value, index); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } var e = default(IEnumerator); try { e = xs.GetEnumerator(); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } try { var eIndex = 0; var hasNext = true; while (hasNext) { hasNext = false; var current = default(TResult); try { hasNext = e.MoveNext(); if (hasNext) current = _parent._resultSelectorI(value, index, e.Current, checked(eIndex++)); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } if (hasNext) base._observer.OnNext(current); } } finally { if (e != null) e.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { base._observer.OnCompleted(); base.Dispose(); } } #if !NO_TPL #pragma warning disable 0420 class SelectManyImpl : Sink, IObserver { private readonly SelectMany _parent; public SelectManyImpl(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private CancellationDisposable _cancel; private volatile int _count; public IDisposable Run() { _gate = new object(); _cancel = new CancellationDisposable(); _count = 1; return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel); } public void OnNext(TSource value) { var task = default(Task); try { Interlocked.Increment(ref _count); task = _parent._collectionSelectorT(value, _cancel.Token); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } if (task.IsCompleted) { OnCompletedTask(value, task); } else { AttachContinuation(value, task); } } private void AttachContinuation(TSource value, Task task) { // // Separate method to avoid closure in synchronous completion case. // task.ContinueWith(t => OnCompletedTask(value, t)); } private void OnCompletedTask(TSource value, Task task) { switch (task.Status) { case TaskStatus.RanToCompletion: { var res = default(TResult); try { res = _parent._resultSelector(value, task.Result); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } lock (_gate) base._observer.OnNext(res); OnCompleted(); } break; case TaskStatus.Faulted: { lock (_gate) { base._observer.OnError(task.Exception.InnerException); base.Dispose(); } } break; case TaskStatus.Canceled: { if (!_cancel.IsDisposed) { lock (_gate) { base._observer.OnError(new TaskCanceledException(task)); base.Dispose(); } } } break; } } public void OnError(Exception error) { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } public void OnCompleted() { if (Interlocked.Decrement(ref _count) == 0) { lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } } } class Sigma : Sink, IObserver { private readonly SelectMany _parent; public Sigma(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private CancellationDisposable _cancel; private volatile int _count; private int _index; public IDisposable Run() { _gate = new object(); _cancel = new CancellationDisposable(); _count = 1; return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel); } public void OnNext(TSource value) { var index = checked(_index++); var task = default(Task); try { Interlocked.Increment(ref _count); task = _parent._collectionSelectorTI(value, index, _cancel.Token); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } if (task.IsCompleted) { OnCompletedTask(value, index, task); } else { AttachContinuation(value, index, task); } } private void AttachContinuation(TSource value, int index, Task task) { // // Separate method to avoid closure in synchronous completion case. // task.ContinueWith(t => OnCompletedTask(value, index, t)); } private void OnCompletedTask(TSource value, int index, Task task) { switch (task.Status) { case TaskStatus.RanToCompletion: { var res = default(TResult); try { res = _parent._resultSelectorTI(value, index, task.Result); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } lock (_gate) base._observer.OnNext(res); OnCompleted(); } break; case TaskStatus.Faulted: { lock (_gate) { base._observer.OnError(task.Exception.InnerException); base.Dispose(); } } break; case TaskStatus.Canceled: { if (!_cancel.IsDisposed) { lock (_gate) { base._observer.OnError(new TaskCanceledException(task)); base.Dispose(); } } } break; } } public void OnError(Exception error) { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } public void OnCompleted() { if (Interlocked.Decrement(ref _count) == 0) { lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } } } #pragma warning restore 0420 #endif } class SelectMany : Producer { private readonly IObservable _source; private readonly Func> _selector; private readonly Func> _selectorI; private readonly Func> _selectorOnError; private readonly Func> _selectorOnCompleted; private readonly Func> _selectorE; private readonly Func> _selectorEI; public SelectMany(IObservable source, Func> selector) { _source = source; _selector = selector; } public SelectMany(IObservable source, Func> selector) { _source = source; _selectorI = selector; } public SelectMany(IObservable source, Func> selector, Func> selectorOnError, Func> selectorOnCompleted) { _source = source; _selector = selector; _selectorOnError = selectorOnError; _selectorOnCompleted = selectorOnCompleted; } public SelectMany(IObservable source, Func> selector, Func> selectorOnError, Func> selectorOnCompleted) { _source = source; _selectorI = selector; _selectorOnError = selectorOnError; _selectorOnCompleted = selectorOnCompleted; } public SelectMany(IObservable source, Func> selector) { _source = source; _selectorE = selector; } public SelectMany(IObservable source, Func> selector) { _source = source; _selectorEI = selector; } #if !NO_TPL private readonly Func> _selectorT; private readonly Func> _selectorTI; public SelectMany(IObservable source, Func> selector) { _source = source; _selectorT = selector; } public SelectMany(IObservable source, Func> selector) { _source = source; _selectorTI = selector; } #endif protected override IDisposable Run(IObserver observer, IDisposable cancel, Action setSink) { if (_selector != null) { var sink = new _(this, observer, cancel); setSink(sink); return sink.Run(); } else if (_selectorI != null) { var sink = new IndexSelectorImpl(this, observer, cancel); setSink(sink); return sink.Run(); } #if !NO_TPL else if (_selectorT != null) { var sink = new SelectManyImpl(this, observer, cancel); setSink(sink); return sink.Run(); } else if (_selectorTI != null) { var sink = new Sigma(this, observer, cancel); setSink(sink); return sink.Run(); } #endif else if (_selectorE != null) { var sink = new NoSelectorImpl(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } else { var sink = new Omega(this, observer, cancel); setSink(sink); return _source.SubscribeSafe(sink); } } class _ : Sink, IObserver { private readonly SelectMany _parent; public _(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; public IDisposable Run() { _gate = new object(); _isStopped = false; _group = new CompositeDisposable(); _sourceSubscription = new SingleAssignmentDisposable(); _group.Add(_sourceSubscription); _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); return _group; } public void OnNext(TSource value) { var inner = default(IObservable); try { inner = _parent._selector(value); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } SubscribeInner(inner); } public void OnError(Exception error) { if (_parent._selectorOnError != null) { var inner = default(IObservable); try { inner = _parent._selectorOnError(error); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } SubscribeInner(inner); Final(); } else { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } } public void OnCompleted() { if (_parent._selectorOnCompleted != null) { var inner = default(IObservable); try { inner = _parent._selectorOnCompleted(); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } SubscribeInner(inner); } Final(); } private void Final() { _isStopped = true; if (_group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } else { _sourceSubscription.Dispose(); } } private void SubscribeInner(IObservable inner) { var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription)); } class Iter : IObserver { private readonly _ _parent; private readonly IDisposable _self; public Iter(_ parent, IDisposable self) { _parent = parent; _self = self; } public void OnNext(TResult value) { lock (_parent._gate) _parent._observer.OnNext(value); } public void OnError(Exception error) { lock (_parent._gate) { _parent._observer.OnError(error); _parent.Dispose(); } } public void OnCompleted() { _parent._group.Remove(_self); if (_parent._isStopped && _parent._group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_parent._gate) { _parent._observer.OnCompleted(); _parent.Dispose(); } } } } } class IndexSelectorImpl : Sink, IObserver { private readonly SelectMany _parent; public IndexSelectorImpl(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private bool _isStopped; private CompositeDisposable _group; private SingleAssignmentDisposable _sourceSubscription; private int _index; public IDisposable Run() { _gate = new object(); _isStopped = false; _group = new CompositeDisposable(); _sourceSubscription = new SingleAssignmentDisposable(); _group.Add(_sourceSubscription); _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); return _group; } public void OnNext(TSource value) { var inner = default(IObservable); try { inner = _parent._selectorI(value, checked(_index++)); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } SubscribeInner(inner); } public void OnError(Exception error) { if (_parent._selectorOnError != null) { var inner = default(IObservable); try { inner = _parent._selectorOnError(error); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } SubscribeInner(inner); Final(); } else { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } } public void OnCompleted() { if (_parent._selectorOnCompleted != null) { var inner = default(IObservable); try { inner = _parent._selectorOnCompleted(); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } SubscribeInner(inner); } Final(); } private void Final() { _isStopped = true; if (_group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } else { _sourceSubscription.Dispose(); } } private void SubscribeInner(IObservable inner) { var innerSubscription = new SingleAssignmentDisposable(); _group.Add(innerSubscription); innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription)); } class Iter : IObserver { private readonly IndexSelectorImpl _parent; private readonly IDisposable _self; public Iter(IndexSelectorImpl parent, IDisposable self) { _parent = parent; _self = self; } public void OnNext(TResult value) { lock (_parent._gate) _parent._observer.OnNext(value); } public void OnError(Exception error) { lock (_parent._gate) { _parent._observer.OnError(error); _parent.Dispose(); } } public void OnCompleted() { _parent._group.Remove(_self); if (_parent._isStopped && _parent._group.Count == 1) { // // Notice there can be a race between OnCompleted of the source and any // of the inner sequences, where both see _group.Count == 1, and one is // waiting for the lock. There won't be a double OnCompleted observation // though, because the call to Dispose silences the observer by swapping // in a NopObserver. // lock (_parent._gate) { _parent._observer.OnCompleted(); _parent.Dispose(); } } } } } class NoSelectorImpl : Sink, IObserver { private readonly SelectMany _parent; public NoSelectorImpl(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } public void OnNext(TSource value) { var xs = default(IEnumerable); try { xs = _parent._selectorE(value); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } var e = default(IEnumerator); try { e = xs.GetEnumerator(); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } try { var hasNext = true; while (hasNext) { hasNext = false; var current = default(TResult); try { hasNext = e.MoveNext(); if (hasNext) current = e.Current; } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } if (hasNext) base._observer.OnNext(current); } } finally { if (e != null) e.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { base._observer.OnCompleted(); base.Dispose(); } } class Omega : Sink, IObserver { private readonly SelectMany _parent; public Omega(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private int _index; public void OnNext(TSource value) { var xs = default(IEnumerable); try { xs = _parent._selectorEI(value, checked(_index++)); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } var e = default(IEnumerator); try { e = xs.GetEnumerator(); } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } try { var hasNext = true; while (hasNext) { hasNext = false; var current = default(TResult); try { hasNext = e.MoveNext(); if (hasNext) current = e.Current; } catch (Exception exception) { base._observer.OnError(exception); base.Dispose(); return; } if (hasNext) base._observer.OnNext(current); } } finally { if (e != null) e.Dispose(); } } public void OnError(Exception error) { base._observer.OnError(error); base.Dispose(); } public void OnCompleted() { base._observer.OnCompleted(); base.Dispose(); } } #if !NO_TPL #pragma warning disable 0420 class SelectManyImpl : Sink, IObserver { private readonly SelectMany _parent; public SelectManyImpl(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private CancellationDisposable _cancel; private volatile int _count; public IDisposable Run() { _gate = new object(); _cancel = new CancellationDisposable(); _count = 1; return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel); } public void OnNext(TSource value) { var task = default(Task); try { Interlocked.Increment(ref _count); task = _parent._selectorT(value, _cancel.Token); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } if (task.IsCompleted) { OnCompletedTask(task); } else { task.ContinueWith(OnCompletedTask); } } private void OnCompletedTask(Task task) { switch (task.Status) { case TaskStatus.RanToCompletion: { lock (_gate) base._observer.OnNext(task.Result); OnCompleted(); } break; case TaskStatus.Faulted: { lock (_gate) { base._observer.OnError(task.Exception.InnerException); base.Dispose(); } } break; case TaskStatus.Canceled: { if (!_cancel.IsDisposed) { lock (_gate) { base._observer.OnError(new TaskCanceledException(task)); base.Dispose(); } } } break; } } public void OnError(Exception error) { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } public void OnCompleted() { if (Interlocked.Decrement(ref _count) == 0) { lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } } } class Sigma : Sink, IObserver { private readonly SelectMany _parent; public Sigma(SelectMany parent, IObserver observer, IDisposable cancel) : base(observer, cancel) { _parent = parent; } private object _gate; private CancellationDisposable _cancel; private volatile int _count; private int _index; public IDisposable Run() { _gate = new object(); _cancel = new CancellationDisposable(); _count = 1; return StableCompositeDisposable.Create(_parent._source.SubscribeSafe(this), _cancel); } public void OnNext(TSource value) { var task = default(Task); try { Interlocked.Increment(ref _count); task = _parent._selectorTI(value, checked(_index++), _cancel.Token); } catch (Exception ex) { lock (_gate) { base._observer.OnError(ex); base.Dispose(); } return; } if (task.IsCompleted) { OnCompletedTask(task); } else { task.ContinueWith(OnCompletedTask); } } private void OnCompletedTask(Task task) { switch (task.Status) { case TaskStatus.RanToCompletion: { lock (_gate) base._observer.OnNext(task.Result); OnCompleted(); } break; case TaskStatus.Faulted: { lock (_gate) { base._observer.OnError(task.Exception.InnerException); base.Dispose(); } } break; case TaskStatus.Canceled: { if (!_cancel.IsDisposed) { lock (_gate) { base._observer.OnError(new TaskCanceledException(task)); base.Dispose(); } } } break; } } public void OnError(Exception error) { lock (_gate) { base._observer.OnError(error); base.Dispose(); } } public void OnCompleted() { if (Interlocked.Decrement(ref _count) == 0) { lock (_gate) { base._observer.OnCompleted(); base.Dispose(); } } } } #pragma warning restore 0420 #endif } } #endif