Pārlūkot izejas kodu

Merge pull request #1359 from dotnet/dev/bartde/rx_nullable_part17

Enable #nullable for some straightforward operators.
Bart J.F. De Smet 5 gadi atpakaļ
vecāks
revīzija
15f1cc6e25

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl

+ 10 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/AmbMany.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Linq;
 using System.Reactive.Disposables;
@@ -57,18 +55,22 @@ namespace System.Reactive.Linq.ObservableImpl
     internal sealed class AmbCoordinator<T> : IDisposable
     {
         private readonly IObserver<T> _downstream;
-        private readonly InnerObserver[] _observers;
+        private readonly InnerObserver?[] _observers;
         private int _winner;
 
         internal AmbCoordinator(IObserver<T> downstream, int n)
         {
             _downstream = downstream;
-            var o = new InnerObserver[n];
+
+            var o = new InnerObserver?[n];
+
             for (var i = 0; i < n; i++)
             {
                 o[i] = new InnerObserver(this, i);
             }
+
             _observers = o;
+
             Volatile.Write(ref _winner, -1);
         }
 
@@ -98,10 +100,12 @@ namespace System.Reactive.Linq.ObservableImpl
             for (var i = 0; i < _observers.Length; i++)
             {
                 var inner = Volatile.Read(ref _observers[i]);
+
                 if (inner == null)
                 {
                     break;
                 }
+
                 inner.Run(sources[i]);
             }
         }
@@ -125,8 +129,10 @@ namespace System.Reactive.Linq.ObservableImpl
                         Interlocked.Exchange(ref _observers[i], null)?.Dispose();
                     }
                 }
+
                 return true;
             }
+
             return false;
         }
 

+ 1 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Case.cs

@@ -2,13 +2,12 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
     internal sealed class Case<TValue, TResult> : Producer<TResult, Case<TValue, TResult>._>, IEvaluatableObservable<TResult>
+        where TValue : notnull
     {
         private readonly Func<TValue> _selector;
         private readonly IDictionary<TValue, IObservable<TResult>> _sources;

+ 4 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Disposables;
 using System.Threading;
@@ -30,7 +28,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source)
+            protected override IEnumerable<IObservable<TSource>>? Extract(IObservable<TSource> source)
             {
                 if (source is Catch<TSource> @catch)
                 {
@@ -40,7 +38,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 return null;
             }
 
-            private Exception _lastException;
+            private Exception? _lastException;
 
             public override void OnError(Exception error)
             {
@@ -99,7 +97,7 @@ namespace System.Reactive.Linq.ObservableImpl
             }
 
             private bool _once;
-            private IDisposable _subscription;
+            private IDisposable? _subscription;
 
             public override void Run(IObservable<TSource> source)
             {
@@ -112,6 +110,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     Disposable.Dispose(ref _subscription);
                 }
+
                 base.Dispose(disposing);
             }
 

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Concat.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 
 namespace System.Reactive.Linq.ObservableImpl

+ 13 - 18
Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Concurrent;
 using System.Reactive.Disposables;
 using System.Threading;
@@ -25,9 +23,12 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 throw new ArgumentNullException(nameof(observer));
             }
+
             var parent = new ConcatManyOuterObserver(observer);
+
             var d = _sources.SubscribeSafe(parent);
             parent.OnSubscribe(d);
+
             return parent;
         }
 
@@ -36,9 +37,10 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly IObserver<T> _downstream;
             private readonly ConcurrentQueue<IObservable<T>> _queue;
             private readonly InnerObserver _innerObserver;
-            private IDisposable _upstream;
+
+            private IDisposable? _upstream;
             private int _trampoline;
-            private Exception _error;
+            private Exception? _error;
             private bool _done;
             private int _active;
 
@@ -176,7 +178,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly ConcatManyOuterObserver _parent;
 
