|
|
@@ -195,16 +195,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
if (isShift)
|
|
|
_nextShift += _timeShift;
|
|
|
|
|
|
- m.Disposable = _scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
|
|
|
+ m.Disposable = _scheduler.Schedule((@this: this, isSpan, isShift), ts, (_, tuple) => [email protected](tuple.isSpan, tuple.isShift));
|
|
|
}
|
|
|
|
|
|
- struct State
|
|
|
- {
|
|
|
- public bool isSpan;
|
|
|
- public bool isShift;
|
|
|
- }
|
|
|
-
|
|
|
- private IDisposable Tick(IScheduler self, State state)
|
|
|
+ private IDisposable Tick(bool isSpan, bool isShift)
|
|
|
{
|
|
|
lock (_gate)
|
|
|
{
|
|
|
@@ -214,13 +208,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
// overloads of Window and Buffer. Before v2, the two
|
|
|
// operations below were reversed.
|
|
|
//
|
|
|
- if (state.isSpan)
|
|
|
+ if (isSpan)
|
|
|
{
|
|
|
var s = _q.Dequeue();
|
|
|
s.OnCompleted();
|
|
|
}
|
|
|
|
|
|
- if (state.isShift)
|
|
|
+ if (isShift)
|
|
|
{
|
|
|
CreateWindow();
|
|
|
}
|
|
|
@@ -300,7 +294,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
CreateWindow();
|
|
|
|
|
|
- groupDisposable.Add(parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick));
|
|
|
+ groupDisposable.Add(parent._scheduler.SchedulePeriodic(this, parent._timeSpan, @this => @this.Tick()));
|
|
|
groupDisposable.Add(parent._source.SubscribeSafe(this));
|
|
|
|
|
|
SetUpstream(_refCountDisposable);
|
|
|
@@ -413,10 +407,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
var m = new SingleAssignmentDisposable();
|
|
|
_timerD.Disposable = m;
|
|
|
|
|
|
- m.Disposable = _scheduler.Schedule(window, _timeSpan, Tick);
|
|
|
+ m.Disposable = _scheduler.Schedule((@this: this, window), _timeSpan, (_, tuple) => tuple.@this.Tick(tuple.window));
|
|
|
}
|
|
|
|
|
|
- private IDisposable Tick(IScheduler self, Subject<TSource> window)
|
|
|
+ private IDisposable Tick(Subject<TSource> window)
|
|
|
{
|
|
|
var d = Disposable.Empty;
|
|
|
|
|
|
@@ -551,9 +545,9 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- var closingSubscription = new SingleAssignmentDisposable();
|
|
|
- _m.Disposable = closingSubscription;
|
|
|
- closingSubscription.Disposable = windowClose.SubscribeSafe(new WindowClosingObserver(this, closingSubscription));
|
|
|
+ var observer = new WindowClosingObserver(this);
|
|
|
+ _m.Disposable = observer;
|
|
|
+ observer.SetResource(windowClose.SubscribeSafe(observer));
|
|
|
}
|
|
|
|
|
|
private void CloseWindow(IDisposable closingSubscription)
|
|
|
@@ -572,30 +566,28 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_windowGate.Wait(this, @this => @this.CreateWindowClose());
|
|
|
}
|
|
|
|
|
|
- private sealed class WindowClosingObserver : IObserver<TWindowClosing>
|
|
|
+ private sealed class WindowClosingObserver : SafeObserver<TWindowClosing>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
- private readonly IDisposable _self;
|
|
|
|
|
|
- public WindowClosingObserver(_ parent, IDisposable self)
|
|
|
+ public WindowClosingObserver(_ parent)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
- _self = self;
|
|
|
}
|
|
|
|
|
|
- public void OnNext(TWindowClosing value)
|
|
|
+ public override void OnNext(TWindowClosing value)
|
|
|
{
|
|
|
- _parent.CloseWindow(_self);
|
|
|
+ _parent.CloseWindow(this);
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
_parent.OnError(error);
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
- _parent.CloseWindow(_self);
|
|
|
+ _parent.CloseWindow(this);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -638,7 +630,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_windowBoundaries = windowBoundaries;
|
|
|
}
|
|
|
|
|
|
- protected override _ CreateSink(IObserver<IObservable<TSource>> observer) => new _(this, observer);
|
|
|
+ protected override _ CreateSink(IObserver<IObservable<TSource>> observer) => new _(observer);
|
|
|
|
|
|
protected override void Run(_ sink) => sink.Run(this);
|
|
|
|
|
|
@@ -646,12 +638,9 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
private readonly object _gate = new object();
|
|
|
|
|
|
- private readonly IObservable<TWindowClosing> _windowBoundaries;
|
|
|
-
|
|
|
- public _(Boundaries parent, IObserver<IObservable<TSource>> observer)
|
|
|
+ public _(IObserver<IObservable<TSource>> observer)
|
|
|
: base(observer)
|
|
|
{
|
|
|
- _windowBoundaries = parent._windowBoundaries;
|
|
|
}
|
|
|
|
|
|
private ISubject<TSource> _window;
|