|
|
@@ -130,32 +130,36 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- SetUpstream(Disposable.Create(() =>
|
|
|
- {
|
|
|
- subscription.Dispose();
|
|
|
-
|
|
|
- lock (parent._gate)
|
|
|
+ SetUpstream(Disposable.Create(
|
|
|
+ (parent, subscription),
|
|
|
+ tuple =>
|
|
|
{
|
|
|
- if (--parent._count == 0)
|
|
|
- {
|
|
|
- var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref parent._serial);
|
|
|
+ var (closureParent, closureSubscription) = tuple;
|
|
|
+
|
|
|
+ closureSubscription.Dispose();
|
|
|
|
|
|
- cancelable.Disposable = parent._scheduler.Schedule(cancelable, parent._disconnectTime, (self, state) =>
|
|
|
+ lock (closureParent._gate)
|
|
|
+ {
|
|
|
+ if (--closureParent._count == 0)
|
|
|
{
|
|
|
- lock (parent._gate)
|
|
|
+ var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial);
|
|
|
+
|
|
|
+ cancelable.Disposable = closureParent._scheduler.Schedule((cancelable, closureParent), closureParent._disconnectTime, (self, tuple2) =>
|
|
|
{
|
|
|
- if (object.ReferenceEquals(Volatile.Read(ref parent._serial), state))
|
|
|
+ lock (tuple2.closureParent._gate)
|
|
|
{
|
|
|
- parent._connectableSubscription.Dispose();
|
|
|
- parent._connectableSubscription = null;
|
|
|
+ if (object.ReferenceEquals(Volatile.Read(ref tuple2.closureParent._serial), tuple2.cancelable))
|
|
|
+ {
|
|
|
+ tuple2.closureParent._connectableSubscription.Dispose();
|
|
|
+ tuple2.closureParent._connectableSubscription = null;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- return Disposable.Empty;
|
|
|
- });
|
|
|
+ return Disposable.Empty;
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- }));
|
|
|
+ }));
|
|
|
}
|
|
|
}
|
|
|
}
|