|
|
@@ -9,12 +9,12 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
internal static class Repeat<TResult>
|
|
|
{
|
|
|
- internal sealed class Forever : Producer<TResult, Forever._>
|
|
|
+ internal sealed class ForeverRecursive : Producer<TResult, ForeverRecursive._>
|
|
|
{
|
|
|
private readonly TResult _value;
|
|
|
private readonly IScheduler _scheduler;
|
|
|
|
|
|
- public Forever(TResult value, IScheduler scheduler)
|
|
|
+ public ForeverRecursive(TResult value, IScheduler scheduler)
|
|
|
{
|
|
|
_value = value;
|
|
|
_scheduler = scheduler;
|
|
|
@@ -22,35 +22,75 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);
|
|
|
|
|
|
- protected override void Run(_ sink) => sink.Run(this);
|
|
|
+ protected override void Run(_ sink) => sink.Run(_scheduler);
|
|
|
|
|
|
internal sealed class _ : IdentitySink<TResult>
|
|
|
{
|
|
|
private readonly TResult _value;
|
|
|
|
|
|
+ IDisposable _task;
|
|
|
+
|
|
|
public _(TResult value, IObserver<TResult> observer)
|
|
|
: base(observer)
|
|
|
{
|
|
|
_value = value;
|
|
|
}
|
|
|
|
|
|
- public void Run(Forever parent)
|
|
|
+ public void Run(IScheduler scheduler)
|
|
|
{
|
|
|
- var longRunning = parent._scheduler.AsLongRunning();
|
|
|
- if (longRunning != null)
|
|
|
- {
|
|
|
- SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.LoopInf(c)));
|
|
|
- }
|
|
|
- else
|
|
|
+ var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
|
|
|
+ Disposable.TrySetSingle(ref _task, first);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected override void Dispose(bool disposing)
|
|
|
+ {
|
|
|
+ base.Dispose(disposing);
|
|
|
+ if (disposing)
|
|
|
{
|
|
|
- SetUpstream(parent._scheduler.Schedule(this, (@this, a) => @this.LoopRecInf(a)));
|
|
|
+ Disposable.TryDispose(ref _task);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void LoopRecInf(Action<_> recurse)
|
|
|
+ private IDisposable LoopRecInf(IScheduler scheduler)
|
|
|
{
|
|
|
ForwardOnNext(_value);
|
|
|
- recurse(this);
|
|
|
+
|
|
|
+ var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRecInf(innerScheduler));
|
|
|
+ Disposable.TrySetMultiple(ref _task, next);
|
|
|
+
|
|
|
+ return Disposable.Empty;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ internal sealed class ForeverLongRunning : Producer<TResult, ForeverLongRunning._>
|
|
|
+ {
|
|
|
+ private readonly TResult _value;
|
|
|
+ private readonly ISchedulerLongRunning _scheduler;
|
|
|
+
|
|
|
+ public ForeverLongRunning(TResult value, ISchedulerLongRunning scheduler)
|
|
|
+ {
|
|
|
+ _value = value;
|
|
|
+ _scheduler = scheduler;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);
|
|
|
+
|
|
|
+ protected override void Run(_ sink) => sink.Run(_scheduler);
|
|
|
+
|
|
|
+ internal sealed class _ : IdentitySink<TResult>
|
|
|
+ {
|
|
|
+ private readonly TResult _value;
|
|
|
+
|
|
|
+ public _(TResult value, IObserver<TResult> observer)
|
|
|
+ : base(observer)
|
|
|
+ {
|
|
|
+ _value = value;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void Run(ISchedulerLongRunning longRunning)
|
|
|
+ {
|
|
|
+ SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.LoopInf(c)));
|
|
|
}
|
|
|
|
|
|
private void LoopInf(ICancelable cancel)
|
|
|
@@ -66,66 +106,115 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- internal sealed class Count : Producer<TResult, Count._>
|
|
|
+ internal sealed class CountRecursive : Producer<TResult, CountRecursive._>
|
|
|
{
|
|
|
private readonly TResult _value;
|
|
|
private readonly IScheduler _scheduler;
|
|
|
private readonly int _repeatCount;
|
|
|
|
|
|
- public Count(TResult value, int repeatCount, IScheduler scheduler)
|
|
|
+ public CountRecursive(TResult value, int repeatCount, IScheduler scheduler)
|
|
|
{
|
|
|
_value = value;
|
|
|
_scheduler = scheduler;
|
|
|
_repeatCount = repeatCount;
|
|
|
}
|
|
|
|
|
|
- protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, observer);
|
|
|
+ protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, _repeatCount, observer);
|
|
|
|
|
|
- protected override void Run(_ sink) => sink.Run(this);
|
|
|
+ protected override void Run(_ sink) => sink.Run(_scheduler);
|
|
|
|
|
|
internal sealed class _ : IdentitySink<TResult>
|
|
|
{
|
|
|
private readonly TResult _value;
|
|
|
|
|
|
- public _(TResult value, IObserver<TResult> observer)
|
|
|
+ int _remaining;
|
|
|
+
|
|
|
+ IDisposable _task;
|
|
|
+
|
|
|
+ public _(TResult value, int repeatCount, IObserver<TResult> observer)
|
|
|
: base(observer)
|
|
|
{
|
|
|
_value = value;
|
|
|
+ _remaining = repeatCount;
|
|
|
}
|
|
|
|
|
|
- public void Run(Count parent)
|
|
|
+ public void Run(IScheduler scheduler)
|
|
|
{
|
|
|
- var longRunning = parent._scheduler.AsLongRunning();
|
|
|
- if (longRunning != null)
|
|
|
- {
|
|
|
- SetUpstream(longRunning.ScheduleLongRunning(parent._repeatCount, Loop));
|
|
|
- }
|
|
|
- else
|
|
|
+ var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
|
|
|
+ Disposable.TrySetSingle(ref _task, first);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected override void Dispose(bool disposing)
|
|
|
+ {
|
|
|
+ base.Dispose(disposing);
|
|
|
+ if (disposing)
|
|
|
{
|
|
|
- SetUpstream(parent._scheduler.Schedule(parent._repeatCount, LoopRec));
|
|
|
+ Disposable.TryDispose(ref _task);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void LoopRec(int n, Action<int> recurse)
|
|
|
+ private IDisposable LoopRec(IScheduler scheduler)
|
|
|
{
|
|
|
- if (n > 0)
|
|
|
+ var remaining = _remaining;
|
|
|
+ if (remaining > 0)
|
|
|
{
|
|
|
ForwardOnNext(_value);
|
|
|
- n--;
|
|
|
+ _remaining = --remaining;
|
|
|
}
|
|
|
|
|
|
- if (n == 0)
|
|
|
+ if (remaining == 0)
|
|
|
{
|
|
|
ForwardOnCompleted();
|
|
|
- return;
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
|
|
|
+ Disposable.TrySetMultiple(ref _task, next);
|
|
|
+ }
|
|
|
+ return Disposable.Empty;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- recurse(n);
|
|
|
+ internal sealed class CountLongRunning : Producer<TResult, CountLongRunning._>
|
|
|
+ {
|
|
|
+ private readonly TResult _value;
|
|
|
+ private readonly ISchedulerLongRunning _scheduler;
|
|
|
+ private readonly int _repeatCount;
|
|
|
+
|
|
|
+ public CountLongRunning(TResult value, int repeatCount, ISchedulerLongRunning scheduler)
|
|
|
+ {
|
|
|
+ _value = value;
|
|
|
+ _scheduler = scheduler;
|
|
|
+ _repeatCount = repeatCount;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected override _ CreateSink(IObserver<TResult> observer) => new _(_value, _repeatCount, observer);
|
|
|
+
|
|
|
+ protected override void Run(_ sink) => sink.Run(_scheduler);
|
|
|
+
|
|
|
+ internal sealed class _ : IdentitySink<TResult>
|
|
|
+ {
|
|
|
+ private readonly TResult _value;
|
|
|
+
|
|
|
+ int _remaining;
|
|
|
+
|
|
|
+ public _(TResult value, int remaining, IObserver<TResult> observer)
|
|
|
+ : base(observer)
|
|
|
+ {
|
|
|
+ _value = value;
|
|
|
+ _remaining = remaining;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void Run(ISchedulerLongRunning longRunning)
|
|
|
+ {
|
|
|
+ SetUpstream(longRunning.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel)));
|
|
|
}
|
|
|
|
|
|
- private void Loop(int n, ICancelable cancel)
|
|
|
+ private void Loop(ICancelable cancel)
|
|
|
{
|
|
|
var value = _value;
|
|
|
+ var n = _remaining;
|
|
|
while (n > 0 && !cancel.IsDisposed)
|
|
|
{
|
|
|
ForwardOnNext(value);
|