|
|
@@ -26,23 +26,22 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- private readonly Func<TSource, bool> _predicate;
|
|
|
- private bool _running;
|
|
|
+ private Func<TSource, bool> _predicate;
|
|
|
|
|
|
public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
_predicate = predicate;
|
|
|
- _running = false;
|
|
|
}
|
|
|
|
|
|
public void OnNext(TSource value)
|
|
|
{
|
|
|
- if (!_running)
|
|
|
+ if (_predicate != null)
|
|
|
{
|
|
|
+ var shouldStart = default(bool);
|
|
|
try
|
|
|
{
|
|
|
- _running = !_predicate(value);
|
|
|
+ shouldStart = !_predicate(value);
|
|
|
}
|
|
|
catch (Exception exception)
|
|
|
{
|
|
|
@@ -50,9 +49,15 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
base.Dispose();
|
|
|
return;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- if (_running)
|
|
|
+ if (shouldStart)
|
|
|
+ {
|
|
|
+ _predicate = null;
|
|
|
+
|
|
|
+ base._observer.OnNext(value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
{
|
|
|
base._observer.OnNext(value);
|
|
|
}
|
|
|
@@ -92,25 +97,24 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- private readonly Func<TSource, int, bool> _predicate;
|
|
|
- private bool _running;
|
|
|
+ private Func<TSource, int, bool> _predicate;
|
|
|
private int _index;
|
|
|
|
|
|
public _(Func<TSource, int, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
_predicate = predicate;
|
|
|
- _running = false;
|
|
|
_index = 0;
|
|
|
}
|
|
|
|
|
|
public void OnNext(TSource value)
|
|
|
{
|
|
|
- if (!_running)
|
|
|
+ if (_predicate != null)
|
|
|
{
|
|
|
+ var shouldStart = default(bool);
|
|
|
try
|
|
|
{
|
|
|
- _running = !_predicate(value, checked(_index++));
|
|
|
+ shouldStart = !_predicate(value, checked(_index++));
|
|
|
}
|
|
|
catch (Exception exception)
|
|
|
{
|
|
|
@@ -118,9 +122,15 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
base.Dispose();
|
|
|
return;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- if (_running)
|
|
|
+ if (shouldStart)
|
|
|
+ {
|
|
|
+ _predicate = null;
|
|
|
+
|
|
|
+ base._observer.OnNext(value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
{
|
|
|
base._observer.OnNext(value);
|
|
|
}
|