Explorar o código

Merge pull request #1372 from dotnet/dev/bartde/rx_nullable_subjects

Enable #nullable for subjects.
Bart J.F. De Smet %!s(int64=5) %!d(string=hai) anos
pai
achega
638875b383

+ 31 - 21
Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Runtime.CompilerServices;
 using System.Threading;
@@ -20,17 +18,17 @@ namespace System.Reactive.Subjects
         #region Fields
 
         private AsyncSubjectDisposable[] _observers;
-        private T _value;
+        private T? _value;
         private bool _hasValue;
-        private Exception _exception;
+        private Exception? _exception;
 
         /// <summary>
-        /// A pre-allocated empty array indicating the AsyncSubject has terminated
+        /// A pre-allocated empty array indicating the AsyncSubject has terminated.
         /// </summary>
         private static readonly AsyncSubjectDisposable[] Terminated = new AsyncSubjectDisposable[0];
 
         /// <summary>
-        /// A pre-allocated empty array indicating the AsyncSubject has terminated
+        /// A pre-allocated empty array indicating the AsyncSubject has been disposed.
         /// </summary>
         private static readonly AsyncSubjectDisposable[] Disposed = new AsyncSubjectDisposable[0];
 
@@ -41,10 +39,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Creates a subject that can only receive one value and that value is cached for all future observations.
         /// </summary>
-        public AsyncSubject()
-        {
-            _observers = Array.Empty<AsyncSubjectDisposable>();
-        }
+        public AsyncSubject() => _observers = Array.Empty<AsyncSubjectDisposable>();
 
         #endregion
 
