|
|
@@ -14,6 +14,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
internal sealed class Eager : Producer<TSource, Eager._>
|
|
|
{
|
|
|
private readonly IConnectableObservable<TSource> _source;
|
|
|
+
|
|
|
private readonly object _gate;
|
|
|
private int _count;
|
|
|
private IDisposable _connectableSubscription;
|
|
|
@@ -22,60 +23,45 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
_source = source;
|
|
|
_gate = new object();
|
|
|
+ _count = 0;
|
|
|
+ _connectableSubscription = default(IDisposable);
|
|
|
}
|
|
|
|
|
|
- protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
|
|
|
+ protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
|
|
|
|
|
|
- protected override void Run(_ sink) => sink.Run();
|
|
|
+ protected override void Run(_ sink) => sink.Run(this);
|
|
|
|
|
|
internal sealed class _ : IdentitySink<TSource>
|
|
|
{
|
|
|
- private readonly Eager _parent;
|
|
|
-
|
|
|
- public _(IObserver<TSource> observer, Eager parent)
|
|
|
+ public _(IObserver<TSource> observer)
|
|
|
: base(observer)
|
|
|
{
|
|
|
- _parent = parent;
|
|
|
}
|
|
|
|
|
|
- public void Run()
|
|
|
+ public void Run(Eager parent)
|
|
|
{
|
|
|
- Run(_parent._source);
|
|
|
+ var subscription = parent._source.SubscribeSafe(this);
|
|
|
|
|
|
- lock (_parent._gate)
|
|
|
+ lock (parent._gate)
|
|
|
{
|
|
|
- if (++_parent._count == 1)
|
|
|
+ if (++parent._count == 1)
|
|
|
{
|
|
|
- // We need to set _connectableSubscription to something
|
|
|
- // before Connect because if Connect terminates synchronously,
|
|
|
- // Dispose(bool) gets executed and will try to dispose
|
|
|
- // _connectableSubscription of null.
|
|
|
- // ?.Dispose() is no good because the dispose action has to be
|
|
|
- // executed anyway.
|
|
|
- // We can't inline SAD either because the IDisposable of Connect
|
|
|
- // may belong to the wrong connection.
|
|
|
- var sad = new SingleAssignmentDisposable();
|
|
|
- _parent._connectableSubscription = sad;
|
|
|
-
|
|
|
- sad.Disposable = _parent._source.Connect();
|
|
|
+ parent._connectableSubscription = parent._source.Connect();
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- protected override void Dispose(bool disposing)
|
|
|
- {
|
|
|
- base.Dispose(disposing);
|
|
|
|
|
|
- if (disposing)
|
|
|
+ SetUpstream(Disposable.Create(() =>
|
|
|
{
|
|
|
- lock (_parent._gate)
|
|
|
+ subscription.Dispose();
|
|
|
+
|
|
|
+ lock (parent._gate)
|
|
|
{
|
|
|
- if (--_parent._count == 0)
|
|
|
+ if (--parent._count == 0)
|
|
|
{
|
|
|
- _parent._connectableSubscription.Dispose();
|
|
|
+ parent._connectableSubscription.Dispose();
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
+ }));
|
|
|
}
|
|
|
}
|
|
|
}
|