|
|
@@ -16,20 +16,20 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
IScheduler Scheduler { get; }
|
|
|
}
|
|
|
|
|
|
- internal sealed class SingleValue : Producer<TSource, SingleValue._>, IAppendPrepend
|
|
|
+ internal abstract class SingleBase<TSink> : Producer<TSource, TSink>, IAppendPrepend
|
|
|
+ where TSink : IDisposable
|
|
|
{
|
|
|
- private readonly IObservable<TSource> _source;
|
|
|
- private readonly TSource _value;
|
|
|
- private readonly bool _append;
|
|
|
+ protected readonly IObservable<TSource> _source;
|
|
|
+ protected readonly TSource _value;
|
|
|
+ protected readonly bool _append;
|
|
|
|
|
|
- public IScheduler Scheduler { get; }
|
|
|
+ public abstract IScheduler Scheduler { get; }
|
|
|
|
|
|
- public SingleValue(IObservable<TSource> source, TSource value, IScheduler scheduler, bool append)
|
|
|
+ public SingleBase(IObservable<TSource> source, TSource value, bool append)
|
|
|
{
|
|
|
_source = source;
|
|
|
_value = value;
|
|
|
_append = append;
|
|
|
- Scheduler = scheduler;
|
|
|
}
|
|
|
|
|
|
public IAppendPrepend Append(TSource value)
|
|
|
@@ -79,6 +79,18 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
return new Recursive(_source, prepend, append, Scheduler);
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ internal sealed class SingleValue : SingleBase<SingleValue._>
|
|
|
+ {
|
|
|
+ public override IScheduler Scheduler { get; }
|
|
|
+
|
|
|
+ public SingleValue(IObservable<TSource> source, TSource value, IScheduler scheduler, bool append)
|
|
|
+ : base (source, value, append)
|
|
|
+ {
|
|
|
+ Scheduler = scheduler;
|
|
|
+ }
|
|
|
|
|
|
protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
|
|
|
|
|
|
@@ -479,47 +491,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- internal sealed class AppendPrependSingleImmediate<TSource> : Producer<TSource, AppendPrependSingleImmediate<TSource>._>, IAppendPrepend<TSource>
|
|
|
+ internal sealed class SingleImmediate : SingleBase<SingleImmediate._>
|
|
|
{
|
|
|
- private readonly IObservable<TSource> _source;
|
|
|
- private readonly TSource _value;
|
|
|
- private readonly bool _append;
|
|
|
-
|
|
|
- public IScheduler Scheduler { get { return ImmediateScheduler.Instance; } }
|
|
|
-
|
|
|
- public AppendPrependSingleImmediate(IObservable<TSource> source, TSource value, bool append)
|
|
|
- {
|
|
|
- _source = source;
|
|
|
- _value = value;
|
|
|
- _append = append;
|
|
|
- }
|
|
|
-
|
|
|
- public IAppendPrepend<TSource> Append(TSource value)
|
|
|
- {
|
|
|
- var prev = new Node<TSource>(_value);
|
|
|
-
|
|
|
- if (_append)
|
|
|
- {
|
|
|
- return new AppendPrependMultiple<TSource>(_source,
|
|
|
- null, new Node<TSource>(prev, value), Scheduler);
|
|
|
- }
|
|
|
-
|
|
|
- return new AppendPrependMultiple<TSource>(_source,
|
|
|
- prev, new Node<TSource>(value), Scheduler);
|
|
|
- }
|
|
|
+ public override IScheduler Scheduler => ImmediateScheduler.Instance;
|
|
|
|
|
|
- public IAppendPrepend<TSource> Prepend(TSource value)
|
|
|
+ public SingleImmediate(IObservable<TSource> source, TSource value, bool append)
|
|
|
+ : base(source, value, append)
|
|
|
{
|
|
|
- var prev = new Node<TSource>(_value);
|
|
|
-
|
|
|
- if (_append)
|
|
|
- {
|
|
|
- return new AppendPrependMultiple<TSource>(_source,
|
|
|
- new Node<TSource>(value), prev, Scheduler);
|
|
|
- }
|
|
|
-
|
|
|
- return new AppendPrependMultiple<TSource>(_source,
|
|
|
- new Node<TSource>(prev, value), null, Scheduler);
|
|
|
}
|
|
|
|
|
|
protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
|
|
|
@@ -532,7 +510,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
private readonly TSource _value;
|
|
|
private readonly bool _append;
|
|
|
|
|
|
- public _(AppendPrependSingleImmediate<TSource> parent, IObserver<TSource> observer)
|
|
|
+ public _(SingleImmediate parent, IObserver<TSource> observer)
|
|
|
: base(observer)
|
|
|
{
|
|
|
_source = parent._source;
|