|
|
@@ -192,16 +192,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
if (isShift)
|
|
|
_nextShift += _timeShift;
|
|
|
|
|
|
- m.Disposable = _scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
|
|
|
+ m.Disposable = _scheduler.Schedule((@this: this, isSpan, isShift), ts, (_, tuple) => [email protected](tuple.isSpan, tuple.isShift));
|
|
|
}
|
|
|
|
|
|
- private struct State
|
|
|
- {
|
|
|
- public bool isSpan;
|
|
|
- public bool isShift;
|
|
|
- }
|
|
|
-
|
|
|
- private IDisposable Tick(IScheduler self, State state)
|
|
|
+ private IDisposable Tick(bool isSpan, bool isShift)
|
|
|
{
|
|
|
lock (_gate)
|
|
|
{
|
|
|
@@ -211,13 +205,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
// took a breaking change in v2 to ensure consistency across overloads. For more info,
|
|
|
// see the comment in Tick for Window.
|
|
|
//
|
|
|
- if (state.isSpan)
|
|
|
+ if (isSpan)
|
|
|
{
|
|
|
var s = _q.Dequeue();
|
|
|
ForwardOnNext(s);
|
|
|
}
|
|
|
|
|
|
- if (state.isShift)
|
|
|
+ if (isShift)
|
|
|
{
|
|
|
CreateWindow();
|
|
|
}
|
|
|
@@ -296,7 +290,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
_list = new List<TSource>();
|
|
|
|
|
|
- Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick));
|
|
|
+ Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(this, parent._timeSpan, @this => @this.Tick()));
|
|
|
base.Run(parent._source);
|
|
|
}
|
|
|
|
|
|
@@ -408,10 +402,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
var m = new SingleAssignmentDisposable();
|
|
|
Disposable.TrySetSerial(ref _timerSerial, m);
|
|
|
|
|
|
- m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
|
|
|
+ m.Disposable = _parent._scheduler.Schedule((@this: this, id), _parent._timeSpan, (_, tuple) => tuple.@this.Tick(tuple.id));
|
|
|
}
|
|
|
|
|
|
- private IDisposable Tick(IScheduler self, int id)
|
|
|
+ private IDisposable Tick(int id)
|
|
|
{
|
|
|
var d = Disposable.Empty;
|
|
|
|
|
|
@@ -548,9 +542,9 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- var closingSubscription = new SingleAssignmentDisposable();
|
|
|
- Disposable.TrySetSerial(ref _bufferClosingSerialDisposable, closingSubscription);
|
|
|
- closingSubscription.Disposable = bufferClose.SubscribeSafe(new BufferClosingObserver(this, closingSubscription));
|
|
|
+ var closingObserver = new BufferClosingObserver(this);
|
|
|
+ Disposable.TrySetSerial(ref _bufferClosingSerialDisposable, closingObserver);
|
|
|
+ closingObserver.SetResource(bufferClose.SubscribeSafe(closingObserver));
|
|
|
}
|
|
|
|
|
|
private void CloseBuffer(IDisposable closingSubscription)
|
|
|
@@ -567,30 +561,28 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_bufferGate.Wait(this, @this => @this.CreateBufferClose());
|
|
|
}
|
|
|
|
|
|
- private sealed class BufferClosingObserver : IObserver<TBufferClosing>
|
|
|
+ private sealed class BufferClosingObserver : SafeObserver<TBufferClosing>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
- private readonly IDisposable _self;
|
|
|
|
|
|
- public BufferClosingObserver(_ parent, IDisposable self)
|
|
|
+ public BufferClosingObserver(_ parent)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
- _self = self;
|
|
|
}
|
|
|
|
|
|
- public void OnNext(TBufferClosing value)
|
|
|
+ public override void OnNext(TBufferClosing value)
|
|
|
{
|
|
|
- _parent.CloseBuffer(_self);
|
|
|
+ _parent.CloseBuffer(this);
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
_parent.OnError(error);
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
- _parent.CloseBuffer(_self);
|
|
|
+ _parent.CloseBuffer(this);
|
|
|
}
|
|
|
}
|
|
|
|