Răsfoiți Sursa

A few more fixes.

Bart De Smet 5 ani în urmă
părinte
comite
4002cad0f1

+ 6 - 11
Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs

@@ -23,12 +23,12 @@ namespace System.Reactive.Subjects
         private Exception? _exception;
 
         /// <summary>
-        /// A pre-allocated empty array indicating the AsyncSubject has terminated
+        /// A pre-allocated empty array indicating the AsyncSubject has terminated.
         /// </summary>
         private static readonly AsyncSubjectDisposable[] Terminated = new AsyncSubjectDisposable[0];
 
         /// <summary>
-        /// A pre-allocated empty array indicating the AsyncSubject has terminated
+        /// A pre-allocated empty array indicating the AsyncSubject has been disposed.
         /// </summary>
         private static readonly AsyncSubjectDisposable[] Disposed = new AsyncSubjectDisposable[0];
 
@@ -39,10 +39,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Creates a subject that can only receive one value and that value is cached for all future observations.
         /// </summary>
-        public AsyncSubject()
-        {
-            _observers = Array.Empty<AsyncSubjectDisposable>();
-        }
+        public AsyncSubject() => _observers = Array.Empty<AsyncSubjectDisposable>();
 
         #endregion
 
@@ -51,7 +48,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public override bool HasObservers => _observers.Length != 0;
+        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
@@ -291,6 +288,7 @@ namespace System.Reactive.Subjects
                 }
 
                 AsyncSubjectDisposable[] b;
+
                 if (n == 1)
                 {
                     b = Array.Empty<AsyncSubjectDisposable>();
@@ -339,10 +337,7 @@ namespace System.Reactive.Subjects
 
         #region IDisposable implementation
 
-        private static void ThrowDisposed()
-        {
-            throw new ObjectDisposedException(string.Empty);
-        }
+        private static void ThrowDisposed() => throw new ObjectDisposedException(string.Empty);
 
         /// <summary>
         /// Unsubscribe all observers and release resources.

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Subjects/BehaviorSubject.cs

@@ -300,7 +300,7 @@ namespace System.Reactive.Subjects
         private sealed class Subscription : IDisposable
         {
             private readonly BehaviorSubject<T> _subject;
-            private IObserver<T> _observer;
+            private IObserver<T>? _observer;
 
             public Subscription(BehaviorSubject<T> subject, IObserver<T> observer)
             {

+ 9 - 12
Rx.NET/Source/src/System.Reactive/Subjects/Subject.cs

@@ -16,7 +16,7 @@ namespace System.Reactive.Subjects
     {
         #region Fields
 
-        private volatile SubjectDisposable[] _observers;
+        private SubjectDisposable[] _observers;
         private Exception? _exception;
         private static readonly SubjectDisposable[] Terminated = new SubjectDisposable[0];
         private static readonly SubjectDisposable[] Disposed = new SubjectDisposable[0];
@@ -28,10 +28,7 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Creates a subject.
         /// </summary>
-        public Subject()
-        {
-            _observers = Array.Empty<SubjectDisposable>();
-        }
+        public Subject() => _observers = Array.Empty<SubjectDisposable>();
 
         #endregion
 
@@ -40,12 +37,12 @@ namespace System.Reactive.Subjects
         /// <summary>
         /// Indicates whether the subject has observers subscribed to it.
         /// </summary>
-        public override bool HasObservers => _observers.Length != 0;
+        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;
 
         /// <summary>
         /// Indicates whether the subject has been disposed.
         /// </summary>
-        public override bool IsDisposed => _observers == Disposed;
+        public override bool IsDisposed => Volatile.Read(ref _observers) == Disposed;
 
         #endregion
 
@@ -62,7 +59,7 @@ namespace System.Reactive.Subjects
         {
             for (; ; )
             {
-                var observers = _observers;
+                var observers = Volatile.Read(ref _observers);
 
                 if (observers == Disposed)
                 {
@@ -102,7 +99,7 @@ namespace System.Reactive.Subjects
 
             for (; ; )
             {
-                var observers = _observers;
+                var observers = Volatile.Read(ref _observers);
 
                 if (observers == Disposed)
                 {
@@ -136,7 +133,7 @@ namespace System.Reactive.Subjects
         /// <param name="value">The value to send to all currently subscribed observers.</param>
         public override void OnNext(T value)
         {
-            var observers = _observers;
+            var observers = Volatile.Read(ref _observers);
 
             if (observers == Disposed)
             {
@@ -171,7 +168,7 @@ namespace System.Reactive.Subjects
             var disposable = default(SubjectDisposable);
             for (; ; )
             {
-                var observers = _observers;
+                var observers = Volatile.Read(ref _observers);
 
                 if (observers == Disposed)
                 {
@@ -222,7 +219,7 @@ namespace System.Reactive.Subjects
         {
             for (; ; )
             {
-                var a = _observers;
+                var a = Volatile.Read(ref _observers);
                 var n = a.Length;
 
                 if (n == 0)