Prechádzať zdrojové kódy

Enable #nullable for the last few operators.

Bart De Smet 5 rokov pred
rodič
commit
5f749e817f

+ 1 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Multicast.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
 
@@ -28,7 +26,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TResult>
         {
-            private IDisposable _connection;
+            private IDisposable? _connection;
 
             public _(IObserver<TResult> observer)
                 : base(observer)

+ 21 - 15
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
@@ -17,20 +15,20 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             private readonly IConnectableObservable<TSource> _source;
 
-            private readonly object _gate;
+            private readonly object _gate = new object();
+
             /// <summary>
             /// Contains the current active connection's state or null
             /// if no connection is active at the moment.
             /// Should be manipulated while holding the <see cref="_gate"/> lock.
             /// </summary>
-            private RefConnection _connection;
+            private RefConnection? _connection;
 
             private readonly int _minObservers;
 
             public Eager(IConnectableObservable<TSource> source, int minObservers)
             {
                 _source = source;
-                _gate = new object();
                 _minObservers = minObservers;
             }
 
@@ -41,13 +39,14 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : IdentitySink<TSource>
             {
                 private readonly Eager _parent;
+
                 /// <summary>
                 /// Contains the connection reference the downstream observer
                 /// has subscribed to. Its purpose is to
                 /// avoid subscribing, connecting and disconnecting
                 /// while holding a lock.
                 /// </summary>
-                private RefConnection _targetConnection;
+                private RefConnection? _targetConnection;
 
                 public _(IObserver<TSource> observer, Eager parent)
                     : base(observer)
@@ -58,12 +57,13 @@ namespace System.Reactive.Linq.ObservableImpl
                 public void Run()
                 {
                     bool doConnect;
-                    RefConnection conn;
+                    RefConnection? conn;
 
                     lock (_parent._gate)
                     {
                         // get the active connection state
                         conn = _parent._connection;
+
                         // if null, a new connection should be established
                         if (conn == null)
                         {
@@ -74,12 +74,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
                         // this is the first observer, then connect
                         doConnect = ++conn._count == _parent._minObservers;
+
                         // save the current connection for this observer
                         _targetConnection = conn;
                     }
 
                     // subscribe to the source first
                     Run(_parent._source);
+
                     // then connect the source if necessary
                     if (doConnect && !Disposable.GetIsDisposed(ref conn._disposable))
                     {
@@ -94,10 +96,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 protected override void Dispose(bool disposing)
                 {
                     base.Dispose(disposing);
+
                     if (disposing)
                     {
                         // get and forget the saved connection
-                        var targetConnection = _targetConnection;
+                        var targetConnection = _targetConnection!; // NB: Always set by Run prior to calling Dispose, and base class hardens protects against double-dispose.
                         _targetConnection = null;
 
                         lock (_parent._gate)
@@ -110,6 +113,7 @@ namespace System.Reactive.Linq.ObservableImpl
                                 // nothing to do.
                                 return;
                             }
+
                             // forget the current connection
                             _parent._connection = null;
                         }
@@ -127,7 +131,7 @@ namespace System.Reactive.Linq.ObservableImpl
             private sealed class RefConnection
             {
                 internal int _count;
-                internal IDisposable _disposable;
+                internal IDisposable? _disposable;
             }
         }
 
@@ -139,9 +143,9 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly IConnectableObservable<TSource> _source;
             private readonly int _minObservers;
 
-            private IDisposable _serial;
+            private IDisposable? _serial;
             private int _count;
-            private IDisposable _connectableSubscription;
+            private IDisposable? _connectableSubscription;
 
             public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler, int minObservers)
             {
@@ -182,7 +186,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     SetUpstream(Disposable.Create(
                         (parent, subscription),
-                        tuple =>
+                        static tuple =>
                         {
                             var (closureParent, closureSubscription) = tuple;
 
@@ -192,15 +196,17 @@ namespace System.Reactive.Linq.ObservableImpl
                             {
                                 if (--closureParent._count == 0)
                                 {
-                                    var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial);
+                                    // NB: _serial is guaranteed to be set by TrySetSerial earlier on.
+                                    var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial)!;
 
-                                    cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, tuple2 =>
+                                    cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, static tuple2 =>
                                     {
                                         lock (tuple2.closureParent._gate)
                                         {
                                             if (ReferenceEquals(Volatile.Read(ref tuple2.closureParent._serial), tuple2.cancelable))
                                             {
-                                                tuple2.closureParent._connectableSubscription.Dispose();
+                                                // NB: _connectableSubscription is guaranteed to be set above, and Disposable.Create protects against double-dispose.
+                                                tuple2.closureParent._connectableSubscription!.Dispose();
                                                 tuple2.closureParent._connectableSubscription = null;
                                             }
                                         }

+ 50 - 35
Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
@@ -146,7 +144,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     _timeShift = parent._timeShift;
                 }
 
-                private RefCountDisposable _refCountDisposable;
+                private RefCountDisposable? _refCountDisposable;
                 private TimeSpan _totalTime;
                 private TimeSpan _nextShift;
                 private TimeSpan _nextSpan;
@@ -172,7 +170,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     var s = new Subject<TSource>();
                     _q.Enqueue(s);
-                    ForwardOnNext(new WindowObservable<TSource>(s, _refCountDisposable));
+                    ForwardOnNext(new WindowObservable<TSource>(s, _refCountDisposable!)); // NB: _refCountDisposable gets assigned in Run.
                 }
 
                 private void CreateTimer()
@@ -297,21 +295,22 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : Sink<TSource, IObservable<TSource>>
             {
                 private readonly object _gate = new object();
+                private Subject<TSource> _subject;
 
                 public _(IObserver<IObservable<TSource>> observer)
                     : base(observer)
                 {
+                    _subject = new Subject<TSource>();
                 }
 
-                private Subject<TSource> _subject;
-                private RefCountDisposable _refCountDisposable;
+                private RefCountDisposable? _refCountDisposable;
 
                 public void Run(TimeHopping parent)
                 {
                     var groupDisposable = new CompositeDisposable(2);
                     _refCountDisposable = new RefCountDisposable(groupDisposable);
 
-                    CreateWindow();
+                    NextWindow();
 
                     groupDisposable.Add(parent._scheduler.SchedulePeriodic(this, parent._timeSpan, static @this => @this.Tick()));
                     groupDisposable.Add(parent._source.SubscribeSafe(this));
@@ -324,14 +323,15 @@ namespace System.Reactive.Linq.ObservableImpl
                     lock (_gate)
                     {
                         _subject.OnCompleted();
-                        CreateWindow();
+
+                        _subject = new Subject<TSource>();
+                        NextWindow();
                     }
                 }
 
-                private void CreateWindow()
+                private void NextWindow()
                 {
-                    _subject = new Subject<TSource>();
-                    ForwardOnNext(new WindowObservable<TSource>(_subject, _refCountDisposable));
+                    ForwardOnNext(new WindowObservable<TSource>(_subject, _refCountDisposable!)); // NB: _refCountDisposable gets assigned in Run.
                 }
 
                 public override void OnNext(TSource value)
@@ -392,26 +392,28 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly TimeSpan _timeSpan;
                 private readonly IScheduler _scheduler;
 
+                private Subject<TSource> _s;
+
                 public _(Ferry parent, IObserver<IObservable<TSource>> observer)
                     : base(observer)
                 {
                     _count = parent._count;
                     _timeSpan = parent._timeSpan;
                     _scheduler = parent._scheduler;
+
+                    _s = new Subject<TSource>();
                 }
 
-                private Subject<TSource> _s;
                 private int _n;
 
-                private RefCountDisposable _refCountDisposable;
+                private RefCountDisposable? _refCountDisposable;
 
                 public override void Run(IObservable<TSource> source)
                 {
                     var groupDisposable = new CompositeDisposable(2) { _timerD };
                     _refCountDisposable = new RefCountDisposable(groupDisposable);
 
-                    _s = new Subject<TSource>();
-                    ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
+                    NextWindow();
                     CreateTimer(_s);
 
                     groupDisposable.Add(source.SubscribeSafe(this));
@@ -427,6 +429,11 @@ namespace System.Reactive.Linq.ObservableImpl
                     m.Disposable = _scheduler.ScheduleAction((@this: this, window), _timeSpan, static tuple => [email protected](tuple.window));
                 }
 
+                private void NextWindow()
+                {
+                    ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable!)); // NB: _refCountDisposable gets assigned in Run.
+                }
+
                 private void Tick(Subject<TSource> window)
                 {
                     Subject<TSource> newWindow;
@@ -443,7 +450,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                         _s.OnCompleted();
                         _s = newWindow;
-                        ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
+                        NextWindow();
                     }
 
                     CreateTimer(newWindow);
@@ -451,7 +458,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnNext(TSource value)
                 {
-                    Subject<TSource> newWindow = null;
+                    Subject<TSource>? newWindow = null;
 
                     lock (_gate)
                     {
@@ -465,7 +472,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                             _s.OnCompleted();
                             _s = newWindow;
-                            ForwardOnNext(new WindowObservable<TSource>(_s, _refCountDisposable));
+                            NextWindow();
                         }
                     }
 
@@ -518,27 +525,26 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly object _gate = new object();
                 private readonly AsyncLock _windowGate = new AsyncLock();
                 private readonly SerialDisposable _m = new SerialDisposable();
-
                 private readonly Func<IObservable<TWindowClosing>> _windowClosingSelector;
 
+                private Subject<TSource> _window;
+
                 public _(Selector parent, IObserver<IObservable<TSource>> observer)
                     : base(observer)
                 {
                     _windowClosingSelector = parent._windowClosingSelector;
+
+                    _window = new Subject<TSource>();
                 }
 
-                private ISubject<TSource> _window;
-                private RefCountDisposable _refCountDisposable;
+                private RefCountDisposable? _refCountDisposable;
 
                 public override void Run(IObservable<TSource> source)
                 {
-                    _window = new Subject<TSource>();
-
                     var groupDisposable = new CompositeDisposable(2) { _m };
                     _refCountDisposable = new RefCountDisposable(groupDisposable);
 
-                    var window = new WindowObservable<TSource>(_window, _refCountDisposable);
-                    ForwardOnNext(window);
+                    NextWindow();
 
                     groupDisposable.Add(source.SubscribeSafe(this));
 
@@ -547,6 +553,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     SetUpstream(_refCountDisposable);
                 }
 
+                private void NextWindow()
+                {
+                    var window = new WindowObservable<TSource>(_window, _refCountDisposable!); // NB: _refCountDisposable gets assigned in Run.
+                    ForwardOnNext(window);
+                }
+
                 private void CreateWindowClose()
                 {
                     IObservable<TWindowClosing> windowClose;
@@ -577,8 +589,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         _window.OnCompleted();
                         _window = new Subject<TSource>();
 
-                        var window = new WindowObservable<TSource>(_window, _refCountDisposable);
-                        ForwardOnNext(window);
+                        NextWindow();
                     }
 
                     _windowGate.Wait(this, static @this => @this.CreateWindowClose());
@@ -656,23 +667,22 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly object _gate = new object();
 
+                private Subject<TSource> _window;
+
                 public _(IObserver<IObservable<TSource>> observer)
                     : base(observer)
                 {
+                    _window = new Subject<TSource>();
                 }
 
-                private ISubject<TSource> _window;
-                private RefCountDisposable _refCountDisposable;
+                private RefCountDisposable? _refCountDisposable;
 
                 public void Run(Boundaries parent)
                 {
-                    _window = new Subject<TSource>();
-
                     var d = new CompositeDisposable(2);
                     _refCountDisposable = new RefCountDisposable(d);
 
-                    var window = new WindowObservable<TSource>(_window, _refCountDisposable);
-                    ForwardOnNext(window);
+                    NextWindow();
 
                     d.Add(parent._source.SubscribeSafe(this));
                     d.Add(parent._windowBoundaries.SubscribeSafe(new WindowClosingObserver(this)));
@@ -680,6 +690,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     SetUpstream(_refCountDisposable);
                 }
 
+                private void NextWindow()
+                {
+                    var window = new WindowObservable<TSource>(_window, _refCountDisposable!); // NB: _refCountDisposable gets assigned in Run.
+                    ForwardOnNext(window);
+                }
+
                 private sealed class WindowClosingObserver : IObserver<TWindowClosing>
                 {
                     private readonly _ _parent;
@@ -696,8 +712,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             _parent._window.OnCompleted();
                             _parent._window = new Subject<TSource>();
 
-                            var window = new WindowObservable<TSource>(_parent._window, _parent._refCountDisposable);
-                            _parent.ForwardOnNext(window);
+                            _parent.NextWindow();
                         }
                     }