|
|
@@ -20,38 +20,33 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- var sink = new _(this, observer, cancel);
|
|
|
+ var sink = new _(observer, cancel);
|
|
|
setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
|
|
|
- class _ : Sink<TSource>, IObserver<TSource>
|
|
|
+ private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- private readonly Sample<TSource, TSample> _parent;
|
|
|
+ private readonly object _gate = new object();
|
|
|
|
|
|
- public _(Sample<TSource, TSample> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ public _(IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
}
|
|
|
|
|
|
- private object _gate;
|
|
|
-
|
|
|
private IDisposable _sourceSubscription;
|
|
|
|
|
|
private bool _hasValue;
|
|
|
private TSource _value;
|
|
|
private bool _atEnd;
|
|
|
|
|
|
- public IDisposable Run()
|
|
|
+ public IDisposable Run(Sample<TSource, TSample> parent)
|
|
|
{
|
|
|
- _gate = new object();
|
|
|
-
|
|
|
var sourceSubscription = new SingleAssignmentDisposable();
|
|
|
_sourceSubscription = sourceSubscription;
|
|
|
- sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
|
|
|
+ sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
|
|
|
|
|
|
- var samplerSubscription = _parent._sampler.SubscribeSafe(new SampleImpl(this));
|
|
|
+ var samplerSubscription = parent._sampler.SubscribeSafe(new SampleObserver(this));
|
|
|
|
|
|
return StableCompositeDisposable.Create(_sourceSubscription, samplerSubscription);
|
|
|
}
|
|
|
@@ -83,11 +78,11 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- class SampleImpl : IObserver<TSample>
|
|
|
+ private sealed class SampleObserver : IObserver<TSample>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
|
|
|
- public SampleImpl(_ parent)
|
|
|
+ public SampleObserver(_ parent)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
}
|
|
|
@@ -156,40 +151,35 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
|
|
|
{
|
|
|
- var sink = new _(this, observer, cancel);
|
|
|
+ var sink = new _(observer, cancel);
|
|
|
setSink(sink);
|
|
|
- return sink.Run();
|
|
|
+ return sink.Run(this);
|
|
|
}
|
|
|
|
|
|
- class _ : Sink<TSource>, IObserver<TSource>
|
|
|
+ private sealed class _ : Sink<TSource>, IObserver<TSource>
|
|
|
{
|
|
|
- private readonly Sample<TSource> _parent;
|
|
|
+ private object _gate = new object();
|
|
|
|
|
|
- public _(Sample<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
|
|
|
+ public _(IObserver<TSource> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
}
|
|
|
|
|
|
- private object _gate;
|
|
|
-
|
|
|
private IDisposable _sourceSubscription;
|
|
|
|
|
|
private bool _hasValue;
|
|
|
private TSource _value;
|
|
|
private bool _atEnd;
|
|
|
|
|
|
- public IDisposable Run()
|
|
|
+ public IDisposable Run(Sample<TSource> parent)
|
|
|
{
|
|
|
- _gate = new object();
|
|
|
-
|
|
|
var sourceSubscription = new SingleAssignmentDisposable();
|
|
|
_sourceSubscription = sourceSubscription;
|
|
|
- sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
|
|
|
+ sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
|
|
|
|
|
|
return StableCompositeDisposable.Create(
|
|
|
sourceSubscription,
|
|
|
- _parent._scheduler.SchedulePeriodic(_parent._interval, Tick)
|
|
|
+ parent._scheduler.SchedulePeriodic(parent._interval, Tick)
|
|
|
);
|
|
|
}
|
|
|
|