|
|
@@ -55,31 +55,25 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
public void Run(Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult> parent)
|
|
|
{
|
|
|
- var leftSubscription = new SingleAssignmentDisposable();
|
|
|
- _group.Add(leftSubscription);
|
|
|
- _leftDone = false;
|
|
|
- _leftID = 0;
|
|
|
+ var leftObserver = new LeftObserver(this);
|
|
|
+ var rightObserver = new RightObserver(this);
|
|
|
|
|
|
- var rightSubscription = new SingleAssignmentDisposable();
|
|
|
- _group.Add(rightSubscription);
|
|
|
- _rightDone = false;
|
|
|
- _rightID = 0;
|
|
|
+ _group.Add(leftObserver);
|
|
|
+ _group.Add(rightObserver);
|
|
|
|
|
|
- leftSubscription.Disposable = parent._left.SubscribeSafe(new LeftObserver(this, leftSubscription));
|
|
|
- rightSubscription.Disposable = parent._right.SubscribeSafe(new RightObserver(this, rightSubscription));
|
|
|
+ leftObserver.SetResource(parent._left.SubscribeSafe(leftObserver));
|
|
|
+ rightObserver.SetResource(parent._right.SubscribeSafe(rightObserver));
|
|
|
|
|
|
SetUpstream(_group);
|
|
|
}
|
|
|
|
|
|
- private sealed class LeftObserver : IObserver<TLeft>
|
|
|
+ private sealed class LeftObserver : SafeObserver<TLeft>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
- private readonly IDisposable _self;
|
|
|
|
|
|
- public LeftObserver(_ parent, IDisposable self)
|
|
|
+ public LeftObserver(_ parent)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
- _self = self;
|
|
|
}
|
|
|
|
|
|
private void Expire(int id, IDisposable resource)
|
|
|
@@ -95,7 +89,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_parent._group.Remove(resource);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(TLeft value)
|
|
|
+ public override void OnNext(TLeft value)
|
|
|
{
|
|
|
var id = 0;
|
|
|
var rightID = 0;
|
|
|
@@ -106,8 +100,6 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_parent._leftMap.Add(id, value);
|
|
|
}
|
|
|
|
|
|
- var md = new SingleAssignmentDisposable();
|
|
|
- _parent._group.Add(md);
|
|
|
|
|
|
var duration = default(IObservable<TLeftDuration>);
|
|
|
try
|
|
|
@@ -120,7 +112,10 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- md.Disposable = duration.SubscribeSafe(new DurationObserver(this, id, md));
|
|
|
+ var durationObserver = new DurationObserver(this, id);
|
|
|
+ _parent._group.Add(durationObserver);
|
|
|
+
|
|
|
+ durationObserver.SetResource(duration.SubscribeSafe(durationObserver));
|
|
|
|
|
|
lock (_parent._gate)
|
|
|
{
|
|
|
@@ -145,36 +140,34 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private sealed class DurationObserver : IObserver<TLeftDuration>
|
|
|
+ private sealed class DurationObserver : SafeObserver<TLeftDuration>
|
|
|
{
|
|
|
private readonly LeftObserver _parent;
|
|
|
private readonly int _id;
|
|
|
- private readonly IDisposable _self;
|
|
|
|
|
|
- public DurationObserver(LeftObserver parent, int id, IDisposable self)
|
|
|
+ public DurationObserver(LeftObserver parent, int id)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
_id = id;
|
|
|
- _self = self;
|
|
|
}
|
|
|
|
|
|
- public void OnNext(TLeftDuration value)
|
|
|
+ public override void OnNext(TLeftDuration value)
|
|
|
{
|
|
|
- _parent.Expire(_id, _self);
|
|
|
+ _parent.Expire(_id, this);
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
_parent.OnError(error);
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
- _parent.Expire(_id, _self);
|
|
|
+ _parent.Expire(_id, this);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
lock (_parent._gate)
|
|
|
{
|
|
|
@@ -182,7 +175,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
lock (_parent._gate)
|
|
|
{
|
|
|
@@ -193,21 +186,19 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _self.Dispose();
|
|
|
+ Dispose();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private sealed class RightObserver : IObserver<TRight>
|
|
|
+ private sealed class RightObserver : SafeObserver<TRight>
|
|
|
{
|
|
|
private readonly _ _parent;
|
|
|
- private readonly IDisposable _self;
|
|
|
|
|
|
- public RightObserver(_ parent, IDisposable self)
|
|
|
+ public RightObserver(_ parent)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
- _self = self;
|
|
|
}
|
|
|
|
|
|
private void Expire(int id, IDisposable resource)
|
|
|
@@ -223,7 +214,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_parent._group.Remove(resource);
|
|
|
}
|
|
|
|
|
|
- public void OnNext(TRight value)
|
|
|
+ public override void OnNext(TRight value)
|
|
|
{
|
|
|
var id = 0;
|
|
|
var leftID = 0;
|
|
|
@@ -234,9 +225,6 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_parent._rightMap.Add(id, value);
|
|
|
}
|
|
|
|
|
|
- var md = new SingleAssignmentDisposable();
|
|
|
- _parent._group.Add(md);
|
|
|
-
|
|
|
var duration = default(IObservable<TRightDuration>);
|
|
|
try
|
|
|
{
|
|
|
@@ -248,7 +236,9 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- md.Disposable = duration.SubscribeSafe(new DurationObserver(this, id, md));
|
|
|
+ var durationObserver = new DurationObserver(this, id);
|
|
|
+ _parent._group.Add(durationObserver);
|
|
|
+ durationObserver.SetResource(duration.SubscribeSafe(durationObserver));
|
|
|
|
|
|
lock (_parent._gate)
|
|
|
{
|
|
|
@@ -273,36 +263,34 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private sealed class DurationObserver : IObserver<TRightDuration>
|
|
|
+ private sealed class DurationObserver : SafeObserver<TRightDuration>
|
|
|
{
|
|
|
private readonly RightObserver _parent;
|
|
|
private readonly int _id;
|
|
|
- private readonly IDisposable _self;
|
|
|
|
|
|
- public DurationObserver(RightObserver parent, int id, IDisposable self)
|
|
|
+ public DurationObserver(RightObserver parent, int id)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
_id = id;
|
|
|
- _self = self;
|
|
|
}
|
|
|
|
|
|
- public void OnNext(TRightDuration value)
|
|
|
+ public override void OnNext(TRightDuration value)
|
|
|
{
|
|
|
- _parent.Expire(_id, _self);
|
|
|
+ _parent.Expire(_id, this);
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
_parent.OnError(error);
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
- _parent.Expire(_id, _self);
|
|
|
+ _parent.Expire(_id, this);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
lock (_parent._gate)
|
|
|
{
|
|
|
@@ -310,7 +298,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
lock (_parent._gate)
|
|
|
{
|
|
|
@@ -321,7 +309,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _self.Dispose();
|
|
|
+ Dispose();
|
|
|
}
|
|
|
}
|
|
|
}
|