// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive
{
    /// <summary>
    /// Base sink that subscribes, one at a time, to the observables produced by an
    /// enumerable of sources. Enumerators are kept on an explicit stack so that nested
    /// sequences reported by <see cref="Extract"/> are flattened and exhausted frames are
    /// dropped, instead of growing a chain of observers.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sources and of the downstream observer.</typeparam>
    abstract class TailRecursiveSink<TSource> : Sink<TSource>, IObserver<TSource>
    {
        /// <summary>
        /// Creates the sink; both arguments are forwarded unchanged to the base sink.
        /// </summary>
        /// <param name="observer">Downstream observer.</param>
        /// <param name="cancel">Disposable handed to the base sink.</param>
        public TailRecursiveSink(IObserver<TSource> observer, IDisposable cancel)
            : base(observer, cancel)
        {
        }

        // Set by the private Dispose once every stacked enumerator has been released.
        private bool _isDisposed;

        // Holds the subscription to the source that is currently being observed.
        private SerialDisposable _subscription;

        // MoveNext and the private Dispose are only ever entered through _gate.Wait.
        private AsyncLock _gate;

        // Enumerators still to be drained; the top one supplies the next source.
        private Stack<IEnumerator<IObservable<TSource>>> _stack;

        // Parallel to _stack: remaining element count of each enumerator as reported by
        // Helpers.GetLength (presumably null when the length is unknown - confirm in Helpers).
        private Stack<int?> _length;

        /// <summary>
        /// Re-schedules the action created in <see cref="Run"/>, which runs MoveNext under the gate.
        /// Assigned the first time the scheduled action runs.
        /// </summary>
        protected Action _recurse;

        /// <summary>
        /// Initializes the sink state and schedules the first move to the next source.
        /// </summary>
        /// <param name="sources">Sequence of observables to subscribe to in order.</param>
        /// <returns>
        /// A disposable combining the current inner subscription, the scheduled work and
        /// the release of all stacked enumerators; <c>Disposable.Empty</c> when obtaining the
        /// enumerator failed (the error has then already been forwarded to the observer).
        /// </returns>
        public IDisposable Run(IEnumerable<IObservable<TSource>> sources)
        {
            _isDisposed = false;
            _subscription = new SerialDisposable();
            _gate = new AsyncLock();
            _stack = new Stack<IEnumerator<IObservable<TSource>>>();
            _length = new Stack<int?>();

            var e = default(IEnumerator<IObservable<TSource>>);
            if (!TryGetEnumerator(sources, out e))
                return Disposable.Empty;

            _stack.Push(e);
            _length.Push(Helpers.GetLength(sources));

            var cancelable = SchedulerDefaults.TailRecursion.Schedule(self =>
            {
                _recurse = self;
                _gate.Wait(MoveNext);
            });

            // Enumerator cleanup goes through the gate as well, so it never runs
            // interleaved with MoveNext (assuming AsyncLock serializes queued actions).
            return new CompositeDisposable(_subscription, cancelable, Disposable.Create(() => _gate.Wait(Dispose)));
        }

        /// <summary>
        /// Gives derived sinks the chance to expose the sources nested inside <paramref name="source"/>.
        /// </summary>
        /// <param name="source">The observable about to be subscribed to.</param>
        /// <returns>
        /// The nested sources to push as a new frame, or <c>null</c> to subscribe to
        /// <paramref name="source"/> directly.
        /// </returns>
        protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source);

        /// <summary>
        /// Advances to the next source and subscribes to it, or calls <see cref="Done"/>
        /// when every enumerator on the stack is exhausted. Does nothing once the sink
        /// has been disposed.
        /// </summary>
        private void MoveNext()
        {
            var hasNext = false;
            var next = default(IObservable<TSource>);

            do
            {
                //
                // The disposed check has to come before the empty-stack check: Dispose
                // empties the stack before raising the flag, so with the checks the other
                // way around this branch could never be taken and a disposed sink would
                // fall through to Done().
                //
                if (_isDisposed)
                    return;

                if (_stack.Count == 0)
                    break;

                var e = _stack.Peek();
                var l = _length.Peek();

                var current = default(IObservable<TSource>);
                try
                {
                    hasNext = e.MoveNext();
                    if (hasNext)
                        current = e.Current;
                }
                catch (Exception ex)
                {
                    e.Dispose();

                    base._observer.OnError(ex);
                    base.Dispose();
                    return;
                }

                if (!hasNext)
                {
                    // Enumerator exhausted: drop its frame and continue with the one below.
                    e.Dispose();

                    _stack.Pop();
                    _length.Pop();
                }
                else
                {
                    // Record that one element of this frame has been consumed. With a
                    // null length the lifted subtraction yields null again.
                    var r = l - 1;
                    _length.Pop();
                    _length.Push(r);

                    try
                    {
                        next = Helpers.Unpack(current);
                    }
                    catch (Exception exception)
                    {
                        e.Dispose();

                        base._observer.OnError(exception);
                        base.Dispose();
                        return;
                    }

                    //
                    // Tail recursive case; drop the current frame.
                    //
                    if (r == 0)
                    {
                        e.Dispose();

                        _stack.Pop();
                        _length.Pop();
                    }

                    //
                    // Flattening of nested sequences. Prevents stack overflow in observers.
                    //
                    var nextSeq = Extract(next);
                    if (nextSeq != null)
                    {
                        var nextEnumerator = default(IEnumerator<IObservable<TSource>>);
                        if (!TryGetEnumerator(nextSeq, out nextEnumerator))
                            return;

                        _stack.Push(nextEnumerator);
                        _length.Push(Helpers.GetLength(nextSeq));

                        // Loop again so the first source is taken from the frame just pushed.
                        hasNext = false;
                    }
                }
            } while (!hasNext);

            if (!hasNext)
            {
                Done();
                return;
            }

            // Install the holder before subscribing, so the serial disposable already
            // tracks this subscription while SubscribeSafe is running.
            var d = new SingleAssignmentDisposable();
            _subscription.Disposable = d;
            d.Disposable = next.SubscribeSafe(this);
        }

        /// <summary>
        /// Disposes every enumerator still on the stack and marks the sink as disposed.
        /// Hides the base Dispose; it is only invoked through the gate from the disposable
        /// returned by <see cref="Run"/>.
        /// </summary>
        private new void Dispose()
        {
            while (_stack.Count > 0)
            {
                var e = _stack.Pop();
                _length.Pop();

                e.Dispose();
            }

            _isDisposed = true;
        }

        /// <summary>
        /// Obtains an enumerator from <paramref name="sources"/>, routing any exception to the observer.
        /// </summary>
        /// <param name="sources">Sequence to enumerate.</param>
        /// <param name="result">The enumerator on success; <c>null</c> on failure.</param>
        /// <returns>
        /// <c>true</c> on success; <c>false</c> when GetEnumerator threw, in which case the
        /// exception was passed to OnError and the base sink was disposed.
        /// </returns>
        private bool TryGetEnumerator(IEnumerable<IObservable<TSource>> sources, out IEnumerator<IObservable<TSource>> result)
        {
            try
            {
                result = sources.GetEnumerator();
                return true;
            }
            catch (Exception exception)
            {
                base._observer.OnError(exception);
                base.Dispose();

                result = null;
                return false;
            }
        }

        /// <summary>Handles completion of the source currently subscribed to.</summary>
        public abstract void OnCompleted();

        /// <summary>Handles an error raised by the source currently subscribed to.</summary>
        /// <param name="error">The error reported by the source.</param>
        public abstract void OnError(Exception error);

        /// <summary>Handles an element produced by the source currently subscribed to.</summary>
        /// <param name="value">The element produced by the source.</param>
        public abstract void OnNext(TSource value);

        /// <summary>
        /// Called when no sources remain. The default implementation completes the
        /// downstream observer and disposes the base sink.
        /// </summary>
        protected virtual void Done()
        {
            base._observer.OnCompleted();
            base.Dispose();
        }
    }
}
#endif
 |