|
@@ -20,28 +20,25 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- var sink = new _(this, observer, cancel);
|
|
|
+ var sink = new _(observer, cancel);
|
|
|
setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
|
|
|
- class _ : Sink<TSource>
|
|
|
+ private sealed class _ : Sink<TSource>
|
|
|
{
|
|
|
- private readonly SkipUntil<TSource, TOther> _parent;
|
|
|
-
|
|
|
- public _(SkipUntil<TSource, TOther> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ public _(IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
}
|
|
|
|
|
|
- public IDisposable Run()
|
|
|
+ public IDisposable Run(SkipUntil<TSource, TOther> parent)
|
|
|
{
|
|
|
- var sourceObserver = new T(this);
|
|
|
- var otherObserver = new O(this, sourceObserver);
|
|
|
+ var sourceObserver = new SourceObserver(this);
|
|
|
+ var otherObserver = new OtherObserver(this, sourceObserver);
|
|
|
|
|
|
- var sourceSubscription = _parent._source.SubscribeSafe(sourceObserver);
|
|
|
- var otherSubscription = _parent._other.SubscribeSafe(otherObserver);
|
|
|
+ var sourceSubscription = parent._source.SubscribeSafe(sourceObserver);
|
|
|
+ var otherSubscription = parent._other.SubscribeSafe(otherObserver);
|
|
|
|
|
|
sourceObserver.Disposable = sourceSubscription;
|
|
|
otherObserver.Disposable = otherSubscription;
|
|
@@ -52,13 +49,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- class T : IObserver<TSource>
|
|
|
+ private sealed class SourceObserver : IObserver<TSource>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
public volatile IObserver<TSource> _observer;
|
|
|
private readonly SingleAssignmentDisposable _subscription;
|
|
|
|
|
|
- public T(_ parent)
|
|
|
+ public SourceObserver(_ parent)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
_observer = NopObserver<TSource>.Instance;
|
|
@@ -88,13 +85,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- class O : IObserver<TOther>
|
|
|
+ private sealed class OtherObserver : IObserver<TOther>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
- private readonly T _sourceObserver;
|
|
|
+ private readonly SourceObserver _sourceObserver;
|
|
|
private readonly SingleAssignmentDisposable _subscription;
|
|
|
|
|
|
- public O(_ parent, T sourceObserver)
|
|
|
+ public OtherObserver(_ parent, SourceObserver sourceObserver)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
_sourceObserver = sourceObserver;
|
|
@@ -158,26 +155,24 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- var sink = new _(this, observer, cancel);
|
|
|
+ var sink = new _(observer, cancel);
|
|
|
setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
|
|
|
- class _ : Sink<TSource>, IObserver<TSource>
|
|
|
+ private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- private readonly SkipUntil<TSource> _parent;
|
|
|
private volatile bool _open;
|
|
|
|
|
|
- public _(SkipUntil<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ public _(IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
}
|
|
|
|
|
|
- public IDisposable Run()
|
|
|
+ public IDisposable Run(SkipUntil<TSource> parent)
|
|
|
{
|
|
|
- var t = _parent._scheduler.Schedule(_parent._startTime, Tick);
|
|
|
- var d = _parent._source.SubscribeSafe(this);
|
|
|
+ var t = parent._scheduler.Schedule(parent._startTime, Tick);
|
|
|
+ var d = parent._source.SubscribeSafe(this);
|
|
|
return StableCompositeDisposable.Create(t, d);
|
|
|
}
|
|
|
|