@@ -53,7 +48,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public override bool HasObservers => _observers.Length != 0;
+        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
@@ -74,19 +69,23 @@ namespace System.Reactive.Subjects
             for (; ; )
             {
                 var observers = Volatile.Read(ref _observers);
+
                 if (observers == Disposed)
                 {
                     _exception = null;
                     ThrowDisposed();
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     break;
                 }
+
                 if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                 {
                     var hasValue = _hasValue;
+
                     if (hasValue)
                     {
                         var value = _value;
@@ -95,7 +94,7 @@ namespace System.Reactive.Subjects
                         {
                             if (!o.IsDisposed())
                             {
-                                o.Downstream.OnNext(value);
+                                o.Downstream.OnNext(value!);
                                 o.Downstream.OnCompleted();
                             }
                         }
@@ -129,6 +128,7 @@ namespace System.Reactive.Subjects
             for (; ; )
             {
                 var observers = Volatile.Read(ref _observers);
+
                 if (observers == Disposed)
                 {
                     _exception = null;
@@ -136,11 +136,14 @@ namespace System.Reactive.Subjects
                     ThrowDisposed();
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     break;
                 }
+
                 _exception = error;
+
                 if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                 {
                     foreach (var o in observers)
@@ -162,6 +165,7 @@ namespace System.Reactive.Subjects
         public override void OnNext(T value)
         {
             var observers = Volatile.Read(ref _observers);
+
             if (observers == Disposed)
             {
                 _value = default;
@@ -169,6 +173,7 @@ namespace System.Reactive.Subjects
                 ThrowDisposed();
                 return;
             }
+
             if (observers == Terminated)
             {
                 return;
@@ -200,6 +205,7 @@ namespace System.Reactive.Subjects
             if (!Add(parent))
             {
                 var ex = _exception;
+
                 if (ex != null)
                 {
                     observer.OnError(ex);
@@ -208,10 +214,12 @@ namespace System.Reactive.Subjects
                 {
                     if (_hasValue)
                     {
-                        observer.OnNext(_value);
+                        observer.OnNext(_value!);
                     }
+
                     observer.OnCompleted();
                 }
+
                 return Disposable.Empty;
             }
 
@@ -223,6 +231,7 @@ namespace System.Reactive.Subjects
             for (; ; )
             {
                 var a = Volatile.Read(ref _observers);
+
                 if (a == Disposed)
                 {
                     _value = default;
@@ -238,8 +247,10 @@ namespace System.Reactive.Subjects
 
                 var n = a.Length;
                 var b = new AsyncSubjectDisposable[n + 1];
+
                 Array.Copy(a, 0, b, 0, n);
                 b[n] = inner;
+
                 if (Interlocked.CompareExchange(ref _observers, b, a) == a)
                 {
                     return true;
@@ -277,6 +288,7 @@ namespace System.Reactive.Subjects
                 }
 
                 AsyncSubjectDisposable[] b;
+
                 if (n == 1)
                 {
                     b = Array.Empty<AsyncSubjectDisposable>();
@@ -284,6 +296,7 @@ namespace System.Reactive.Subjects
                 else
                 {
                     b = new AsyncSubjectDisposable[n - 1];
+
                     Array.Copy(a, 0, b, 0, j);
                     Array.Copy(a, j + 1, b, j, n - j - 1);
                 }
@@ -301,7 +314,7 @@ namespace System.Reactive.Subjects
         private sealed class AsyncSubjectDisposable : IDisposable
         {
             internal readonly IObserver<T> Downstream;
-            private AsyncSubject<T> _parent;
+            private AsyncSubject<T>? _parent;
 
             public AsyncSubjectDisposable(AsyncSubject<T> parent, IObserver<T> downstream)
             {
@@ -324,10 +337,7 @@ namespace System.Reactive.Subjects
 
         #region IDisposable implementation
 
-        private static void ThrowDisposed()
-        {
-            throw new ObjectDisposedException(string.Empty);
-        }
+        private static void ThrowDisposed() => throw new ObjectDisposedException(string.Empty);
 
         /// <summary>
         /// Unsubscribe all observers and release resources.
@@ -377,7 +387,7 @@ namespace System.Reactive.Subjects
 
         private sealed class AwaitObserver : IObserver<T>
         {
-            private readonly SynchronizationContext _context;
+            private readonly SynchronizationContext? _context;
             private readonly Action _callback;
 
             public AwaitObserver(Action callback, bool originalContext)
@@ -407,7 +417,7 @@ namespace System.Reactive.Subjects
                     // Task objects or the async method builder's interaction with the
                     // SynchronizationContext object.
                     //
-                    _context.Post(static c => ((Action)c)(), _callback);
+                    _context.Post(static c => ((Action)c!)(), _callback);
                 }
                 else
                 {
@@ -443,7 +453,7 @@ namespace System.Reactive.Subjects
                 throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
             }
 
-            return _value;
+            return _value!;
         }
 
         #endregion

+ 10 - 11
Rx.NET/Source/src/System.Reactive/Subjects/BehaviorSubject.cs

@@ -2,8 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
+using System.Diagnostics.CodeAnalysis;
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Subjects
@@ -22,7 +21,7 @@ namespace System.Reactive.Subjects
         private ImmutableList<IObserver<T>> _observers;
         private bool _isStopped;
         private T _value;
-        private Exception _exception;
+        private Exception? _exception;
         private bool _isDisposed;
 
         #endregion
@@ -111,7 +110,7 @@ namespace System.Reactive.Subjects
         /// In some cases, it may be necessary for a caller to use external synchronization to avoid race conditions.
         /// </alert>
         /// </remarks>
-        public bool TryGetValue(out T value)
+        public bool TryGetValue([MaybeNullWhen(false)] out T value)
         {
             lock (_gate)
             {
@@ -138,7 +137,7 @@ namespace System.Reactive.Subjects
         /// </summary>
         public override void OnCompleted()
         {
-            IObserver<T>[] os = null;
+            IObserver<T>[]? os = null;
 
             lock (_gate)
             {
@@ -173,7 +172,7 @@ namespace System.Reactive.Subjects
                 throw new ArgumentNullException(nameof(error));
             }
 
-            IObserver<T>[] os = null;
+            IObserver<T>[]? os = null;
 
             lock (_gate)
             {
@@ -203,7 +202,7 @@ namespace System.Reactive.Subjects
         /// <param name="value">The value to send to all observers.</param>
         public override void OnNext(T value)
         {
-            IObserver<T>[] os = null;
+            IObserver<T>[]? os = null;
 
             lock (_gate)
             {
@@ -242,7 +241,7 @@ namespace System.Reactive.Subjects
                 throw new ArgumentNullException(nameof(observer));
             }
 
-            Exception ex;
+            Exception? ex;
 
             lock (_gate)
             {
@@ -282,8 +281,8 @@ namespace System.Reactive.Subjects
             lock (_gate)
             {
                 _isDisposed = true;
-                _observers = null;
-                _value = default;
+                _observers = null!; // NB: Disposed checks happen prior to accessing _observers.
+                _value = default!;
                 _exception = null;
             }
         }
@@ -301,7 +300,7 @@ namespace System.Reactive.Subjects
         private sealed class Subscription : IDisposable
         {
             private readonly BehaviorSubject<T> _subject;
-            private IObserver<T> _observer;
+            private IObserver<T>? _observer;
 
             public Subscription(BehaviorSubject<T> subject, IObserver<T> observer)
             {

+ 9 - 11
Rx.NET/Source/src/System.Reactive/Subjects/ReplaySubject.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
@@ -228,7 +226,7 @@ namespace System.Reactive.Subjects
             private ImmutableList<IScheduledObserver<T>> _observers;
 
             private bool _isStopped;
-            private Exception _error;
+            private Exception? _error;
             private bool _isDisposed;
 
             protected ReplayBase()
@@ -254,7 +252,7 @@ namespace System.Reactive.Subjects
 
             public override void OnNext(T value)
             {
-                IScheduledObserver<T>[] o = null;
+                IScheduledObserver<T>[]? o = null;
 
                 lock (_gate)
                 {
@@ -284,7 +282,7 @@ namespace System.Reactive.Subjects
 
             public override void OnError(Exception error)
             {
-                IScheduledObserver<T>[] o = null;
+                IScheduledObserver<T>[]? o = null;
 
                 lock (_gate)
                 {
@@ -317,7 +315,7 @@ namespace System.Reactive.Subjects
 
             public override void OnCompleted()
             {
-                IScheduledObserver<T>[] o = null;
+                IScheduledObserver<T>[]? o = null;
 
                 lock (_gate)
                 {
@@ -410,7 +408,7 @@ namespace System.Reactive.Subjects
                 lock (_gate)
                 {
                     _isDisposed = true;
-                    _observers = null;
+                    _observers = null!; // NB: Disposed checks happen prior to accessing _observers.
                     DisposeCore();
                 }
             }
@@ -576,7 +574,7 @@ namespace System.Reactive.Subjects
         private sealed class ReplayOne : ReplayBufferBase
         {
             private bool _hasValue;
-            private T _value;
+            private T? _value;
 
             protected override void Trim()
             {
@@ -598,7 +596,7 @@ namespace System.Reactive.Subjects
                 if (_hasValue)
                 {
                     n = 1;
-                    observer.OnNext(_value);
+                    observer.OnNext(_value!);
                 }
 
                 return n;
@@ -714,12 +712,12 @@ namespace System.Reactive.Subjects
         /// Standby queue to swap out for _queue when transferring ownership. This allows to reuse
         /// queues in case of busy subjects where the initial replay doesn't suffice to catch up.
         /// </summary>
-        private Queue<T> _queue2;
+        private Queue<T>? _queue2;
 
         /// <summary>
         /// Exception passed to an OnError notification, if any.
         /// </summary>
-        private Exception _error;
+        private Exception? _error;
 
         /// <summary>
         /// Indicates whether an OnCompleted notification was received.

+ 32 - 22
Rx.NET/Source/src/System.Reactive/Subjects/Subject.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Threading;
 
@@ -19,7 +17,7 @@ namespace System.Reactive.Subjects
         #region Fields
 
         private SubjectDisposable[] _observers;
-        private Exception _exception;
+        private Exception? _exception;
         private static readonly SubjectDisposable[] Terminated = new SubjectDisposable[0];
         private static readonly SubjectDisposable[] Disposed = new SubjectDisposable[0];
 
@@ -30,10 +28,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Creates a subject.
         /// </summary>
-        public Subject()
-        {
-            Volatile.Write(ref _observers, Array.Empty<SubjectDisposable>());
-        }
+        public Subject() => _observers = Array.Empty<SubjectDisposable>();
 
         #endregion
 
@@ -42,13 +37,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public override bool HasObservers
-        {
-            get
-            {
-                return Volatile.Read(ref _observers).Length != 0;
-            }
-        }
+        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
@@ -61,10 +50,7 @@ namespace System.Reactive.Subjects
 
         #region IObserver<T> implementation
 
-        private static void ThrowDisposed()
-        {
-            throw new ObjectDisposedException(string.Empty);
-        }
+        private static void ThrowDisposed() => throw new ObjectDisposedException(string.Empty);
 
         /// <summary>
         /// Notifies all subscribed observers about the end of the sequence.
@@ -74,22 +60,26 @@ namespace System.Reactive.Subjects
             for (; ; )
             {
                 var observers = Volatile.Read(ref _observers);
+
                 if (observers == Disposed)
                 {
                     _exception = null;
                     ThrowDisposed();
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     break;
                 }
+
                 if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                 {
                     foreach (var observer in observers)
                     {
                         observer.Observer?.OnCompleted();
                     }
+
                     break;
                 }
             }
@@ -110,23 +100,28 @@ namespace System.Reactive.Subjects
             for (; ; )
             {
                 var observers = Volatile.Read(ref _observers);
+
                 if (observers == Disposed)
                 {
                     _exception = null;
                     ThrowDisposed();
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     break;
                 }
+
                 _exception = error;
+
                 if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                 {
                     foreach (var observer in observers)
                     {
                         observer.Observer?.OnError(error);
                     }
+
                     break;
                 }
             }
@@ -139,12 +134,14 @@ namespace System.Reactive.Subjects
         public override void OnNext(T value)
         {
             var observers = Volatile.Read(ref _observers);
+
             if (observers == Disposed)
             {
                 _exception = null;
                 ThrowDisposed();
                 return;
             }
+
             foreach (var observer in observers)
             {
                 observer.Observer?.OnNext(value);
@@ -172,15 +169,19 @@ namespace System.Reactive.Subjects
             for (; ; )
             {
                 var observers = Volatile.Read(ref _observers);
+
                 if (observers == Disposed)
                 {
                     _exception = null;
                     ThrowDisposed();
+
                     break;
                 }
+
                 if (observers == Terminated)
                 {
                     var ex = _exception;
+
                     if (ex != null)
                     {
                         observer.OnError(ex);
@@ -189,6 +190,7 @@ namespace System.Reactive.Subjects
                     {
                         observer.OnCompleted();
                     }
+
                     break;
                 }
 
@@ -199,13 +201,17 @@ namespace System.Reactive.Subjects
 
                 var n = observers.Length;
                 var b = new SubjectDisposable[n + 1];
+
                 Array.Copy(observers, 0, b, 0, n);
+
                 b[n] = disposable;
+
                 if (Interlocked.CompareExchange(ref _observers, b, observers) == observers)
                 {
                     return disposable;
                 }
             }
+
             return Disposable.Empty;
         }
 
@@ -215,6 +221,7 @@ namespace System.Reactive.Subjects
             {
                 var a = Volatile.Read(ref _observers);
                 var n = a.Length;
+
                 if (n == 0)
                 {
                     break;
@@ -228,6 +235,7 @@ namespace System.Reactive.Subjects
                 }
 
                 SubjectDisposable[] b;
+
                 if (n == 1)
                 {
                     b = Array.Empty<SubjectDisposable>();
@@ -235,9 +243,11 @@ namespace System.Reactive.Subjects
                 else
                 {
                     b = new SubjectDisposable[n - 1];
+
                     Array.Copy(a, 0, b, 0, j);
                     Array.Copy(a, j + 1, b, j, n - j - 1);
                 }
+
                 if (Interlocked.CompareExchange(ref _observers, b, a) == a)
                 {
                     break;
@@ -248,12 +258,12 @@ namespace System.Reactive.Subjects
         private sealed class SubjectDisposable : IDisposable
         {
             private Subject<T> _subject;
-            private IObserver<T> _observer;
+            private volatile IObserver<T>? _observer;
 
             public SubjectDisposable(Subject<T> subject, IObserver<T> observer)
             {
                 _subject = subject;
-                Volatile.Write(ref _observer, observer);
+                _observer = observer;
             }
 
             public void Dispose()
@@ -265,10 +275,10 @@ namespace System.Reactive.Subjects
                 }
 
                 _subject.Unsubscribe(this);
-                _subject = null;
+                _subject = null!;
             }
 
-            public IObserver<T> Observer { get { return Volatile.Read(ref _observer); } }
+            public IObserver<T>? Observer => _observer;
         }
 
         #endregion

+ 1 - 1
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.verified.cs

@@ -2480,7 +2480,7 @@ namespace System.Reactive.Subjects
         public override void OnError(System.Exception error) { }
         public override void OnNext(T value) { }
         public override System.IDisposable Subscribe(System.IObserver<T> observer) { }
-        public bool TryGetValue(out T value) { }
+        public bool TryGetValue([System.Diagnostics.CodeAnalysis.MaybeNullWhen(false)] out T value) { }
     }
     public interface IConnectableObservable<out T> : System.IObservable<T>
     {