-                internal IDisposable Upstream;
+                internal IDisposable? Upstream;
 
                 internal InnerObserver(ConcatManyOuterObserver parent)
                 {
@@ -191,14 +193,16 @@ namespace System.Reactive.Linq.ObservableImpl
                 internal bool Finish()
                 {
                     var sad = Volatile.Read(ref Upstream);
+
                     if (sad != BooleanDisposable.True)
                     {
                         if (Interlocked.CompareExchange(ref Upstream, null, sad) == sad)
                         {
-                            sad.Dispose();
+                            sad!.Dispose(); // NB: Cannot be null when we get here; SetDisposable is called before Inner[Error|Completed] calls Finish.
                             return true;
                         }
                     }
+
                     return false;
                 }
 
@@ -207,20 +211,11 @@ namespace System.Reactive.Linq.ObservableImpl
                     Disposable.Dispose(ref Upstream);
                 }
 
-                public void OnCompleted()
-                {
-                    _parent.InnerComplete();
-                }
+                public void OnCompleted() => _parent.InnerComplete();
 
-                public void OnError(Exception error)
-                {
-                    _parent.InnerError(error);
-                }
+                public void OnError(Exception error) => _parent.InnerError(error);
 
-                public void OnNext(T value)
-                {
-                    _parent.InnerNext(value);
-                }
+                public void OnNext(T value) => _parent.InnerNext(value);
             }
         }
     }

+ 6 - 6
Rx.NET/Source/src/System.Reactive/Linq/Observable/SequenceEqual.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Disposables;
 using System.Threading;
@@ -48,7 +46,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private bool _donel;
                 private bool _doner;
 
-                private IDisposable _second;
+                private IDisposable? _second;
 
                 public void Run(Observable parent)
                 {
@@ -62,6 +60,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     {
                         Disposable.Dispose(ref _second);
                     }
+
                     base.Dispose(disposing);
                 }
 
@@ -242,7 +241,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     _comparer = comparer;
                 }
 
-                private IEnumerator<TSource> _enumerator;
+                private IEnumerator<TSource>? _enumerator;
 
                 private static readonly IEnumerator<TSource> DisposedEnumerator = MakeDisposedEnumerator();
 
@@ -286,6 +285,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     {
                         Interlocked.Exchange(ref _enumerator, DisposedEnumerator)?.Dispose();
                     }
+
                     base.Dispose(disposing);
                 }
 
@@ -295,7 +295,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     try
                     {
-                        if (_enumerator.MoveNext())
+                        if (_enumerator!.MoveNext()) // NB: Non-null after Run is called.
                         {
                             var current = _enumerator.Current;
                             equal = _comparer.Equals(value, current);
@@ -319,7 +319,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     bool hasNext;
                     try
                     {
-                        hasNext = _enumerator.MoveNext();
+                        hasNext = _enumerator!.MoveNext(); // NB: Non-null after Run is called.
                     }
                     catch (Exception exception)
                     {

+ 7 - 12
Rx.NET/Source/src/System.Reactive/Linq/Observable/WithLatestFrom.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
@@ -27,6 +25,9 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TResult>
         {
+            private readonly object _gate = new object();
+            private readonly object _latestGate = new object();
+
             private readonly Func<TFirst, TSecond, TResult> _resultSelector;
 
             public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
@@ -35,19 +36,13 @@ namespace System.Reactive.Linq.ObservableImpl
                 _resultSelector = resultSelector;
             }
 
-            private object _gate;
             private volatile bool _hasLatest;
-            private TSecond _latest;
-
-            private object _latestGate;
+            private TSecond? _latest;
 
-            private IDisposable _secondDisposable;
+            private IDisposable? _secondDisposable;
 
             public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
             {
-                _gate = new object();
-                _latestGate = new object();
-
                 var fstO = new FirstObserver(this);
                 var sndO = new SecondObserver(this);
 
@@ -61,6 +56,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     Disposable.Dispose(ref _secondDisposable);
                 }
+
                 base.Dispose(disposing);
             }
 
@@ -93,12 +89,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     if (_parent._hasLatest) // Volatile read
                     {
-
                         TSecond latest;
 
                         lock (_parent._latestGate)
                         {
-                            latest = _parent._latest;
+                            latest = _parent._latest!; // NB: Not null when hasLatest is true.
                         }
 
                         TResult res;