|
|
@@ -20,31 +20,28 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- var sink = new _(this, observer, cancel);
|
|
|
+ var sink = new _(observer, cancel);
|
|
|
setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
|
|
|
- class _ : Sink<TSource>
|
|
|
+ private sealed class _ : Sink<TSource>
|
|
|
{
|
|
|
- private readonly TakeUntil<TSource, TOther> _parent;
|
|
|
-
|
|
|
- public _(TakeUntil<TSource, TOther> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ public _(IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
}
|
|
|
|
|
|
- public IDisposable Run()
|
|
|
+ public IDisposable Run(TakeUntil<TSource, TOther> parent)
|
|
|
{
|
|
|
- var sourceObserver = new T(this);
|
|
|
- var otherObserver = new O(this, sourceObserver);
|
|
|
+ var sourceObserver = new SourceObserver(this);
|
|
|
+ var otherObserver = new OtherObserver(this, sourceObserver);
|
|
|
|
|
|
// COMPAT - Order of Subscribe calls per v1.0.10621
|
|
|
- var otherSubscription = _parent._other.SubscribeSafe(otherObserver);
|
|
|
+ var otherSubscription = parent._other.SubscribeSafe(otherObserver);
|
|
|
otherObserver.Disposable = otherSubscription;
|
|
|
|
|
|
- var sourceSubscription = _parent._source.SubscribeSafe(sourceObserver);
|
|
|
+ var sourceSubscription = parent._source.SubscribeSafe(sourceObserver);
|
|
|
|
|
|
return StableCompositeDisposable.Create(
|
|
|
otherSubscription,
|
|
|
@@ -58,7 +55,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
*
|
|
|
* Notice an approach where the "other" channel performs an Interlocked.Exchange operation on
|
|
|
* the _parent._observer field to substitute it with a NopObserver<TSource> doesn't work,
|
|
|
- * because the "other" channel still need to send an OnCompleted message, which could happen
|
|
|
+ * because the "other" channel still needs to send an OnCompleted message, which could happen
|
|
|
* concurrently with another message when the "source" channel has already read from the
|
|
|
* _parent._observer field between making the On* call.
|
|
|
*
|
|
|
@@ -66,12 +63,12 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
* access to the outgoing observer while dispatching a message. Doing this more fine-grained
|
|
|
* than using locks turns out to be tricky and doesn't reduce cost.
|
|
|
*/
|
|
|
- class T : IObserver<TSource>
|
|
|
+ private sealed class SourceObserver : IObserver<TSource>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
public volatile bool _open;
|
|
|
|
|
|
- public T(_ parent)
|
|
|
+ public SourceObserver(_ parent)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
_open = false;
|
|
|
@@ -111,13 +108,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- class O : IObserver<TOther>
|
|
|
+ private sealed class OtherObserver : IObserver<TOther>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
- private readonly T _sourceObserver;
|
|
|
+ private readonly SourceObserver _sourceObserver;
|
|
|
private readonly SingleAssignmentDisposable _subscription;
|
|
|
|
|
|
- public O(_ parent, T sourceObserver)
|
|
|
+ public OtherObserver(_ parent, SourceObserver sourceObserver)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
_sourceObserver = sourceObserver;
|
|
|
@@ -191,29 +188,24 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- var sink = new _(this, observer, cancel);
|
|
|
+ var sink = new _(observer, cancel);
|
|
|
setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
|
|
|
- class _ : Sink<TSource>, IObserver<TSource>
|
|
|
+ private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- private readonly TakeUntil<TSource> _parent;
|
|
|
+ private readonly object _gate = new object();
|
|
|
|
|
|
- public _(TakeUntil<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ public _(IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
}
|
|
|
|
|
|
- private object _gate;
|
|
|
-
|
|
|
- public IDisposable Run()
|
|
|
+ public IDisposable Run(TakeUntil<TSource> parent)
|
|
|
{
|
|
|
- _gate = new object();
|
|
|
-
|
|
|
- var t = _parent._scheduler.Schedule(_parent._endTime, Tick);
|
|
|
- var d = _parent._source.SubscribeSafe(this);
|
|
|
+ var t = parent._scheduler.Schedule(parent._endTime, Tick);
|
|
|
+ var d = parent._source.SubscribeSafe(this);
|
|
|
return StableCompositeDisposable.Create(t, d);
|
|
|
}
|
|
|
|