| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- #if !NO_PERF
- using System;
- using System.Diagnostics;
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Threading;
- namespace System.Reactive.Linq.ObservableImpl
- {
- class Generate<TState, TResult> : Producer<TResult>
- {
- private readonly TState _initialState;
- private readonly Func<TState, bool> _condition;
- private readonly Func<TState, TState> _iterate;
- private readonly Func<TState, TResult> _resultSelector;
- private readonly Func<TState, DateTimeOffset> _timeSelectorA;
- private readonly Func<TState, TimeSpan> _timeSelectorR;
- private readonly IScheduler _scheduler;
- public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
- {
- _initialState = initialState;
- _condition = condition;
- _iterate = iterate;
- _resultSelector = resultSelector;
- _scheduler = scheduler;
- }
- public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
- {
- _initialState = initialState;
- _condition = condition;
- _iterate = iterate;
- _resultSelector = resultSelector;
- _timeSelectorA = timeSelector;
- _scheduler = scheduler;
- }
- public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
- {
- _initialState = initialState;
- _condition = condition;
- _iterate = iterate;
- _resultSelector = resultSelector;
- _timeSelectorR = timeSelector;
- _scheduler = scheduler;
- }
- protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
- {
- if (_timeSelectorA != null)
- {
- var sink = new SelectorA(this, observer, cancel);
- setSink(sink);
- return sink.Run();
- }
- else if (_timeSelectorR != null)
- {
- var sink = new Delta(this, observer, cancel);
- setSink(sink);
- return sink.Run();
- }
- else
- {
- var sink = new _(this, observer, cancel);
- setSink(sink);
- return sink.Run();
- }
- }
- class SelectorA : Sink<TResult>
- {
- private readonly Generate<TState, TResult> _parent;
- public SelectorA(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
- : base(observer, cancel)
- {
- _parent = parent;
- }
- private bool _first;
- private bool _hasResult;
- private TResult _result;
- public IDisposable Run()
- {
- _first = true;
- _hasResult = false;
- _result = default(TResult);
- return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
- }
- private IDisposable InvokeRec(IScheduler self, TState state)
- {
- var time = default(DateTimeOffset);
- if (_hasResult)
- base._observer.OnNext(_result);
- try
- {
- if (_first)
- _first = false;
- else
- state = _parent._iterate(state);
- _hasResult = _parent._condition(state);
- if (_hasResult)
- {
- _result = _parent._resultSelector(state);
- time = _parent._timeSelectorA(state);
- }
- }
- catch (Exception exception)
- {
- base._observer.OnError(exception);
- base.Dispose();
- return Disposable.Empty;
- }
- if (!_hasResult)
- {
- base._observer.OnCompleted();
- base.Dispose();
- return Disposable.Empty;
- }
- return self.Schedule(state, time, InvokeRec);
- }
- }
- class Delta : Sink<TResult>
- {
- private readonly Generate<TState, TResult> _parent;
- public Delta(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
- : base(observer, cancel)
- {
- _parent = parent;
- }
- private bool _first;
- private bool _hasResult;
- private TResult _result;
- public IDisposable Run()
- {
- _first = true;
- _hasResult = false;
- _result = default(TResult);
- return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
- }
- private IDisposable InvokeRec(IScheduler self, TState state)
- {
- var time = default(TimeSpan);
- if (_hasResult)
- base._observer.OnNext(_result);
- try
- {
- if (_first)
- _first = false;
- else
- state = _parent._iterate(state);
- _hasResult = _parent._condition(state);
- if (_hasResult)
- {
- _result = _parent._resultSelector(state);
- time = _parent._timeSelectorR(state);
- }
- }
- catch (Exception exception)
- {
- base._observer.OnError(exception);
- base.Dispose();
- return Disposable.Empty;
- }
- if (!_hasResult)
- {
- base._observer.OnCompleted();
- base.Dispose();
- return Disposable.Empty;
- }
- return self.Schedule(state, time, InvokeRec);
- }
- }
- class _ : Sink<TResult>
- {
- private readonly Generate<TState, TResult> _parent;
- public _(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
- : base(observer, cancel)
- {
- _parent = parent;
- }
- private TState _state;
- private bool _first;
- public IDisposable Run()
- {
- _state = _parent._initialState;
- _first = true;
- var longRunning = _parent._scheduler.AsLongRunning();
- if (longRunning != null)
- {
- return longRunning.ScheduleLongRunning(Loop);
- }
- else
- {
- return _parent._scheduler.Schedule(LoopRec);
- }
- }
- private void Loop(ICancelable cancel)
- {
- while (!cancel.IsDisposed)
- {
- var hasResult = false;
- var result = default(TResult);
- try
- {
- if (_first)
- _first = false;
- else
- _state = _parent._iterate(_state);
- hasResult = _parent._condition(_state);
- if (hasResult)
- result = _parent._resultSelector(_state);
- }
- catch (Exception exception)
- {
- base._observer.OnError(exception);
- base.Dispose();
- return;
- }
- if (hasResult)
- base._observer.OnNext(result);
- else
- break;
- }
- if (!cancel.IsDisposed)
- base._observer.OnCompleted();
- base.Dispose();
- }
- private void LoopRec(Action recurse)
- {
- var hasResult = false;
- var result = default(TResult);
- try
- {
- if (_first)
- _first = false;
- else
- _state = _parent._iterate(_state);
- hasResult = _parent._condition(_state);
- if (hasResult)
- result = _parent._resultSelector(_state);
- }
- catch (Exception exception)
- {
- base._observer.OnError(exception);
- base.Dispose();
- return;
- }
- if (hasResult)
- {
- base._observer.OnNext(result);
- recurse();
- }
- else
- {
- base._observer.OnCompleted();
- base.Dispose();
- }
- }
- }
- }
- }
- #endif
|