| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 | // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.#if !NO_PERFusing System;using System.Diagnostics;using System.Reactive.Concurrency;using System.Reactive.Disposables;using System.Threading;namespace System.Reactive.Linq.ObservableImpl{    class Generate<TState, TResult> : Producer<TResult>    {        private readonly TState _initialState;        private readonly Func<TState, bool> _condition;        private readonly Func<TState, TState> _iterate;        private readonly Func<TState, TResult> _resultSelector;        private readonly Func<TState, DateTimeOffset> _timeSelectorA;        private readonly Func<TState, TimeSpan> _timeSelectorR;        private readonly IScheduler _scheduler;        public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)        {            _initialState = initialState;            _condition = condition;            _iterate = iterate;            _resultSelector = resultSelector;            _scheduler = scheduler;        }        public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)        {            _initialState = initialState;            _condition = condition;            _iterate = iterate;            _resultSelector = resultSelector;            _timeSelectorA = timeSelector;            _scheduler = scheduler;        }        public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)        {            _initialState = initialState;            _condition = condition;            _iterate = iterate;            _resultSelector = resultSelector;            _timeSelectorR = timeSelector;            _scheduler = scheduler;        }        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)        {            if (_timeSelectorA != null)            {                var sink = new SelectorA(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else if (_timeSelectorR != null)            {                var sink = new Delta(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else            {                var sink = new _(this, observer, cancel);                setSink(sink);                return sink.Run();            }        }        class SelectorA : Sink<TResult>        {            private readonly Generate<TState, TResult> _parent;            public SelectorA(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private bool _first;            private bool _hasResult;            private TResult _result;            public IDisposable Run()            {                _first = true;                _hasResult = false;                _result = default(TResult);                return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);            }            private IDisposable InvokeRec(IScheduler self, TState state)            {                var time = default(DateTimeOffset);                if (_hasResult)                    base._observer.OnNext(_result);                try                {                    if (_first)                        _first = false;                    else                        state = _parent._iterate(state);                    _hasResult = _parent._condition(state);                    if (_hasResult)                    {                        _result = _parent._resultSelector(state);                        time = _parent._timeSelectorA(state);                    }                }                catch (Exception exception)                {                    base._observer.OnError(exception);                    base.Dispose();                    return Disposable.Empty;                }                if (!_hasResult)                {                    base._observer.OnCompleted();                    base.Dispose();                    return Disposable.Empty;                }                return self.Schedule(state, time, InvokeRec);            }        }        class Delta : Sink<TResult>        {            private readonly Generate<TState, TResult> _parent;            public Delta(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private bool _first;            private bool _hasResult;            private TResult _result;            public IDisposable Run()            {                _first = true;                _hasResult = false;                _result = default(TResult);                return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);            }            private IDisposable InvokeRec(IScheduler self, TState state)            {                var time = default(TimeSpan);                if (_hasResult)                    base._observer.OnNext(_result);                try                {                    if (_first)                        _first = false;                    else                        state = _parent._iterate(state);                    _hasResult = _parent._condition(state);                    if (_hasResult)                    {                        _result = _parent._resultSelector(state);                        time = _parent._timeSelectorR(state);                    }                }                catch (Exception exception)                {                    base._observer.OnError(exception);                    base.Dispose();                    return Disposable.Empty;                }                if (!_hasResult)                {                    base._observer.OnCompleted();                    base.Dispose();                    return Disposable.Empty;                }                return self.Schedule(state, time, InvokeRec);            }        }        class _ : Sink<TResult>        {            private readonly Generate<TState, TResult> _parent;            public _(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private TState _state;            private bool _first;            public IDisposable Run()            {                _state = _parent._initialState;                _first = true;                var longRunning = _parent._scheduler.AsLongRunning();                if (longRunning != null)                {                    return longRunning.ScheduleLongRunning(Loop);                }                else                {                    return _parent._scheduler.Schedule(LoopRec);                }            }            private void Loop(ICancelable cancel)            {                while (!cancel.IsDisposed)                {                    var hasResult = false;                    var result = default(TResult);                    try                    {                        if (_first)                            _first = false;                        else                            _state = _parent._iterate(_state);                        hasResult = _parent._condition(_state);                        if (hasResult)                            result = _parent._resultSelector(_state);                    }                    catch (Exception exception)                    {                        base._observer.OnError(exception);                        base.Dispose();                        return;                    }                    if (hasResult)                        base._observer.OnNext(result);                    else                        break;                }                if (!cancel.IsDisposed)                    base._observer.OnCompleted();                base.Dispose();            }            private void LoopRec(Action recurse)            {                var hasResult = false;                var result = default(TResult);                try                {                    if (_first)                        _first = false;                    else                        _state = _parent._iterate(_state);                    hasResult = _parent._condition(_state);                    if (hasResult)                        result = _parent._resultSelector(_state);                }                catch (Exception exception)                {                    base._observer.OnError(exception);                    base.Dispose();                    return;                }                if (hasResult)                {                    base._observer.OnNext(result);                    recurse();                }                else                {                    base._observer.OnCompleted();                    base.Dispose();                }            }        }    }}#endif
 |