|
@@ -21,7 +21,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal abstract class _ : Sink<TSource>, IObserver<TSource>
|
|
|
+ internal abstract class _ : IdentitySink<TSource>
|
|
|
{
|
|
|
protected readonly IComparer<TSource> _comparer;
|
|
|
|
|
@@ -30,10 +30,6 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
_comparer = comparer;
|
|
|
}
|
|
|
-
|
|
|
- public abstract void OnCompleted();
|
|
|
- public abstract void OnError(Exception error);
|
|
|
- public abstract void OnNext(TSource value);
|
|
|
}
|
|
|
|
|
|
private sealed class NonNull : _
|
|
@@ -60,8 +56,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
- base._observer.OnError(ex);
|
|
|
- base.Dispose();
|
|
|
+ ForwardOnError(ex);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -77,25 +72,17 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public override void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
public override void OnCompleted()
|
|
|
{
|
|
|
if (!_hasValue)
|
|
|
{
|
|
|
- base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
+ ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
-
|
|
|
- base.Dispose();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -127,8 +114,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
- base._observer.OnError(ex);
|
|
|
- base.Dispose();
|
|
|
+ ForwardOnError(ex);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -142,15 +128,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
public override void OnError(Exception error)
|
|
|
{
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
+ ForwardOnError(error);
|
|
|
}
|
|
|
|
|
|
public override void OnCompleted()
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -168,7 +152,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<double>, IObserver<double>
|
|
|
+ internal sealed class _ : IdentitySink<double>
|
|
|
{
|
|
|
private bool _hasValue;
|
|
|
private double _lastValue;
|
|
@@ -180,7 +164,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(double);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(double value)
|
|
|
+ public override void OnNext(double value)
|
|
|
{
|
|
|
if (_hasValue)
|
|
|
{
|
|
@@ -196,25 +180,17 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
if (!_hasValue)
|
|
|
{
|
|
|
- base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
+ ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
-
|
|
|
- base.Dispose();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -232,7 +208,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<float>, IObserver<float>
|
|
|
+ internal sealed class _ : IdentitySink<float>
|
|
|
{
|
|
|
private bool _hasValue;
|
|
|
private float _lastValue;
|
|
@@ -244,7 +220,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(float);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(float value)
|
|
|
+ public override void OnNext(float value)
|
|
|
{
|
|
|
if (_hasValue)
|
|
|
{
|
|
@@ -260,25 +236,17 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
if (!_hasValue)
|
|
|
{
|
|
|
- base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
+ ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
-
|
|
|
- base.Dispose();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -296,7 +264,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<decimal>, IObserver<decimal>
|
|
|
+ internal sealed class _ : IdentitySink<decimal>
|
|
|
{
|
|
|
private bool _hasValue;
|
|
|
private decimal _lastValue;
|
|
@@ -308,7 +276,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(decimal);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(decimal value)
|
|
|
+ public override void OnNext(decimal value)
|
|
|
{
|
|
|
if (_hasValue)
|
|
|
{
|
|
@@ -324,25 +292,17 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
if (!_hasValue)
|
|
|
{
|
|
|
- base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
+ ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
-
|
|
|
- base.Dispose();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -360,7 +320,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<int>, IObserver<int>
|
|
|
+ internal sealed class _ : IdentitySink<int>
|
|
|
{
|
|
|
private bool _hasValue;
|
|
|
private int _lastValue;
|
|
@@ -372,7 +332,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(int);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(int value)
|
|
|
+ public override void OnNext(int value)
|
|
|
{
|
|
|
if (_hasValue)
|
|
|
{
|
|
@@ -388,25 +348,17 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
if (!_hasValue)
|
|
|
{
|
|
|
- base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
+ ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
-
|
|
|
- base.Dispose();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -424,7 +376,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<long>, IObserver<long>
|
|
|
+ internal sealed class _ : IdentitySink<long>
|
|
|
{
|
|
|
private bool _hasValue;
|
|
|
private long _lastValue;
|
|
@@ -436,7 +388,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(long);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(long value)
|
|
|
+ public override void OnNext(long value)
|
|
|
{
|
|
|
if (_hasValue)
|
|
|
{
|
|
@@ -452,25 +404,17 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
if (!_hasValue)
|
|
|
{
|
|
|
- base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
+ ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
-
|
|
|
- base.Dispose();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -488,7 +432,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<double?>, IObserver<double?>
|
|
|
+ internal sealed class _ : IdentitySink<double?>
|
|
|
{
|
|
|
private double? _lastValue;
|
|
|
|
|
@@ -498,7 +442,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(double?);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(double? value)
|
|
|
+ public override void OnNext(double? value)
|
|
|
{
|
|
|
if (!value.HasValue)
|
|
|
return;
|
|
@@ -516,17 +460,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -544,7 +481,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<float?>, IObserver<float?>
|
|
|
+ internal sealed class _ : IdentitySink<float?>
|
|
|
{
|
|
|
private float? _lastValue;
|
|
|
|
|
@@ -554,7 +491,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(float?);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(float? value)
|
|
|
+ public override void OnNext(float? value)
|
|
|
{
|
|
|
if (!value.HasValue)
|
|
|
return;
|
|
@@ -572,17 +509,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -600,7 +530,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<decimal?>, IObserver<decimal?>
|
|
|
+ internal sealed class _ : IdentitySink<decimal?>
|
|
|
{
|
|
|
private decimal? _lastValue;
|
|
|
|
|
@@ -610,7 +540,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(decimal?);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(decimal? value)
|
|
|
+ public override void OnNext(decimal? value)
|
|
|
{
|
|
|
if (!value.HasValue)
|
|
|
return;
|
|
@@ -628,17 +558,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -656,7 +579,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<int?>, IObserver<int?>
|
|
|
+ internal sealed class _ : IdentitySink<int?>
|
|
|
{
|
|
|
private int? _lastValue;
|
|
|
|
|
@@ -666,7 +589,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(int?);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(int? value)
|
|
|
+ public override void OnNext(int? value)
|
|
|
{
|
|
|
if (!value.HasValue)
|
|
|
return;
|
|
@@ -684,17 +607,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -712,7 +628,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
|
|
|
|
|
|
- internal sealed class _ : Sink<long?>, IObserver<long?>
|
|
|
+ internal sealed class _ : IdentitySink<long?>
|
|
|
{
|
|
|
private long? _lastValue;
|
|
|
|
|
@@ -722,7 +638,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_lastValue = default(long?);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(long? value)
|
|
|
+ public override void OnNext(long? value)
|
|
|
{
|
|
|
if (!value.HasValue)
|
|
|
return;
|
|
@@ -740,17 +656,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
- {
|
|
|
- base._observer.OnError(error);
|
|
|
- base.Dispose();
|
|
|
- }
|
|
|
-
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
- base._observer.OnNext(_lastValue);
|
|
|
- base._observer.OnCompleted();
|
|
|
- base.Dispose();
|
|
|
+ ForwardOnNext(_lastValue);
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
}
|
|
|
}
|