| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- #nullable disable
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- namespace System.Reactive.Linq.ObservableImpl
- {
- internal static class Generate<TState, TResult>
- {
/// <summary>
/// Producer for the untimed Generate variant: starting from an initial state, it
/// emits <c>resultSelector(state)</c> for as long as <c>condition(state)</c> holds,
/// moving to the next state with <c>iterate</c> between elements.
/// </summary>
internal sealed class NoTime : Producer<TResult, NoTime._>
{
    private readonly TState _initialState;
    private readonly Func<TState, bool> _condition;
    private readonly Func<TState, TState> _iterate;
    private readonly Func<TState, TResult> _resultSelector;
    private readonly IScheduler _scheduler;

    /// <summary>
    /// Captures the generator functions and the scheduler the loop is run on.
    /// </summary>
    public NoTime(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
    {
        _scheduler = scheduler;
        _initialState = initialState;
        _condition = condition;
        _iterate = iterate;
        _resultSelector = resultSelector;
    }

    protected override _ CreateSink(IObserver<TResult> observer)
    {
        return new _(this, observer);
    }

    protected override void Run(_ sink)
    {
        sink.Run(_scheduler);
    }

    /// <summary>
    /// Per-subscription sink holding the current generator state.
    /// </summary>
    internal sealed class _ : IdentitySink<TResult>
    {
        private readonly Func<TState, bool> _condition;
        private readonly Func<TState, TState> _iterate;
        private readonly Func<TState, TResult> _resultSelector;

        private TState _state;

        // True until the first step has run; the initial state is used as-is,
        // i.e. iterate is not applied before the first condition check.
        private bool _first;

        public _(NoTime parent, IObserver<TResult> observer)
            : base(observer)
        {
            _first = true;
            _state = parent._initialState;
            _condition = parent._condition;
            _iterate = parent._iterate;
            _resultSelector = parent._resultSelector;
        }

        /// <summary>
        /// Starts generation. When the scheduler offers a long-running facility
        /// (<c>AsLongRunning()</c> returns non-null) the whole sequence is produced
        /// by <see cref="Loop"/>; otherwise each element is produced by a
        /// recursively scheduled <see cref="LoopRec"/> step.
        /// </summary>
        public void Run(IScheduler _scheduler)
        {
            var longRunning = _scheduler.AsLongRunning();
            if (longRunning == null)
            {
                SetUpstream(_scheduler.Schedule(this, (sink, recurse) => sink.LoopRec(recurse)));
                return;
            }

            SetUpstream(longRunning.ScheduleLongRunning(this, (sink, cancel) => sink.Loop(cancel)));
        }

        /// <summary>
        /// Advances the generator by one step inside a single try/catch.
        /// </summary>
        /// <param name="produced">Set to the outcome of the condition check.</param>
        /// <param name="value">The selected result when <paramref name="produced"/> is true.</param>
        /// <returns>
        /// <c>false</c> when one of the user delegates threw; the exception has then
        /// already been forwarded through <c>ForwardOnError</c>. <c>true</c> otherwise.
        /// </returns>
        private bool TryStep(out bool produced, out TResult value)
        {
            produced = false;
            value = default(TResult);

            try
            {
                if (_first)
                {
                    _first = false;
                }
                else
                {
                    _state = _iterate(_state);
                }

                produced = _condition(_state);
                if (produced)
                {
                    value = _resultSelector(_state);
                }
            }
            catch (Exception exception)
            {
                ForwardOnError(exception);
                return false;
            }

            return true;
        }

        /// <summary>
        /// Long-running variant: keeps producing elements until the condition fails,
        /// a delegate throws, or <paramref name="cancel"/> reports disposal.
        /// </summary>
        private void Loop(ICancelable cancel)
        {
            while (!cancel.IsDisposed)
            {
                if (!TryStep(out var produced, out var value))
                {
                    // Error already forwarded; no completion signal follows.
                    return;
                }

                if (!produced)
                {
                    break;
                }

                ForwardOnNext(value);
            }

            if (cancel.IsDisposed)
            {
                return;
            }

            ForwardOnCompleted();
        }

        /// <summary>
        /// Recursive variant: produces at most one element per invocation and asks
        /// the scheduler for another invocation through <paramref name="recurse"/>.
        /// </summary>
        private void LoopRec(Action<_> recurse)
        {
            if (!TryStep(out var produced, out var value))
            {
                return;
            }

            if (!produced)
            {
                ForwardOnCompleted();
                return;
            }

            ForwardOnNext(value);
            recurse(this);
        }
    }
}
/// <summary>
/// Producer for the Generate variant in which every element carries an absolute
/// due time (<see cref="DateTimeOffset"/>) computed from the state by a time selector.
/// </summary>
internal sealed class Absolute : Producer<TResult, Absolute._>
{
    private readonly TState _initialState;
    private readonly Func<TState, bool> _condition;
    private readonly Func<TState, TState> _iterate;
    private readonly Func<TState, TResult> _resultSelector;
    private readonly Func<TState, DateTimeOffset> _timeSelector;
    private readonly IScheduler _scheduler;

    /// <summary>
    /// Captures the generator functions, the time selector and the scheduler.
    /// </summary>
    public Absolute(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
    {
        _initialState = initialState;
        _condition = condition;
        _iterate = iterate;
        _resultSelector = resultSelector;
        _timeSelector = timeSelector;
        _scheduler = scheduler;
    }

    protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

    protected override void Run(_ sink) => sink.Run(_scheduler, _initialState);

    /// <summary>
    /// Per-subscription sink. The generator state travels with each scheduled work
    /// item; the sink itself only keeps the pending result between invocations.
    /// </summary>
    internal sealed class _ : IdentitySink<TResult>
    {
        private readonly Func<TState, bool> _condition;
        private readonly Func<TState, TState> _iterate;
        private readonly Func<TState, TResult> _resultSelector;
        private readonly Func<TState, DateTimeOffset> _timeSelector;

        public _(Absolute parent, IObserver<TResult> observer)
            : base(observer)
        {
            _condition = parent._condition;
            _iterate = parent._iterate;
            _resultSelector = parent._resultSelector;
            _timeSelector = parent._timeSelector;
            _first = true;
        }

        // True until the first invocation; the initial state is used without applying iterate.
        private bool _first;

        // Whether _result holds a value computed by the previous invocation that
        // still has to be emitted when the next invocation runs.
        private bool _hasResult;
        private TResult _result;

        // Handle of the most recently scheduled work item; disposed in Dispose.
        private IDisposable _timerDisposable;

        /// <summary>
        /// Schedules the first <see cref="InvokeRec"/> invocation (without a due time)
        /// on <paramref name="outerScheduler"/>.
        /// </summary>
        public void Run(IScheduler outerScheduler, TState initialState)
        {
            // The holder is stored in _timerDisposable before the work is scheduled,
            // so the field refers to it even if the work runs right away.
            var timer = new SingleAssignmentDisposable();
            Disposable.TrySetMultiple(ref _timerDisposable, timer);
            timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState));
        }

        protected override void Dispose(bool disposing)
        {
            Disposable.Dispose(ref _timerDisposable);
            base.Dispose(disposing);
        }

        /// <summary>
        /// One generation step. Emits the result prepared by the previous step (if
        /// any), advances the state, and either completes (condition is false),
        /// reports an error (a delegate threw), or prepares the next result and
        /// schedules itself again at the time returned by the time selector.
        /// </summary>
        /// <param name="self">Scheduler passed to the work item; used for rescheduling.</param>
        /// <param name="state">State carried by the work item.</param>
        /// <returns>
        /// Always <c>Disposable.Empty</c>; follow-up work is tracked through
        /// <c>_timerDisposable</c> instead of the return value.
        /// </returns>
        private IDisposable InvokeRec(IScheduler self, TState state)
        {
            if (_hasResult)
            {
                ForwardOnNext(_result);
            }

            var time = default(DateTimeOffset);

            try
            {
                if (_first)
                {
                    _first = false;
                }
                else
                {
                    state = _iterate(state);
                }

                _hasResult = _condition(state);

                if (_hasResult)
                {
                    _result = _resultSelector(state);
                    time = _timeSelector(state);
                }
            }
            catch (Exception exception)
            {
                ForwardOnError(exception);
                return Disposable.Empty;
            }

            if (!_hasResult)
            {
                ForwardOnCompleted();
                return Disposable.Empty;
            }

            var timer = new SingleAssignmentDisposable();
            Disposable.TrySetMultiple(ref _timerDisposable, timer);
            timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));

            return Disposable.Empty;
        }
    }
}
/// <summary>
/// Producer for the Generate variant in which every element carries a relative
/// due time (<see cref="TimeSpan"/>) computed from the state by a time selector.
/// </summary>
internal sealed class Relative : Producer<TResult, Relative._>
{
    private readonly TState _initialState;
    private readonly Func<TState, bool> _condition;
    private readonly Func<TState, TState> _iterate;
    private readonly Func<TState, TResult> _resultSelector;
    private readonly Func<TState, TimeSpan> _timeSelector;
    private readonly IScheduler _scheduler;

