Prechádzať zdrojové kódy

Merge pull request #1358 from dotnet/dev/bartde/rx_nullable_part16

Enable #nullable for Zip and CombineLatest.
Bart J.F. De Smet 5 rokov pred
rodič
commit
26bb960bd2

Rozdielové dáta súboru neboli zobrazené, pretože súbor je príliš veľký
+ 441 - 296
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.NAry.cs


+ 13 - 6
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.NAry.tt

@@ -8,8 +8,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
@@ -25,7 +23,7 @@ for (var i = 3; i <= 16; i++)
 {
     var ts = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
     var os = string.Join(", ", Enumerable.Range(1, i).Select(j => "IObservable<T" + j + "> source" + j));
-    var vs = string.Join(", ", Enumerable.Range(1, i).Select(j => "_observer" + j + ".Value"));
+    var vs = string.Join(", ", Enumerable.Range(1, i).Select(j => "_observer" + j + ".Value!"));
     var ss = string.Join(", ", Enumerable.Range(1, i).Select(j => "_source" + j));
 #>
     internal sealed class CombineLatest<<#=ts#>, TResult> : Producer<TResult, CombineLatest<<#=ts#>, TResult>._>
@@ -61,20 +59,29 @@ for (var j = 1; j <= i; j++)
         {
             private readonly Func<<#=ts#>, TResult> _resultSelector;
 
+<#
+for (var j = 1; j <= i; j++)
+{
+#>
+            private readonly CombineLatestObserver<T<#=j#>> _observer<#=j#>;
+<#
+}
+#>
+
             public _(Func<<#=ts#>, TResult> resultSelector, IObserver<TResult> observer)
                 : base(<#=i#>, observer)
             {
                 _resultSelector = resultSelector;
-            }
 
 <#
 for (var j = 1; j <= i; j++)
 {
 #>
-            private CombineLatestObserver<T<#=j#>> _observer<#=j#>;
+                _observer<#=j#> = new CombineLatestObserver<T<#=j#>>(_gate, this, <#=j - 1#>);
 <#
 }
 #>
+            }
 
             public void Run(<#=os#>)
             {
@@ -84,7 +91,7 @@ for (var j = 1; j <= i; j++)
 for (var j = 1; j <= i; j++)
 {
 #>
-                subscriptions[<#=j - 1#>] = _observer<#=j#> = new CombineLatestObserver<T<#=j#>>(_gate, this, <#=j - 1#>);
+                subscriptions[<#=j - 1#>] = _observer<#=j#>;
 <#
 }
 #>

+ 25 - 27
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Collections.ObjectModel;
 using System.Linq;
@@ -33,6 +31,7 @@ namespace System.Reactive.Linq.ObservableImpl
         internal sealed class _ : IdentitySink<TResult>
         {
             private readonly Func<TFirst, TSecond, TResult> _resultSelector;
+            private readonly object _gate = new object();
 
             public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
                 : base(observer)
@@ -40,20 +39,16 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
-            private object _gate;
-
-            private IDisposable _firstDisposable;
-            private IDisposable _secondDisposable;
+            private IDisposable? _firstDisposable;
+            private IDisposable? _secondDisposable;
 
             public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
             {
-                _gate = new object();
-
                 var fstO = new FirstObserver(this);
                 var sndO = new SecondObserver(this);
 
-                fstO.Other = sndO;
-                sndO.Other = fstO;
+                fstO.SetOther(sndO);
+                sndO.SetOther(fstO);
 
                 Disposable.SetSingle(ref _firstDisposable, first.SubscribeSafe(fstO));
                 Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(sndO));
@@ -66,6 +61,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     Disposable.Dispose(ref _firstDisposable);
                     Disposable.Dispose(ref _secondDisposable);
                 }
+
                 base.Dispose(disposing);
             }
 
@@ -77,12 +73,13 @@ namespace System.Reactive.Linq.ObservableImpl
                 public FirstObserver(_ parent)
                 {
                     _parent = parent;
+                    _other = default!; // NB: Will be set by SetOther.
                 }
 
-                public SecondObserver Other { set { _other = value; } }
+                public void SetOther(SecondObserver other) { _other = other; }
 
                 public bool HasValue { get; private set; }
-                public TFirst Value { get; private set; }
+                public TFirst? Value { get; private set; }
                 public bool Done { get; private set; }
 
                 public void OnNext(TFirst value)
@@ -97,7 +94,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             TResult res;
                             try
                             {
-                                res = _parent._resultSelector(value, _other.Value);
+                                res = _parent._resultSelector(value, _other.Value!);
                             }
                             catch (Exception ex)
                             {
@@ -148,12 +145,13 @@ namespace System.Reactive.Linq.ObservableImpl
                 public SecondObserver(_ parent)
                 {
                     _parent = parent;
+                    _other = default!; // NB: Will be set by SetOther.
                 }
 
-                public FirstObserver Other { set { _other = value; } }
+                public void SetOther(FirstObserver other) { _other = other; }
 
                 public bool HasValue { get; private set; }
-                public TSecond Value { get; private set; }
+                public TSecond? Value { get; private set; }
                 public bool Done { get; private set; }
 
                 public void OnNext(TSecond value)
@@ -168,7 +166,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             TResult res;
                             try
                             {
-                                res = _parent._resultSelector(_other.Value, value);
+                                res = _parent._resultSelector(_other.Value!, value);
                             }
                             catch (Exception ex)
                             {
@@ -330,7 +328,7 @@ namespace System.Reactive.Linq.ObservableImpl
         private readonly object _gate;
         private readonly ICombineLatest _parent;
         private readonly int _index;
-        private T _value;
+        private T? _value;
 
         public CombineLatestObserver(object gate, ICombineLatest parent, int index)
         {
@@ -339,7 +337,7 @@ namespace System.Reactive.Linq.ObservableImpl
             _index = index;
         }
 
-        public T Value => _value;
+        public T? Value => _value;
 
         public override void OnNext(T value)
         {
@@ -393,18 +391,24 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TResult>
         {
+            private readonly object _gate = new object();
             private readonly Func<IList<TSource>, TResult> _resultSelector;
 
             public _(Func<IList<TSource>, TResult> resultSelector, IObserver<TResult> observer)
                 : base(observer)
             {
                 _resultSelector = resultSelector;
+
+                // NB: These will be set in Run before getting used.
+                _hasValue = null!;
+                _values = null!;
+                _isDone = null!;
+                _subscriptions = null!;
             }
 
-            private object _gate;
             private bool[] _hasValue;
             private bool _hasValueAll;
-            private List<TSource> _values;
+            private TSource[] _values;
             private bool[] _isDone;
             private IDisposable[] _subscriptions;
 
@@ -417,18 +421,12 @@ namespace System.Reactive.Linq.ObservableImpl
                 _hasValue = new bool[N];
                 _hasValueAll = false;
 
-                _values = new List<TSource>(N);
-                for (var i = 0; i < N; i++)
-                {
-                    _values.Add(default);
-                }
+                _values = new TSource[N];
 
                 _isDone = new bool[N];
 
                 _subscriptions = new IDisposable[N];
 
-                _gate = new object();
-
                 for (var i = 0; i < N; i++)
                 {
                     var j = i;

Rozdielové dáta súboru neboli zobrazené, pretože súbor je príliš veľký
+ 331 - 321
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.NAry.cs


+ 12 - 6
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.NAry.tt

@@ -8,8 +8,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
@@ -61,20 +59,29 @@ for (var j = 1; j <= i; j++)
         {
             private readonly Func<<#=ts#>, TResult> _resultSelector;
 
+<#
+for (var j = 1; j <= i; j++)
+{
+#>
+            private readonly ZipObserver<T<#=j#>> _observer<#=j#>;
+<#
+}
+#>
+
             public _(Func<<#=ts#>, TResult> resultSelector, IObserver<TResult> observer)
                 : base(<#=i#>, observer)
             {
                 _resultSelector = resultSelector;
-            }
 
 <#
 for (var j = 1; j <= i; j++)
 {
 #>
-            private ZipObserver<T<#=j#>> _observer<#=j#>;
+                _observer<#=j#> = new ZipObserver<T<#=j#>>(_gate, this, <#=j - 1#>);
 <#
 }
 #>
+            }
 
             public void Run(<#=os#>)
             {
@@ -84,9 +91,8 @@ for (var j = 1; j <= i; j++)
 for (var j = 1; j <= i; j++)
 {
 #>
-                _observer<#=j#> = new ZipObserver<T<#=j#>>(_gate, this, <#=j - 1#>);
                 disposables[<#=j - 1#>] = _observer<#=j#>;
-                base.Queues[<#= j - 1#>] = _observer<#=j#>.Values;
+                Queues[<#= j - 1#>] = _observer<#=j#>.Values;
 
 <#
 }

+ 25 - 14
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections;
 using System.Collections.Generic;
 using System.Linq;
@@ -40,10 +38,10 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly object _gate;
 
                 private readonly FirstObserver _firstObserver;
-                private IDisposable _firstDisposable;
+                private IDisposable? _firstDisposable;
 
                 private readonly SecondObserver _secondObserver;
-                private IDisposable _secondDisposable;
+                private IDisposable? _secondDisposable;
 
                 public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
                     : base(observer)
@@ -53,8 +51,8 @@ namespace System.Reactive.Linq.ObservableImpl
                     _firstObserver = new FirstObserver(this);
                     _secondObserver = new SecondObserver(this);
 
-                    _firstObserver.Other = _secondObserver;
-                    _secondObserver.Other = _firstObserver;
+                    _firstObserver.SetOther(_secondObserver);
+                    _secondObserver.SetOther(_firstObserver);
 
                     _resultSelector = resultSelector;
                 }
@@ -80,22 +78,24 @@ namespace System.Reactive.Linq.ObservableImpl
                             _secondObserver.Dispose();
                         }
                     }
+
                     base.Dispose(disposing);
                 }
 
                 private sealed class FirstObserver : IObserver<TFirst>, IDisposable
                 {
                     private readonly _ _parent;
-                    private SecondObserver _other;
                     private readonly Queue<TFirst> _queue;
+                    private SecondObserver _other;
 
                     public FirstObserver(_ parent)
                     {
                         _parent = parent;
                         _queue = new Queue<TFirst>();
+                        _other = default!; // NB: Will be set by SetOther.
                     }
 
-                    public SecondObserver Other { set { _other = value; } }
+                    public void SetOther(SecondObserver other) { _other = other; }
 
                     public Queue<TFirst> Queue => _queue;
                     public bool Done { get; private set; }
@@ -168,16 +168,17 @@ namespace System.Reactive.Linq.ObservableImpl
                 private sealed class SecondObserver : IObserver<TSecond>, IDisposable
                 {
                     private readonly _ _parent;
-                    private FirstObserver _other;
                     private readonly Queue<TSecond> _queue;
+                    private FirstObserver _other;
 
                     public SecondObserver(_ parent)
                     {
                         _parent = parent;
                         _queue = new Queue<TSecond>();
+                        _other = default!; // NB: Will be set by SetOther.
                     }
 
-                    public FirstObserver Other { set { _other = value; } }
+                    public void SetOther(FirstObserver other) { _other = other; }
 
                     public Queue<TSecond> Queue => _queue;
                     public bool Done { get; private set; }
@@ -278,7 +279,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 int _enumerationInProgress;
 
-                private IEnumerator<TSecond> _rightEnumerator;
+                private IEnumerator<TSecond>? _rightEnumerator;
 
                 private static readonly IEnumerator<TSecond> DisposedEnumerator = MakeDisposedEnumerator();
 
@@ -324,28 +325,34 @@ namespace System.Reactive.Linq.ObservableImpl
                             Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
                         }
                     }
+
                     base.Dispose(disposing);
                 }
 
                 public override void OnNext(TFirst value)
                 {
                     var currentEnumerator = Volatile.Read(ref _rightEnumerator);
+
                     if (currentEnumerator == DisposedEnumerator)
                     {
                         return;
                     }
+
                     if (Interlocked.Increment(ref _enumerationInProgress) != 1)
                     {
                         return;
                     }
+
                     bool hasNext;
                     TSecond right = default;
                     var wasDisposed = false;
+
                     try
                     {
                         try
                         {
-                            hasNext = currentEnumerator.MoveNext();
+                            hasNext = currentEnumerator!.MoveNext();
+
                             if (hasNext)
                             {
                                 right = currentEnumerator.Current;
@@ -376,7 +383,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         TResult result;
                         try
                         {
-                            result = _resultSelector(value, right);
+                            result = _resultSelector(value, right!); // NB: Not null when hasNext is true.
                         }
                         catch (Exception ex)
                         {
@@ -590,11 +597,15 @@ namespace System.Reactive.Linq.ObservableImpl
                 : base(observer)
             {
                 _gate = new object();
+
+                // NB: These will be set in Run before getting used.
+                _queues = null!;
+                _isDone = null!;
             }
 
             private Queue<TSource>[] _queues;
             private bool[] _isDone;
-            private IDisposable[] _subscriptions;
+            private IDisposable[]? _subscriptions;
 
             public void Run(IEnumerable<IObservable<TSource>> sources)
             {

Niektoré súbory nie sú zobrazené, pretože je v týchto rozdielových dátach zmenené mnoho súborov