|
|
@@ -7,240 +7,313 @@ using System.Reactive.Disposables;
|
|
|
|
|
|
namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
- internal sealed class Generate<TState, TResult> : Producer<TResult>
|
|
|
+ internal static class Generate<TState, TResult>
|
|
|
{
|
|
|
- private readonly TState _initialState;
|
|
|
- private readonly Func<TState, bool> _condition;
|
|
|
- private readonly Func<TState, TState> _iterate;
|
|
|
- private readonly Func<TState, TResult> _resultSelector;
|
|
|
- private readonly Func<TState, DateTimeOffset> _timeSelectorA;
|
|
|
- private readonly Func<TState, TimeSpan> _timeSelectorR;
|
|
|
- private readonly IScheduler _scheduler;
|
|
|
-
|
|
|
- public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
|
|
|
+ internal sealed class NoTime : Producer<TResult>
|
|
|
{
|
|
|
- _initialState = initialState;
|
|
|
- _condition = condition;
|
|
|
- _iterate = iterate;
|
|
|
- _resultSelector = resultSelector;
|
|
|
- _scheduler = scheduler;
|
|
|
- }
|
|
|
-
|
|
|
- public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
|
|
|
- {
|
|
|
- _initialState = initialState;
|
|
|
- _condition = condition;
|
|
|
- _iterate = iterate;
|
|
|
- _resultSelector = resultSelector;
|
|
|
- _timeSelectorA = timeSelector;
|
|
|
- _scheduler = scheduler;
|
|
|
- }
|
|
|
-
|
|
|
- public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
|
|
|
- {
|
|
|
- _initialState = initialState;
|
|
|
- _condition = condition;
|
|
|
- _iterate = iterate;
|
|
|
- _resultSelector = resultSelector;
|
|
|
- _timeSelectorR = timeSelector;
|
|
|
- _scheduler = scheduler;
|
|
|
- }
|
|
|
+ private readonly TState _initialState;
|
|
|
+ private readonly Func<TState, bool> _condition;
|
|
|
+ private readonly Func<TState, TState> _iterate;
|
|
|
+ private readonly Func<TState, TResult> _resultSelector;
|
|
|
+ private readonly IScheduler _scheduler;
|
|
|
|
|
|
- protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
- {
|
|
|
- if (_timeSelectorA != null)
|
|
|
+ public NoTime(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
|
|
|
{
|
|
|
- var sink = new SelectorA(this, observer, cancel);
|
|
|
- setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ _initialState = initialState;
|
|
|
+ _condition = condition;
|
|
|
+ _iterate = iterate;
|
|
|
+ _resultSelector = resultSelector;
|
|
|
+ _scheduler = scheduler;
|
|
|
}
|
|
|
- else if (_timeSelectorR != null)
|
|
|
- {
|
|
|
- var sink = new Delta(this, observer, cancel);
|
|
|
- setSink(sink);
|
|
|
- return sink.Run();
|
|
|
- }
|
|
|
- else
|
|
|
+
|
|
|
+ protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
var sink = new _(this, observer, cancel);
|
|
|
setSink(sink);
|
|
|
return sink.Run();
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- private sealed class SelectorA : Sink<TResult>
|
|
|
- {
|
|
|
- private readonly Generate<TState, TResult> _parent;
|
|
|
|
|
|
- public SelectorA(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
|
|
|
- : base(observer, cancel)
|
|
|
+ private sealed class _ : Sink<TResult>
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
- }
|
|
|
-
|
|
|
- private bool _first;
|
|
|
- private bool _hasResult;
|
|
|
- private TResult _result;
|
|
|
-
|
|
|
- public IDisposable Run()
|
|
|
- {
|
|
|
- _first = true;
|
|
|
- _hasResult = false;
|
|
|
- _result = default(TResult);
|
|
|
-
|
|
|
- return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
|
|
|
- }
|
|
|
+ // CONSIDER: This sink has a parent reference that can be considered for removal.
|
|
|
|
|
|
- private IDisposable InvokeRec(IScheduler self, TState state)
|
|
|
- {
|
|
|
- var time = default(DateTimeOffset);
|
|
|
+ private readonly NoTime _parent;
|
|
|
|
|
|
- if (_hasResult)
|
|
|
+ public _(NoTime parent, IObserver<TResult> observer, IDisposable cancel)
|
|
|
+ : base(observer, cancel)
|
|
|
{
|
|
|
- base._observer.OnNext(_result);
|
|
|
+ _parent = parent;
|
|
|
}
|
|
|
|
|
|
- try
|
|
|
+ private TState _state;
|
|
|
+ private bool _first;
|
|
|
+
|
|
|
+ public IDisposable Run()
|
|
|
{
|
|
|
- if (_first)
|
|
|
+ _state = _parent._initialState;
|
|
|
+ _first = true;
|
|
|
+
|
|
|
+ var longRunning = _parent._scheduler.AsLongRunning();
|
|
|
+ if (longRunning != null)
|
|
|
{
|
|
|
- _first = false;
|
|
|
+ return longRunning.ScheduleLongRunning(Loop);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- state = _parent._iterate(state);
|
|
|
+ return _parent._scheduler.Schedule(LoopRec);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- _hasResult = _parent._condition(state);
|
|
|
+ private void Loop(ICancelable cancel)
|
|
|
+ {
|
|
|
+ while (!cancel.IsDisposed)
|
|
|
+ {
|
|
|
+ var hasResult = false;
|
|
|
+ var result = default(TResult);
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (_first)
|
|
|
+ {
|
|
|
+ _first = false;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ _state = _parent._iterate(_state);
|
|
|
+ }
|
|
|
+
|
|
|
+ hasResult = _parent._condition(_state);
|
|
|
+
|
|
|
+ if (hasResult)
|
|
|
+ {
|
|
|
+ result = _parent._resultSelector(_state);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception exception)
|
|
|
+ {
|
|
|
+ base._observer.OnError(exception);
|
|
|
+ base.Dispose();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- if (_hasResult)
|
|
|
+ if (hasResult)
|
|
|
+ {
|
|
|
+ base._observer.OnNext(result);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!cancel.IsDisposed)
|
|
|
{
|
|
|
- _result = _parent._resultSelector(state);
|
|
|
- time = _parent._timeSelectorA(state);
|
|
|
+ base._observer.OnCompleted();
|
|
|
}
|
|
|
- }
|
|
|
- catch (Exception exception)
|
|
|
- {
|
|
|
- base._observer.OnError(exception);
|
|
|
+
|
|
|
base.Dispose();
|
|
|
- return Disposable.Empty;
|
|
|
}
|
|
|
|
|
|
- if (!_hasResult)
|
|
|
+ private void LoopRec(Action recurse)
|
|
|
{
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
- return Disposable.Empty;
|
|
|
- }
|
|
|
+ var hasResult = false;
|
|
|
+ var result = default(TResult);
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (_first)
|
|
|
+ {
|
|
|
+ _first = false;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ _state = _parent._iterate(_state);
|
|
|
+ }
|
|
|
+
|
|
|
+ hasResult = _parent._condition(_state);
|
|
|
+
|
|
|
+ if (hasResult)
|
|
|
+ {
|
|
|
+ result = _parent._resultSelector(_state);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception exception)
|
|
|
+ {
|
|
|
+ base._observer.OnError(exception);
|
|
|
+ base.Dispose();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- return self.Schedule(state, time, InvokeRec);
|
|
|
+ if (hasResult)
|
|
|
+ {
|
|
|
+ base._observer.OnNext(result);
|
|
|
+ recurse();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ base._observer.OnCompleted();
|
|
|
+ base.Dispose();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private sealed class Delta : Sink<TResult>
|
|
|
+ internal sealed class Absolute : Producer<TResult>
|
|
|
{
|
|
|
- private readonly Generate<TState, TResult> _parent;
|
|
|
-
|
|
|
- public Delta(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
|
|
|
- : base(observer, cancel)
|
|
|
+ private readonly TState _initialState;
|
|
|
+ private readonly Func<TState, bool> _condition;
|
|
|
+ private readonly Func<TState, TState> _iterate;
|
|
|
+ private readonly Func<TState, TResult> _resultSelector;
|
|
|
+ private readonly Func<TState, DateTimeOffset> _timeSelector;
|
|
|
+ private readonly IScheduler _scheduler;
|
|
|
+
|
|
|
+ public Absolute(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
+ _initialState = initialState;
|
|
|
+ _condition = condition;
|
|
|
+ _iterate = iterate;
|
|
|
+ _resultSelector = resultSelector;
|
|
|
+ _timeSelector = timeSelector;
|
|
|
+ _scheduler = scheduler;
|
|
|
}
|
|
|
|
|
|
- private bool _first;
|
|
|
- private bool _hasResult;
|
|
|
- private TResult _result;
|
|
|
-
|
|
|
- public IDisposable Run()
|
|
|
+ protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- _first = true;
|
|
|
- _hasResult = false;
|
|
|
- _result = default(TResult);
|
|
|
-
|
|
|
- return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
|
|
|
+ var sink = new _(this, observer, cancel);
|
|
|
+ setSink(sink);
|
|
|
+ return sink.Run();
|
|
|
}
|
|
|
|
|
|
- private IDisposable InvokeRec(IScheduler self, TState state)
|
|
|
+ private sealed class _ : Sink<TResult>
|
|
|
{
|
|
|
- var time = default(TimeSpan);
|
|
|
+ // CONSIDER: This sink has a parent reference that can be considered for removal.
|
|
|
+
|
|
|
+ private readonly Absolute _parent;
|
|
|
|
|
|
- if (_hasResult)
|
|
|
+ public _(Absolute parent, IObserver<TResult> observer, IDisposable cancel)
|
|
|
+ : base(observer, cancel)
|
|
|
{
|
|
|
- base._observer.OnNext(_result);
|
|
|
+ _parent = parent;
|
|
|
}
|
|
|
|
|
|
- try
|
|
|
+ private bool _first;
|
|
|
+ private bool _hasResult;
|
|
|
+ private TResult _result;
|
|
|
+
|
|
|
+ public IDisposable Run()
|
|
|
{
|
|
|
- if (_first)
|
|
|
+ _first = true;
|
|
|
+ _hasResult = false;
|
|
|
+ _result = default(TResult);
|
|
|
+
|
|
|
+ return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
|
|
|
+ }
|
|
|
+
|
|
|
+ private IDisposable InvokeRec(IScheduler self, TState state)
|
|
|
+ {
|
|
|
+ var time = default(DateTimeOffset);
|
|
|
+
|
|
|
+ if (_hasResult)
|
|
|
{
|
|
|
- _first = false;
|
|
|
+ base._observer.OnNext(_result);
|
|
|
}
|
|
|
- else
|
|
|
+
|
|
|
+ try
|
|
|
{
|
|
|
- state = _parent._iterate(state);
|
|
|
- }
|
|
|
+ if (_first)
|
|
|
+ {
|
|
|
+ _first = false;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ state = _parent._iterate(state);
|
|
|
+ }
|
|
|
|
|
|
- _hasResult = _parent._condition(state);
|
|
|
+ _hasResult = _parent._condition(state);
|
|
|
|
|
|
- if (_hasResult)
|
|
|
+ if (_hasResult)
|
|
|
+ {
|
|
|
+ _result = _parent._resultSelector(state);
|
|
|
+ time = _parent._timeSelector(state);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception exception)
|
|
|
+ {
|
|
|
+ base._observer.OnError(exception);
|
|
|
+ base.Dispose();
|
|
|
+ return Disposable.Empty;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!_hasResult)
|
|
|
{
|
|
|
- _result = _parent._resultSelector(state);
|
|
|
- time = _parent._timeSelectorR(state);
|
|
|
+ base._observer.OnCompleted();
|
|
|
+ base.Dispose();
|
|
|
+ return Disposable.Empty;
|
|
|
}
|
|
|
- }
|
|
|
- catch (Exception exception)
|
|
|
- {
|
|
|
- base._observer.OnError(exception);
|
|
|
- base.Dispose();
|
|
|
- return Disposable.Empty;
|
|
|
- }
|
|
|
|
|
|
- if (!_hasResult)
|
|
|
- {
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
- return Disposable.Empty;
|
|
|
+ return self.Schedule(state, time, InvokeRec);
|
|
|
}
|
|
|
-
|
|
|
- return self.Schedule(state, time, InvokeRec);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private sealed class _ : Sink<TResult>
|
|
|
+ internal sealed class Relative : Producer<TResult>
|
|
|
{
|
|
|
- private readonly Generate<TState, TResult> _parent;
|
|
|
-
|
|
|
- public _(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
|
|
|
- : base(observer, cancel)
|
|
|
+ private readonly TState _initialState;
|
|
|
+ private readonly Func<TState, bool> _condition;
|
|
|
+ private readonly Func<TState, TState> _iterate;
|
|
|
+ private readonly Func<TState, TResult> _resultSelector;
|
|
|
+ private readonly Func<TState, TimeSpan> _timeSelector;
|
|
|
+ private readonly IScheduler _scheduler;
|
|
|
+
|
|
|
+ public Relative(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
+ _initialState = initialState;
|
|
|
+ _condition = condition;
|
|
|
+ _iterate = iterate;
|
|
|
+ _resultSelector = resultSelector;
|
|
|
+ _timeSelector = timeSelector;
|
|
|
+ _scheduler = scheduler;
|
|
|
}
|
|
|
|
|
|
- private TState _state;
|
|
|
- private bool _first;
|
|
|
+ protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
+ {
|
|
|
+ var sink = new _(this, observer, cancel);
|
|
|
+ setSink(sink);
|
|
|
+ return sink.Run();
|
|
|
+ }
|
|
|
|
|
|
- public IDisposable Run()
|
|
|
+ private sealed class _ : Sink<TResult>
|
|
|
{
|
|
|
- _state = _parent._initialState;
|
|
|
- _first = true;
|
|
|
+ // CONSIDER: This sink has a parent reference that can be considered for removal.
|
|
|
+
|
|
|
+ private readonly Relative _parent;
|
|
|
|
|
|
- var longRunning = _parent._scheduler.AsLongRunning();
|
|
|
- if (longRunning != null)
|
|
|
+ public _(Relative parent, IObserver<TResult> observer, IDisposable cancel)
|
|
|
+ : base(observer, cancel)
|
|
|
{
|
|
|
- return longRunning.ScheduleLongRunning(Loop);
|
|
|
+ _parent = parent;
|
|
|
}
|
|
|
- else
|
|
|
+
|
|
|
+ private bool _first;
|
|
|
+ private bool _hasResult;
|
|
|
+ private TResult _result;
|
|
|
+
|
|
|
+ public IDisposable Run()
|
|
|
{
|
|
|
- return _parent._scheduler.Schedule(LoopRec);
|
|
|
+ _first = true;
|
|
|
+ _hasResult = false;
|
|
|
+ _result = default(TResult);
|
|
|
+
|
|
|
+ return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- private void Loop(ICancelable cancel)
|
|
|
- {
|
|
|
- while (!cancel.IsDisposed)
|
|
|
+ private IDisposable InvokeRec(IScheduler self, TState state)
|
|
|
{
|
|
|
- var hasResult = false;
|
|
|
- var result = default(TResult);
|
|
|
+ var time = default(TimeSpan);
|
|
|
+
|
|
|
+ if (_hasResult)
|
|
|
+ {
|
|
|
+ base._observer.OnNext(_result);
|
|
|
+ }
|
|
|
+
|
|
|
try
|
|
|
{
|
|
|
if (_first)
|
|
|
@@ -249,79 +322,32 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _state = _parent._iterate(_state);
|
|
|
+ state = _parent._iterate(state);
|
|
|
}
|
|
|
|
|
|
- hasResult = _parent._condition(_state);
|
|
|
+ _hasResult = _parent._condition(state);
|
|
|
|
|
|
- if (hasResult)
|
|
|
+ if (_hasResult)
|
|
|
{
|
|
|
- result = _parent._resultSelector(_state);
|
|
|
+ _result = _parent._resultSelector(state);
|
|
|
+ time = _parent._timeSelector(state);
|
|
|
}
|
|
|
}
|
|
|
catch (Exception exception)
|
|
|
{
|
|
|
base._observer.OnError(exception);
|
|
|
base.Dispose();
|
|
|
- return;
|
|
|
+ return Disposable.Empty;
|
|
|
}
|
|
|
|
|
|
- if (hasResult)
|
|
|
- {
|
|
|
- base._observer.OnNext(result);
|
|
|
- }
|
|
|
- else
|
|
|
+ if (!_hasResult)
|
|
|
{
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (!cancel.IsDisposed)
|
|
|
- {
|
|
|
- base._observer.OnCompleted();
|
|
|
- }
|
|
|
-
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- private void LoopRec(Action recurse)
|
|
|
- {
|
|
|
- var hasResult = false;
|
|
|
- var result = default(TResult);
|
|
|
- try
|
|
|
- {
|
|
|
- if (_first)
|
|
|
- {
|
|
|
- _first = false;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- _state = _parent._iterate(_state);
|
|
|
- }
|
|
|
-
|
|
|
- hasResult = _parent._condition(_state);
|
|
|
-
|
|
|
- if (hasResult)
|
|
|
- {
|
|
|
- result = _parent._resultSelector(_state);
|
|
|
+ base._observer.OnCompleted();
|
|
|
+ base.Dispose();
|
|
|
+ return Disposable.Empty;
|
|
|
}
|
|
|
- }
|
|
|
- catch (Exception exception)
|
|
|
- {
|
|
|
- base._observer.OnError(exception);
|
|
|
- base.Dispose();
|
|
|
- return;
|
|
|
- }
|
|
|
|
|
|
- if (hasResult)
|
|
|
- {
|
|
|
- base._observer.OnNext(result);
|
|
|
- recurse();
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
+ return self.Schedule(state, time, InvokeRec);
|
|
|
}
|
|
|
}
|
|
|
}
|