    /// <summary>
    /// Captures the generator functions, the time selector and the scheduler.
    /// </summary>
    public Relative(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
    {
        _initialState = initialState;
        _condition = condition;
        _iterate = iterate;
        _resultSelector = resultSelector;
        _timeSelector = timeSelector;
        _scheduler = scheduler;
    }

    protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

    protected override void Run(_ sink) => sink.Run(_scheduler, _initialState);

    /// <summary>
    /// Per-subscription sink. The generator state travels with each scheduled work
    /// item; the sink itself only keeps the pending result between invocations.
    /// </summary>
    internal sealed class _ : IdentitySink<TResult>
    {
        private readonly Func<TState, bool> _condition;
        private readonly Func<TState, TState> _iterate;
        private readonly Func<TState, TResult> _resultSelector;
        private readonly Func<TState, TimeSpan> _timeSelector;

        public _(Relative parent, IObserver<TResult> observer)
            : base(observer)
        {
            _condition = parent._condition;
            _iterate = parent._iterate;
            _resultSelector = parent._resultSelector;
            _timeSelector = parent._timeSelector;
            _first = true;
        }

        // True until the first invocation; the initial state is used without applying iterate.
        private bool _first;

        // Whether _result holds a value computed by the previous invocation that
        // still has to be emitted when the next invocation runs.
        private bool _hasResult;
        private TResult _result;

        // Handle of the most recently scheduled work item; disposed in Dispose.
        private IDisposable _timerDisposable;

        /// <summary>
        /// Schedules the first <see cref="InvokeRec"/> invocation (without a due time)
        /// on <paramref name="outerScheduler"/>.
        /// </summary>
        public void Run(IScheduler outerScheduler, TState initialState)
        {
            // The holder is stored in _timerDisposable before the work is scheduled,
            // so the field refers to it even if the work runs right away.
            var timer = new SingleAssignmentDisposable();
            Disposable.TrySetMultiple(ref _timerDisposable, timer);
            timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState));
        }

