浏览代码

Enable #nullable on subjects.

Bart De Smet 5 年之前
父节点
当前提交
19af7524aa

+ 25 - 10
Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Runtime.CompilerServices;
 using System.Threading;
@@ -20,9 +18,9 @@ namespace System.Reactive.Subjects
         #region Fields
 
         private AsyncSubjectDisposable[] _observers;
-        private T _value;
+        private T? _value;
         private bool _hasValue;
-        private Exception _exception;
+        private Exception? _exception;
 
         /// <summary>
         /// A pre-allocated empty array indicating the AsyncSubject has terminated
@@ -74,19 +72,23 @@ namespace System.Reactive.Subjects
             for (; ; )
             {
                 var observers = Volatile.Read(ref _observers);
+
                 if (observers == Disposed)
                 {
                     _exception = null;
                     ThrowDisposed();
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     break;
                 }
+
                 if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                 {
                     var hasValue = _hasValue;
+
                     if (hasValue)
                     {
                         var value = _value;
@@ -95,7 +97,7 @@ namespace System.Reactive.Subjects
                         {
                             if (!o.IsDisposed())
                             {
-                                o.Downstream.OnNext(value);
+                                o.Downstream.OnNext(value!);
                                 o.Downstream.OnCompleted();
                             }
                         }
@@ -129,6 +131,7 @@ namespace System.Reactive.Subjects
             for (; ; )
             {
                 var observers = Volatile.Read(ref _observers);
+
                 if (observers == Disposed)
                 {
                     _exception = null;
@@ -136,11 +139,14 @@ namespace System.Reactive.Subjects
                     ThrowDisposed();
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     break;
                 }
+
                 _exception = error;
+
                 if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                 {
                     foreach (var o in observers)
@@ -162,6 +168,7 @@ namespace System.Reactive.Subjects
         public override void OnNext(T value)
         {
             var observers = Volatile.Read(ref _observers);
+
             if (observers == Disposed)
             {
                 _value = default;
@@ -169,6 +176,7 @@ namespace System.Reactive.Subjects
                 ThrowDisposed();
                 return;
             }
+
             if (observers == Terminated)
             {
                 return;
@@ -200,6 +208,7 @@ namespace System.Reactive.Subjects
             if (!Add(parent))
             {
                 var ex = _exception;
+
                 if (ex != null)
                 {
                     observer.OnError(ex);
@@ -208,10 +217,12 @@ namespace System.Reactive.Subjects
                 {
                     if (_hasValue)
                     {
-                        observer.OnNext(_value);
+                        observer.OnNext(_value!);
                     }
+
                     observer.OnCompleted();
                 }
+
                 return Disposable.Empty;
             }
 
@@ -223,6 +234,7 @@ namespace System.Reactive.Subjects
             for (; ; )
             {
                 var a = Volatile.Read(ref _observers);
+
                 if (a == Disposed)
                 {
                     _value = default;
@@ -238,8 +250,10 @@ namespace System.Reactive.Subjects
 
                 var n = a.Length;
                 var b = new AsyncSubjectDisposable[n + 1];
+
                 Array.Copy(a, 0, b, 0, n);
                 b[n] = inner;
+
                 if (Interlocked.CompareExchange(ref _observers, b, a) == a)
                 {
                     return true;
@@ -284,6 +298,7 @@ namespace System.Reactive.Subjects
                 else
                 {
                     b = new AsyncSubjectDisposable[n - 1];
+
                     Array.Copy(a, 0, b, 0, j);
                     Array.Copy(a, j + 1, b, j, n - j - 1);
                 }
@@ -301,7 +316,7 @@ namespace System.Reactive.Subjects
         private sealed class AsyncSubjectDisposable : IDisposable
         {
             internal readonly IObserver<T> Downstream;
-            private AsyncSubject<T> _parent;
+            private AsyncSubject<T>? _parent;
 
             public AsyncSubjectDisposable(AsyncSubject<T> parent, IObserver<T> downstream)
             {
@@ -377,7 +392,7 @@ namespace System.Reactive.Subjects
 
         private sealed class AwaitObserver : IObserver<T>
         {
-            private readonly SynchronizationContext _context;
+            private readonly SynchronizationContext? _context;
             private readonly Action _callback;
 
             public AwaitObserver(Action callback, bool originalContext)
@@ -407,7 +422,7 @@ namespace System.Reactive.Subjects
                     // Task objects or the async method builder's interaction with the
                     // SynchronizationContext object.
                     //
-                    _context.Post(static c => ((Action)c)(), _callback);
+                    _context.Post(static c => ((Action)c!)(), _callback);
                 }
                 else
                 {
@@ -443,7 +458,7 @@ namespace System.Reactive.Subjects
                 throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
             }
 
-            return _value;
+            return _value!;
         }
 
         #endregion

+ 9 - 10
Rx.NET/Source/src/System.Reactive/Subjects/BehaviorSubject.cs

@@ -2,8 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
+using System.Diagnostics.CodeAnalysis;
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Subjects
@@ -22,7 +21,7 @@ namespace System.Reactive.Subjects
         private ImmutableList<IObserver<T>> _observers;
         private bool _isStopped;
         private T _value;
-        private Exception _exception;
+        private Exception? _exception;
         private bool _isDisposed;
 
         #endregion
@@ -111,7 +110,7 @@ namespace System.Reactive.Subjects
         /// In some cases, it may be necessary for a caller to use external synchronization to avoid race conditions.
         /// </alert>
         /// </remarks>
-        public bool TryGetValue(out T value)
+        public bool TryGetValue([MaybeNullWhen(false)] out T value)
         {
             lock (_gate)
             {
@@ -138,7 +137,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         public override void OnCompleted()
         {
-            IObserver<T>[] os = null;
+            IObserver<T>[]? os = null;
 
             lock (_gate)
             {
@@ -173,7 +172,7 @@ namespace System.Reactive.Subjects
                 throw new ArgumentNullException(nameof(error));
             }
 
-            IObserver<T>[] os = null;
+            IObserver<T>[]? os = null;
 
             lock (_gate)
             {
@@ -203,7 +202,7 @@ namespace System.Reactive.Subjects
         /// <param name="value">The value to send to all observers.</param>
         public override void OnNext(T value)
         {
-            IObserver<T>[] os = null;
+            IObserver<T>[]? os = null;
 
             lock (_gate)
             {
@@ -242,7 +241,7 @@ namespace System.Reactive.Subjects
                 throw new ArgumentNullException(nameof(observer));
             }
 
-            Exception ex;
+            Exception? ex;
 
             lock (_gate)
             {
@@ -282,8 +281,8 @@ namespace System.Reactive.Subjects
             lock (_gate)
             {
                 _isDisposed = true;
-                _observers = null;
-                _value = default;
+                _observers = null!; // NB: Disposed checks happen prior to accessing _observers.
+                _value = default!;
                 _exception = null;
             }
         }

+ 9 - 11
Rx.NET/Source/src/System.Reactive/Subjects/ReplaySubject.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
@@ -228,7 +226,7 @@ namespace System.Reactive.Subjects
             private ImmutableList<IScheduledObserver<T>> _observers;
 
             private bool _isStopped;
-            private Exception _error;
+            private Exception? _error;
             private bool _isDisposed;
 
             protected ReplayBase()
@@ -254,7 +252,7 @@ namespace System.Reactive.Subjects
 
             public override void OnNext(T value)
             {
-                IScheduledObserver<T>[] o = null;
+                IScheduledObserver<T>[]? o = null;
 
                 lock (_gate)
                 {
@@ -284,7 +282,7 @@ namespace System.Reactive.Subjects
 
             public override void OnError(Exception error)
             {
-                IScheduledObserver<T>[] o = null;
+                IScheduledObserver<T>[]? o = null;
 
                 lock (_gate)
                 {
@@ -317,7 +315,7 @@ namespace System.Reactive.Subjects
 
             public override void OnCompleted()
             {
-                IScheduledObserver<T>[] o = null;
+                IScheduledObserver<T>[]? o = null;
 
                 lock (_gate)
                 {
@@ -410,7 +408,7 @@ namespace System.Reactive.Subjects
                 lock (_gate)
                 {
                     _isDisposed = true;
-                    _observers = null;
+                    _observers = null!; // NB: Disposed checks happen prior to accessing _observers.
                     DisposeCore();
                 }
             }
@@ -576,7 +574,7 @@ namespace System.Reactive.Subjects
         private sealed class ReplayOne : ReplayBufferBase
         {
             private bool _hasValue;
-            private T _value;
+            private T? _value;
 
             protected override void Trim()
             {
@@ -598,7 +596,7 @@ namespace System.Reactive.Subjects
                 if (_hasValue)
                 {
                     n = 1;
-                    observer.OnNext(_value);
+                    observer.OnNext(_value!);
                 }
 
                 return n;
@@ -714,12 +712,12 @@ namespace System.Reactive.Subjects
         /// Standby queue to swap out for _queue when transferring ownership. This allows to reuse
         /// queues in case of busy subjects where the initial replay doesn't suffice to catch up.
         /// </summary>
-        private Queue<T> _queue2;
+        private Queue<T>? _queue2;
 
         /// <summary>
         /// Exception passed to an OnError notification, if any.
         /// </summary>
-        private Exception _error;
+        private Exception? _error;
 
         /// <summary>
         /// Indicates whether an OnCompleted notification was received.

+ 39 - 26
Rx.NET/Source/src/System.Reactive/Subjects/Subject.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Threading;
 
@@ -18,8 +16,8 @@ namespace System.Reactive.Subjects
     {
         #region Fields
 
-        private SubjectDisposable[] _observers;
-        private Exception _exception;
+        private volatile SubjectDisposable[] _observers;
+        private Exception? _exception;
         private static readonly SubjectDisposable[] Terminated = new SubjectDisposable[0];
         private static readonly SubjectDisposable[] Disposed = new SubjectDisposable[0];
 
@@ -32,7 +30,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         public Subject()
         {
-            Volatile.Write(ref _observers, Array.Empty<SubjectDisposable>());
+            _observers = Array.Empty<SubjectDisposable>();
         }
 
         #endregion
@@ -42,18 +40,12 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public override bool HasObservers
-        {
-            get
-            {
-                return Volatile.Read(ref _observers).Length != 0;
-            }
-        }
+        public override bool HasObservers => _observers.Length != 0;
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
         /// </summary>
-        public override bool IsDisposed => Volatile.Read(ref _observers) == Disposed;
+        public override bool IsDisposed => _observers == Disposed;
 
         #endregion
 
@@ -61,10 +53,7 @@ namespace System.Reactive.Subjects
 
         #region IObserver<T> implementation
 
-        private static void ThrowDisposed()
-        {
-            throw new ObjectDisposedException(string.Empty);
-        }
+        private static void ThrowDisposed() => throw new ObjectDisposedException(string.Empty);
 
         /// <summary>
         /// Notifies all subscribed observers about the end of the sequence.
@@ -73,23 +62,27 @@ namespace System.Reactive.Subjects
         {
             for (; ; )
             {
-                var observers = Volatile.Read(ref _observers);
+                var observers = _observers;
+
                 if (observers == Disposed)
                 {
                     _exception = null;
                     ThrowDisposed();
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     break;
                 }
+
                 if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                 {
                     foreach (var observer in observers)
                     {
                         observer.Observer?.OnCompleted();
                     }
+
                     break;
                 }
             }
@@ -109,24 +102,29 @@ namespace System.Reactive.Subjects
 
             for (; ; )
             {
-                var observers = Volatile.Read(ref _observers);
+                var observers = _observers;
+
                 if (observers == Disposed)
                 {
                     _exception = null;
                     ThrowDisposed();
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     break;
                 }
+
                 _exception = error;
+
                 if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                 {
                     foreach (var observer in observers)
                     {
                         observer.Observer?.OnError(error);
                     }
+
                     break;
                 }
             }
@@ -138,13 +136,15 @@ namespace System.Reactive.Subjects
         /// <param name="value">The value to send to all currently subscribed observers.</param>
         public override void OnNext(T value)
         {
-            var observers = Volatile.Read(ref _observers);
+            var observers = _observers;
+
             if (observers == Disposed)
             {
                 _exception = null;
                 ThrowDisposed();
                 return;
             }
+
             foreach (var observer in observers)
             {
                 observer.Observer?.OnNext(value);
@@ -171,16 +171,20 @@ namespace System.Reactive.Subjects
             var disposable = default(SubjectDisposable);
             for (; ; )
             {
-                var observers = Volatile.Read(ref _observers);
+                var observers = _observers;
+
                 if (observers == Disposed)
                 {
                     _exception = null;
                     ThrowDisposed();
+
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     var ex = _exception;
+
                     if (ex != null)
                     {
                         observer.OnError(ex);
@@ -189,6 +193,7 @@ namespace System.Reactive.Subjects
                     {
                         observer.OnCompleted();
                     }
+
                     break;
                 }
 
@@ -199,13 +204,17 @@ namespace System.Reactive.Subjects
 
                 var n = observers.Length;
                 var b = new SubjectDisposable[n + 1];
+
                 Array.Copy(observers, 0, b, 0, n);
+
                 b[n] = disposable;
+
                 if (Interlocked.CompareExchange(ref _observers, b, observers) == observers)
                 {
                     return disposable;
                 }
             }
+
             return Disposable.Empty;
         }
 
@@ -213,8 +222,9 @@ namespace System.Reactive.Subjects
         {
             for (; ; )
             {
-                var a = Volatile.Read(ref _observers);
+                var a = _observers;
                 var n = a.Length;
+
                 if (n == 0)
                 {
                     break;
@@ -228,6 +238,7 @@ namespace System.Reactive.Subjects
                 }
 
                 SubjectDisposable[] b;
+
                 if (n == 1)
                 {
                     b = Array.Empty<SubjectDisposable>();
@@ -235,9 +246,11 @@ namespace System.Reactive.Subjects
                 else
                 {
                     b = new SubjectDisposable[n - 1];
+
                     Array.Copy(a, 0, b, 0, j);
                     Array.Copy(a, j + 1, b, j, n - j - 1);
                 }
+
                 if (Interlocked.CompareExchange(ref _observers, b, a) == a)
                 {
                     break;
@@ -248,12 +261,12 @@ namespace System.Reactive.Subjects
         private sealed class SubjectDisposable : IDisposable
         {
             private Subject<T> _subject;
-            private IObserver<T> _observer;
+            private volatile IObserver<T>? _observer;
 
             public SubjectDisposable(Subject<T> subject, IObserver<T> observer)
             {
                 _subject = subject;
-                Volatile.Write(ref _observer, observer);
+                _observer = observer;
             }
 
             public void Dispose()
@@ -265,10 +278,10 @@ namespace System.Reactive.Subjects
                 }
 
                 _subject.Unsubscribe(this);
-                _subject = null;
+                _subject = null!;
             }
 
-            public IObserver<T> Observer { get { return Volatile.Read(ref _observer); } }
+            public IObserver<T>? Observer => _observer;
         }
 
         #endregion

+ 1 - 1
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.verified.cs

@@ -2480,7 +2480,7 @@ namespace System.Reactive.Subjects
         public override void OnError(System.Exception error) { }
         public override void OnNext(T value) { }
         public override System.IDisposable Subscribe(System.IObserver<T> observer) { }
-        public bool TryGetValue(out T value) { }
+        public bool TryGetValue([System.Diagnostics.CodeAnalysis.MaybeNullWhen(false)] out T value) { }
     }
     public interface IConnectableObservable<out T> : System.IObservable<T>
     {