|
|
@@ -141,9 +141,9 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
lock (_gate)
|
|
|
ForwardOnNext(group);
|
|
|
|
|
|
- var md = new SingleAssignmentDisposable();
|
|
|
- _groupDisposable.Add(md);
|
|
|
- md.Disposable = duration.SubscribeSafe(new DurationObserver(this, key, writer, md));
|
|
|
+ var durationObserver = new DurationObserver(this, key, writer);
|
|
|
+ _groupDisposable.Add(durationObserver);
|
|
|
+ durationObserver.SetResource(duration.SubscribeSafe(durationObserver));
|
|
|
}
|
|
|
|
|
|
var element = default(TElement);
|
|
|
@@ -178,33 +178,31 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
writer.OnNext(element);
|
|
|
}
|
|
|
|
|
|
- private sealed class DurationObserver : IObserver<TDuration>
|
|
|
+ private sealed class DurationObserver : SafeObserver<TDuration>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
private readonly TKey _key;
|
|
|
private readonly ISubject<TElement> _writer;
|
|
|
- private readonly IDisposable _self;
|
|
|
|
|
|
- public DurationObserver(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
|
|
|
+ public DurationObserver(_ parent, TKey key, ISubject<TElement> writer)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
_key = key;
|
|
|
_writer = writer;
|
|
|
- _self = self;
|
|
|
}
|
|
|
|
|
|
- public void OnNext(TDuration value)
|
|
|
+ public override void OnNext(TDuration value)
|
|
|
{
|
|
|
OnCompleted();
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
_parent.Error(error);
|
|
|
- _self.Dispose();
|
|
|
+ Dispose();
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
if (_key == null)
|
|
|
{
|
|
|
@@ -225,7 +223,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- _parent._groupDisposable.Remove(_self);
|
|
|
+ _parent._groupDisposable.Remove(this);
|
|
|
}
|
|
|
}
|
|
|
|