        protected override void Dispose(bool disposing)
        {
            Disposable.Dispose(ref _timerDisposable);
            base.Dispose(disposing);
        }

        /// <summary>
        /// One generation step. Emits the result prepared by the previous step (if
        /// any), advances the state, and either completes (condition is false),
        /// reports an error (a delegate threw), or prepares the next result and
        /// schedules itself again after the delay returned by the time selector.
        /// </summary>
        /// <param name="self">Scheduler passed to the work item; used for rescheduling.</param>
        /// <param name="state">State carried by the work item.</param>
        /// <returns>
        /// Always <c>Disposable.Empty</c>; follow-up work is tracked through
        /// <c>_timerDisposable</c> instead of the return value.
        /// </returns>
        private IDisposable InvokeRec(IScheduler self, TState state)
        {
            if (_hasResult)
            {
                ForwardOnNext(_result);
            }

            var time = default(TimeSpan);

            try
            {
                if (_first)
                {
                    _first = false;
                }
                else
                {
                    state = _iterate(state);
                }

                _hasResult = _condition(state);

                if (_hasResult)
                {
                    _result = _resultSelector(state);
                    time = _timeSelector(state);
                }
            }
            catch (Exception exception)
            {
                ForwardOnError(exception);
                return Disposable.Empty;
            }

            if (!_hasResult)
            {
                ForwardOnCompleted();
                return Disposable.Empty;
            }

            var timer = new SingleAssignmentDisposable();
            Disposable.TrySetMultiple(ref _timerDisposable, timer);
            timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));

            return Disposable.Empty;
        }
    }
}
- }
- }
|