|
|
@@ -2,8 +2,6 @@
|
|
|
// The .NET Foundation licenses this file to you under the MIT License.
|
|
|
// See the LICENSE file in the project root for more information.
|
|
|
|
|
|
-#nullable disable
|
|
|
-
|
|
|
using System.Collections.Generic;
|
|
|
using System.Collections.ObjectModel;
|
|
|
using System.Linq;
|
|
|
@@ -33,6 +31,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
internal sealed class _ : IdentitySink<TResult>
|
|
|
{
|
|
|
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
|
|
|
+ private readonly object _gate = new object();
|
|
|
|
|
|
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
|
|
|
: base(observer)
|
|
|
@@ -40,20 +39,16 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_resultSelector = resultSelector;
|
|
|
}
|
|
|
|
|
|
- private object _gate;
|
|
|
-
|
|
|
- private IDisposable _firstDisposable;
|
|
|
- private IDisposable _secondDisposable;
|
|
|
+ private IDisposable? _firstDisposable;
|
|
|
+ private IDisposable? _secondDisposable;
|
|
|
|
|
|
public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
|
|
|
{
|
|
|
- _gate = new object();
|
|
|
-
|
|
|
var fstO = new FirstObserver(this);
|
|
|
var sndO = new SecondObserver(this);
|
|
|
|
|
|
- fstO.Other = sndO;
|
|
|
- sndO.Other = fstO;
|
|
|
+ fstO.SetOther(sndO);
|
|
|
+ sndO.SetOther(fstO);
|
|
|
|
|
|
Disposable.SetSingle(ref _firstDisposable, first.SubscribeSafe(fstO));
|
|
|
Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(sndO));
|
|
|
@@ -66,6 +61,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
Disposable.Dispose(ref _firstDisposable);
|
|
|
Disposable.Dispose(ref _secondDisposable);
|
|
|
}
|
|
|
+
|
|
|
base.Dispose(disposing);
|
|
|
}
|
|
|
|
|
|
@@ -77,12 +73,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
public FirstObserver(_ parent)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
+ _other = default!; // NB: Will be set by SetOther.
|
|
|
}
|
|
|
|
|
|
- public SecondObserver Other { set { _other = value; } }
|
|
|
+ public void SetOther(SecondObserver other) { _other = other; }
|
|
|
|
|
|
public bool HasValue { get; private set; }
|
|
|
- public TFirst Value { get; private set; }
|
|
|
+ public TFirst? Value { get; private set; }
|
|
|
public bool Done { get; private set; }
|
|
|
|
|
|
public void OnNext(TFirst value)
|
|
|
@@ -97,7 +94,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
TResult res;
|
|
|
try
|
|
|
{
|
|
|
- res = _parent._resultSelector(value, _other.Value);
|
|
|
+ res = _parent._resultSelector(value, _other.Value!);
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
@@ -148,12 +145,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
public SecondObserver(_ parent)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
+ _other = default!; // NB: Will be set by SetOther.
|
|
|
}
|
|
|
|
|
|
- public FirstObserver Other { set { _other = value; } }
|
|
|
+ public void SetOther(FirstObserver other) { _other = other; }
|
|
|
|
|
|
public bool HasValue { get; private set; }
|
|
|
- public TSecond Value { get; private set; }
|
|
|
+ public TSecond? Value { get; private set; }
|
|
|
public bool Done { get; private set; }
|
|
|
|
|
|
public void OnNext(TSecond value)
|
|
|
@@ -168,7 +166,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
TResult res;
|
|
|
try
|
|
|
{
|
|
|
- res = _parent._resultSelector(_other.Value, value);
|
|
|
+ res = _parent._resultSelector(_other.Value!, value);
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
@@ -330,7 +328,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
private readonly object _gate;
|
|
|
private readonly ICombineLatest _parent;
|
|
|
private readonly int _index;
|
|
|
- private T _value;
|
|
|
+ private T? _value;
|
|
|
|
|
|
public CombineLatestObserver(object gate, ICombineLatest parent, int index)
|
|
|
{
|
|
|
@@ -339,7 +337,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_index = index;
|
|
|
}
|
|
|
|
|
|
- public T Value => _value;
|
|
|
+ public T? Value => _value;
|
|
|
|
|
|
public override void OnNext(T value)
|
|
|
{
|
|
|
@@ -393,18 +391,24 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
internal sealed class _ : IdentitySink<TResult>
|
|
|
{
|
|
|
+ private readonly object _gate = new object();
|
|
|
private readonly Func<IList<TSource>, TResult> _resultSelector;
|
|
|
|
|
|
public _(Func<IList<TSource>, TResult> resultSelector, IObserver<TResult> observer)
|
|
|
: base(observer)
|
|
|
{
|
|
|
_resultSelector = resultSelector;
|
|
|
+
|
|
|
+ // NB: These will be set in Run before getting used.
|
|
|
+ _hasValue = null!;
|
|
|
+ _values = null!;
|
|
|
+ _isDone = null!;
|
|
|
+ _subscriptions = null!;
|
|
|
}
|
|
|
|
|
|
- private object _gate;
|
|
|
private bool[] _hasValue;
|
|
|
private bool _hasValueAll;
|
|
|
- private List<TSource> _values;
|
|
|
+ private TSource[] _values;
|
|
|
private bool[] _isDone;
|
|
|
private IDisposable[] _subscriptions;
|
|
|
|
|
|
@@ -417,18 +421,12 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_hasValue = new bool[N];
|
|
|
_hasValueAll = false;
|
|
|
|
|
|
- _values = new List<TSource>(N);
|
|
|
- for (var i = 0; i < N; i++)
|
|
|
- {
|
|
|
- _values.Add(default);
|
|
|
- }
|
|
|
+ _values = new TSource[N];
|
|
|
|
|
|
_isDone = new bool[N];
|
|
|
|
|
|
_subscriptions = new IDisposable[N];
|
|
|
|
|
|
- _gate = new object();
|
|
|
-
|
|
|
for (var i = 0; i < N; i++)
|
|
|
{
|
|
|
var j = i;
|