|
|
@@ -50,9 +50,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_hasLatest = true;
|
|
|
}
|
|
|
|
|
|
- var d = new SingleAssignmentDisposable();
|
|
|
- Disposable.TrySetSerial(ref _innerSerialDisposable, d);
|
|
|
- d.Disposable = value.SubscribeSafe(new InnerObserver(this, id, d));
|
|
|
+ var innerObserver = new InnerObserver(this, id);
|
|
|
+
|
|
|
+ Disposable.TrySetSerial(ref _innerSerialDisposable, innerObserver);
|
|
|
+ innerObserver.SetResource(value.SubscribeSafe(innerObserver));
|
|
|
}
|
|
|
|
|
|
public override void OnError(Exception error)
|
|
|
@@ -75,20 +76,18 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private sealed class InnerObserver : IObserver<TSource>
|
|
|
+ private sealed class InnerObserver : SafeObserver<TSource>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
private readonly ulong _id;
|
|
|
- private readonly IDisposable _self;
|
|
|
|
|
|
- public InnerObserver(_ parent, ulong id, IDisposable self)
|
|
|
+ public InnerObserver(_ parent, ulong id)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
_id = id;
|
|
|
- _self = self;
|
|
|
}
|
|
|
|
|
|
- public void OnNext(TSource value)
|
|
|
+ public override void OnNext(TSource value)
|
|
|
{
|
|
|
lock (_parent._gate)
|
|
|
{
|
|
|
@@ -99,11 +98,11 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
lock (_parent._gate)
|
|
|
{
|
|
|
- _self.Dispose();
|
|
|
+ Dispose();
|
|
|
|
|
|
if (_parent._latest == _id)
|
|
|
{
|
|
|
@@ -112,11 +111,11 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
lock (_parent._gate)
|
|
|
{
|
|
|
- _self.Dispose();
|
|
|
+ Dispose();
|
|
|
|
|
|
if (_parent._latest == _id)
|
|
|
{
|