|
@@ -476,26 +476,37 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- internal sealed class ZipObserver<T> : IObserver<T>
|
|
|
|
|
|
|
+ internal sealed class ZipObserver<T> : SafeObserver<T>
|
|
|
{
|
|
{
|
|
|
private readonly object _gate;
|
|
private readonly object _gate;
|
|
|
private readonly IZip _parent;
|
|
private readonly IZip _parent;
|
|
|
private readonly int _index;
|
|
private readonly int _index;
|
|
|
- private readonly IDisposable _self;
|
|
|
|
|
private readonly Queue<T> _values;
|
|
private readonly Queue<T> _values;
|
|
|
|
|
|
|
|
- public ZipObserver(object gate, IZip parent, int index, IDisposable self)
|
|
|
|
|
|
|
+ public ZipObserver(object gate, IZip parent, int index)
|
|
|
{
|
|
{
|
|
|
_gate = gate;
|
|
_gate = gate;
|
|
|
_parent = parent;
|
|
_parent = parent;
|
|
|
_index = index;
|
|
_index = index;
|
|
|
- _self = self;
|
|
|
|
|
_values = new Queue<T>();
|
|
_values = new Queue<T>();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public Queue<T> Values => _values;
|
|
public Queue<T> Values => _values;
|
|
|
|
|
|
|
|
- public void OnNext(T value)
|
|
|
|
|
|
|
+ protected override void Dispose(bool disposing)
|
|
|
|
|
+ {
|
|
|
|
|
+ base.Dispose(disposing);
|
|
|
|
|
+
|
|
|
|
|
+ if (disposing)
|
|
|
|
|
+ {
|
|
|
|
|
+ lock (_gate)
|
|
|
|
|
+ {
|
|
|
|
|
+ _values.Clear();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public override void OnNext(T value)
|
|
|
{
|
|
{
|
|
|
lock (_gate)
|
|
lock (_gate)
|
|
|
{
|
|
{
|
|
@@ -504,9 +515,9 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
|
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
{
|
|
|
- _self.Dispose();
|
|
|
|
|
|
|
+ Dispose();
|
|
|
|
|
|
|
|
lock (_gate)
|
|
lock (_gate)
|
|
|
{
|
|
{
|
|
@@ -514,9 +525,9 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
|
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
{
|
|
|
- _self.Dispose();
|
|
|
|
|
|
|
+ Dispose();
|
|
|
|
|
|
|
|
lock (_gate)
|
|
lock (_gate)
|
|
|
{
|
|
{
|