瀏覽代碼

Enable #nullable for some more operators.

Bart De Smet 5 年之前
父節點
當前提交
5151e5b411

+ 39 - 31
Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs

@@ -2,8 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
+using System.Diagnostics;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 
@@ -39,7 +38,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 var prev = new Node<TSource>(_value);
 
                 Node<TSource> appendNode;
-                Node<TSource> prependNode = null;
+                Node<TSource>? prependNode = null;
 
                 if (_append)
                 {
@@ -58,7 +57,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 var prev = new Node<TSource>(_value);
 
-                Node<TSource> appendNode = null;
+                Node<TSource>? appendNode = null;
                 Node<TSource> prependNode;
 
                 if (_append)
@@ -74,7 +73,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 return CreateAppendPrepend(prependNode, appendNode);
             }
 
-            private IAppendPrepend CreateAppendPrepend(Node<TSource> prepend, Node<TSource> append)
+            private IAppendPrepend CreateAppendPrepend(Node<TSource>? prepend, Node<TSource>? append)
             {
                 if (Scheduler is ISchedulerLongRunning longRunning)
                 {
@@ -106,7 +105,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly TSource _value;
                 private readonly IScheduler _scheduler;
                 private readonly bool _append;
-                private IDisposable _schedulerDisposable;
+                private IDisposable? _schedulerDisposable;
 
                 public _(SingleValue parent, IObserver<TSource> observer)
                     : base(observer)
@@ -165,12 +164,12 @@ namespace System.Reactive.Linq.ObservableImpl
         private sealed class Recursive : Producer<TSource, Recursive._>, IAppendPrepend
         {
             private readonly IObservable<TSource> _source;
-            private readonly Node<TSource> _appends;
-            private readonly Node<TSource> _prepends;
+            private readonly Node<TSource>? _appends;
+            private readonly Node<TSource>? _prepends;
 
             public IScheduler Scheduler { get; }
 
-            public Recursive(IObservable<TSource> source, Node<TSource> prepends, Node<TSource> appends, IScheduler scheduler)
+            public Recursive(IObservable<TSource> source, Node<TSource>? prepends, Node<TSource>? appends, IScheduler scheduler)
             {
                 _source = source;
                 _appends = appends;
@@ -201,11 +200,11 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : IdentitySink<TSource>
             {
                 private readonly IObservable<TSource> _source;
-                private readonly Node<TSource> _appends;
+                private readonly Node<TSource>? _appends;
                 private readonly IScheduler _scheduler;
 
-                private Node<TSource> _currentPrependNode;
-                private TSource[] _appendArray;
+                private Node<TSource>? _currentPrependNode;
+                private TSource[]? _appendArray;
                 private int _currentAppendIndex;
                 private volatile bool _disposed;
 
@@ -230,7 +229,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
                         // is used to have PrependValues() bail out.
                         //
-                        _scheduler.Schedule(this, (innerScheduler, @this) => @this.PrependValues(innerScheduler));
+                        _scheduler.Schedule(this, static (innerScheduler, @this) => @this.PrependValues(innerScheduler));
                     }
                 }
 
@@ -243,11 +242,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     else
                     {
                         _appendArray = _appends.ToReverseArray();
+
                         //
                         // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
                         // is used to have `AppendValues` bail out.
                         //
-                        _scheduler.Schedule(this, (innerScheduler, @this) => @this.AppendValues(innerScheduler));
+                        _scheduler.Schedule(this, static (innerScheduler, @this) => @this.AppendValues(innerScheduler));
                     }
                 }
 
@@ -268,7 +268,9 @@ namespace System.Reactive.Linq.ObservableImpl
                         return Disposable.Empty;
                     }
 
-                    var current = _currentPrependNode.Value;
+                    Debug.Assert(_currentPrependNode != null);
+
+                    var current = _currentPrependNode!.Value;
                     ForwardOnNext(current);
 
                     _currentPrependNode = _currentPrependNode.Parent;
@@ -282,7 +284,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
                         // is used to have PrependValues() bail out.
                         //
-                        scheduler.Schedule(this, (innerScheduler, @this) => @this.PrependValues(innerScheduler));
+                        scheduler.Schedule(this, static (innerScheduler, @this) => @this.PrependValues(innerScheduler));
                     }
 
                     return Disposable.Empty;
@@ -295,7 +297,9 @@ namespace System.Reactive.Linq.ObservableImpl
                         return Disposable.Empty;
                     }
 
-                    var current = _appendArray[_currentAppendIndex];
+                    Debug.Assert(_appendArray != null);
+
+                    var current = _appendArray![_currentAppendIndex];
                     ForwardOnNext(current);
 
                     _currentAppendIndex++;
@@ -310,7 +314,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         // We never allow the scheduled work to be cancelled. Instead, the _disposed flag
                         // is used to have AppendValues() bail out.
                         //
-                        scheduler.Schedule(this, (innerScheduler, @this) => @this.AppendValues(innerScheduler));
+                        scheduler.Schedule(this, static (innerScheduler, @this) => @this.AppendValues(innerScheduler));
                     }
 
                     return Disposable.Empty;
@@ -321,13 +325,13 @@ namespace System.Reactive.Linq.ObservableImpl
         private sealed class LongRunning : Producer<TSource, LongRunning._>, IAppendPrepend
         {
             private readonly IObservable<TSource> _source;
-            private readonly Node<TSource> _appends;
-            private readonly Node<TSource> _prepends;
+            private readonly Node<TSource>? _appends;
+            private readonly Node<TSource>? _prepends;
             private readonly ISchedulerLongRunning _longRunningScheduler;
 
             public IScheduler Scheduler { get; }
 
-            public LongRunning(IObservable<TSource> source, Node<TSource> prepends, Node<TSource> appends, IScheduler scheduler, ISchedulerLongRunning longRunningScheduler)
+            public LongRunning(IObservable<TSource> source, Node<TSource>? prepends, Node<TSource>? appends, IScheduler scheduler, ISchedulerLongRunning longRunningScheduler)
             {
                 _source = source;
                 _appends = appends;
@@ -359,11 +363,11 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : IdentitySink<TSource>
             {
                 private readonly IObservable<TSource> _source;
-                private readonly Node<TSource> _prepends; 
-                private readonly Node<TSource> _appends;
+                private readonly Node<TSource>? _prepends; 
+                private readonly Node<TSource>? _appends;
                 private readonly ISchedulerLongRunning _scheduler;
 
-                private IDisposable _schedulerDisposable;
+                private IDisposable? _schedulerDisposable;
 
                 public _(LongRunning parent, IObserver<TSource> observer)
                     : base(observer)
@@ -382,7 +386,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        var disposable = _scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.PrependValues(cancel));
+                        var disposable = _scheduler.ScheduleLongRunning(this, static (@this, cancel) => @this.PrependValues(cancel));
                         Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
                     }
                 }
@@ -395,7 +399,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        var disposable = _scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.AppendValues(cancel));
+                        var disposable = _scheduler.ScheduleLongRunning(this, static (@this, cancel) => @this.AppendValues(cancel));
                         Disposable.TrySetSerial(ref _schedulerDisposable, disposable);
                     }
                 }
@@ -412,7 +416,9 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private void PrependValues(ICancelable cancel)
                 {
-                    var current = _prepends;
+                    Debug.Assert(_prepends != null);
+
+                    var current = _prepends!;
 
                     while (!cancel.IsDisposed)
                     {
@@ -429,7 +435,9 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private void AppendValues(ICancelable cancel)
                 {
-                    var array = _appends.ToReverseArray();
+                    Debug.Assert(_appends != null);
+
+                    var array = _appends!.ToReverseArray();
                     var i = 0;
 
                     while (!cancel.IsDisposed)
@@ -449,7 +457,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
         private sealed class Node<T>
         {
-            public readonly Node<T> Parent;
+            public readonly Node<T>? Parent;
             public readonly T Value;
             public readonly int Count;
 
@@ -458,7 +466,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            public Node(Node<T> parent, T value)
+            public Node(Node<T>? parent, T value)
             {
                 Parent = parent;
                 Value = value;
@@ -484,7 +492,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 var current = this;
                 for (var i = Count - 1; i >= 0; i--)
                 {
-                    array[i] = current.Value;
+                    array[i] = current!.Value; // NB: Count property ensures non-nullability.
                     current = current.Parent;
                 }
                 return array;

+ 21 - 28
Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
@@ -31,7 +29,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly int _count;
                 private int _index;
-                private IList<TSource> _buffer;
+                private IList<TSource>? _buffer;
 
                 internal ExactSink(IObserver<IList<TSource>> observer, int count) : base(observer)
                 {
@@ -104,7 +102,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly int _count;
                 private readonly int _skip;
                 private int _index;
-                private IList<TSource> _buffer;
+                private IList<TSource>? _buffer;
 
                 internal SkipSink(IObserver<IList<TSource>> observer, int count, int skip) : base(observer)
                 {
@@ -127,7 +125,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     if (++idx == _count)
                     {
                         _buffer = null;
-                        ForwardOnNext(buffer);
+                        ForwardOnNext(buffer!); // NB: Counting logic with _index ensures non-null.
                     }
 
                     if (idx == _skip)
@@ -272,7 +270,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly IScheduler _scheduler;
                 private readonly object _gate = new object();
                 private readonly Queue<List<TSource>> _q = new Queue<List<TSource>>();
-                private IDisposable _timerSerial;
+                private IDisposable? _timerSerial;
 
                 public _(TimeSliding parent, IObserver<IList<TSource>> observer)
                     : base(observer)
@@ -348,7 +346,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         _nextShift += _timeShift;
                     }
 
-                    m.Disposable = _scheduler.ScheduleAction((@this: this, isSpan, isShift), ts, tuple => [email protected](tuple.isSpan, tuple.isShift));
+                    m.Disposable = _scheduler.ScheduleAction((@this: this, isSpan, isShift), ts, static tuple => [email protected](tuple.isSpan, tuple.isShift));
                 }
 
                 private void Tick(bool isSpan, bool isShift)
@@ -438,20 +436,18 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : Sink<TSource, IList<TSource>>
             {
                 private readonly object _gate = new object();
+                private List<TSource> _list = new List<TSource>();
 
                 public _(IObserver<IList<TSource>> observer)
                     : base(observer)
                 {
                 }
 
-                private List<TSource> _list;
-                private IDisposable _periodicDisposable;
+                private IDisposable? _periodicDisposable;
 
                 public void Run(TimeHopping parent)
                 {
-                    _list = new List<TSource>();
-
-                    Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(this, parent._timeSpan, @this => @this.Tick()));
+                    Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(this, parent._timeSpan, static @this => @this.Tick()));
                     Run(parent._source);
                 }
 
@@ -525,7 +521,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly Ferry _parent;
                 private readonly object _gate = new object();
-                private IDisposable _timerSerial;
+                private List<TSource> _s = new List<TSource>();
 
                 public _(Ferry parent, IObserver<IList<TSource>> observer)
                     : base(observer)
@@ -533,13 +529,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     _parent = parent;
                 }
 
-                private IList<TSource> _s;
+                private IDisposable? _timerSerial;
                 private int _n;
                 private int _windowId;
 
                 public void Run()
                 {
-                    _s = new List<TSource>();
                     _n = 0;
                     _windowId = 0;
 
@@ -554,6 +549,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     {
                         Disposable.Dispose(ref _timerSerial);
                     }
+
                     base.Dispose(disposing);
                 }
 
@@ -562,7 +558,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     var m = new SingleAssignmentDisposable();
                     Disposable.TrySetSerial(ref _timerSerial, m);
 
-                    m.Disposable = _parent._scheduler.ScheduleAction((@this: this, id), _parent._timeSpan, tuple => [email protected](tuple.id));
+                    m.Disposable = _parent._scheduler.ScheduleAction((@this: this, id), _parent._timeSpan, static tuple => [email protected](tuple.id));
                 }
 
                 private void Tick(int id)
@@ -655,24 +651,22 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly object _gate = new object();
                 private readonly AsyncLock _bufferGate = new AsyncLock();
-                private IDisposable _bufferClosingSerialDisposable;
                 private readonly Func<IObservable<TBufferClosing>> _bufferClosingSelector;
 
+                private List<TSource> _buffer = new List<TSource>();
+                private IDisposable? _bufferClosingSerialDisposable;
+
                 public _(Selector parent, IObserver<IList<TSource>> observer)
                     : base(observer)
                 {
                     _bufferClosingSelector = parent._bufferClosingSelector;
                 }
 
-                private IList<TSource> _buffer;
-
                 public override void Run(IObservable<TSource> source)
                 {
-                    _buffer = new List<TSource>();
-
                     base.Run(source);
 
-                    _bufferGate.Wait(this, @this => @this.CreateBufferClose());
+                    _bufferGate.Wait(this, static @this => @this.CreateBufferClose());
                 }
 
                 protected override void Dispose(bool disposing)
@@ -716,7 +710,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         ForwardOnNext(res);
                     }
 
-                    _bufferGate.Wait(this, @this => @this.CreateBufferClose());
+                    _bufferGate.Wait(this, static @this => @this.CreateBufferClose());
                 }
 
                 private sealed class BufferClosingObserver : SafeObserver<TBufferClosing>
@@ -791,18 +785,16 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly object _gate = new object();
 
+                private List<TSource> _buffer = new List<TSource>();
+                private IDisposable? _boundariesDisposable;
+
                 public _(IObserver<IList<TSource>> observer)
                     : base(observer)
                 {
                 }
 
-                private IList<TSource> _buffer;
-                private IDisposable _boundariesDisposable;
-
                 public void Run(Boundaries parent)
                 {
-                    _buffer = new List<TSource>();
-
                     Run(parent._source);
                     Disposable.SetSingle(ref _boundariesDisposable, parent._bufferBoundaries.SubscribeSafe(new BufferClosingObserver(this)));
                 }
@@ -813,6 +805,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     {
                         Disposable.Dispose(ref _boundariesDisposable);
                     }
+
                     base.Dispose(disposing);
                 }
 

+ 41 - 37
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
@@ -27,8 +25,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
             internal abstract class _ : IdentitySink<TSource>
             {
-                protected IStopwatch _watch;
-                protected IScheduler _scheduler;
+                protected readonly IScheduler _scheduler;
+                private IStopwatch? _watch;
 
                 protected _(TParent parent, IObserver<TSource> observer)
                     : base(observer)
@@ -45,13 +43,15 @@ namespace System.Reactive.Linq.ObservableImpl
                     base.Run(parent._source);
                 }
 
+                protected TimeSpan Elapsed => _watch!.Elapsed; // NB: Only used after Run is called.
+
                 protected abstract void RunCore(TParent parent);
             }
 
             internal abstract class S : _
             {
                 protected readonly object _gate = new object();
-                protected IDisposable _cancelable;
+                protected IDisposable? _cancelable;
 
                 protected S(TParent parent, IObserver<TSource> observer)
                     : base(parent, observer)
@@ -67,7 +67,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private bool _hasCompleted;
                 private TimeSpan _completeAt;
                 private bool _hasFailed;
-                private Exception _exception;
+                private Exception? _exception;
 
                 protected override void Dispose(bool disposing)
                 {
@@ -85,7 +85,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     lock (_gate)
                     {
-                        var next = _watch.Elapsed.Add(_delay);
+                        var next = Elapsed.Add(_delay);
 
                         _queue.Enqueue(new Reactive.TimeInterval<TSource>(value, next));
 
@@ -95,7 +95,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (shouldRun)
                     {
-                        Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(this, _delay, (@this, a) => @this.DrainQueue(a)));
+                        DrainQueue(_delay);
                     }
                 }
 
@@ -129,7 +129,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     lock (_gate)
                     {
-                        var next = _watch.Elapsed.Add(_delay);
+                        var next = Elapsed.Add(_delay);
 
                         _completeAt = next;
                         _hasCompleted = true;
@@ -140,11 +140,16 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (shouldRun)
                     {
-                        Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(this, _delay, (@this, a) => @this.DrainQueue(a)));
+                        DrainQueue(_delay);
                     }
                 }
 
-                protected void DrainQueue(Action<S, TimeSpan> recurse)
+                protected void DrainQueue(TimeSpan next)
+                {
+                    Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(this, next, static (@this, a) => @this.DrainQueue(a)));
+                }
+
+                private void DrainQueue(Action<S, TimeSpan> recurse)
                 {
                     lock (_gate)
                     {
@@ -189,7 +194,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             }
                             else
                             {
-                                var now = _watch.Elapsed;
+                                var now = Elapsed;
 
                                 if (_queue.Count > 0)
                                 {
@@ -230,7 +235,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                         if (hasValue)
                         {
-                            ForwardOnNext(value);
+                            ForwardOnNext(value!);
                             shouldYield = true;
                         }
                         else
@@ -241,7 +246,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             }
                             else if (hasFailed)
                             {
-                                ForwardOnError(error);
+                                ForwardOnError(error!);
                             }
                             else if (shouldRecurse)
                             {
@@ -257,7 +262,6 @@ namespace System.Reactive.Linq.ObservableImpl
             protected abstract class L : _
             {
                 protected readonly object _gate = new object();
-                protected IDisposable _cancelable;
                 private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);
 
                 protected L(TParent parent, IObserver<TSource> observer)
@@ -265,14 +269,14 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                 }
 
-                protected TimeSpan _delay;
                 protected Queue<Reactive.TimeInterval<TSource>> _queue = new Queue<Reactive.TimeInterval<TSource>>();
+                protected IDisposable? _cancelable;
+                protected TimeSpan _delay;
 
-                private CancellationTokenSource _stop;
                 private bool _hasCompleted;
                 private TimeSpan _completeAt;
                 private bool _hasFailed;
-                private Exception _exception;
+                private Exception? _exception;
 
                 protected override void Dispose(bool disposing)
                 {
@@ -286,17 +290,17 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 protected void ScheduleDrain()
                 {
-                    _stop = new CancellationTokenSource();
-                    Disposable.TrySetSerial(ref _cancelable, new CancellationDisposable(_stop));
+                    var cd = new CancellationDisposable();
+                    Disposable.TrySetSerial(ref _cancelable, cd);
 
-                    _scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
+                    _scheduler.AsLongRunning()!.ScheduleLongRunning(cd.Token, DrainQueue); // NB: This class is only used with long-running schedulers.
                 }
 
                 public override void OnNext(TSource value)
                 {
                     lock (_gate)
                     {
-                        var next = _watch.Elapsed.Add(_delay);
+                        var next = Elapsed.Add(_delay);
 
                         _queue.Enqueue(new Reactive.TimeInterval<TSource>(value, next));
 
@@ -325,7 +329,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     lock (_gate)
                     {
-                        var next = _watch.Elapsed.Add(_delay);
+                        var next = Elapsed.Add(_delay);
 
                         _completeAt = next;
                         _hasCompleted = true;
@@ -334,13 +338,13 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                 }
 
-                private void DrainQueue(ICancelable cancel)
+                private void DrainQueue(CancellationToken token, ICancelable cancel)
                 {
                     while (true)
                     {
                         try
                         {
-                            _evt.Wait(_stop.Token);
+                            _evt.Wait(token);
                         }
                         catch (OperationCanceledException)
                         {
@@ -366,7 +370,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             }
                             else
                             {
-                                var now = _watch.Elapsed;
+                                var now = Elapsed;
 
                                 if (_queue.Count > 0)
                                 {
@@ -398,11 +402,11 @@ namespace System.Reactive.Linq.ObservableImpl
                         if (shouldWait)
                         {
                             var timer = new ManualResetEventSlim();
-                            _scheduler.ScheduleAction(timer, waitTime, slimTimer => { slimTimer.Set(); });
+                            _scheduler.ScheduleAction(timer, waitTime, static slimTimer => { slimTimer.Set(); });
 
                             try
                             {
-                                timer.Wait(_stop.Token);
+                                timer.Wait(token);
                             }
                             catch (OperationCanceledException)
                             {
@@ -412,7 +416,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                         if (hasValue)
                         {
-                            ForwardOnNext(value);
+                            ForwardOnNext(value!);
                         }
                         else
                         {
@@ -422,7 +426,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             }
                             else if (hasFailed)
                             {
-                                ForwardOnError(error);
+                                ForwardOnError(error!);
                             }
 
                             return;
@@ -457,7 +461,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _ready = false;
 
-                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start()));
+                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
                 }
 
                 private void Start()
@@ -467,7 +471,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     lock (_gate)
                     {
-                        _delay = _watch.Elapsed;
+                        _delay = Elapsed;
 
                         var oldQueue = _queue;
                         _queue = new Queue<Reactive.TimeInterval<TSource>>();
@@ -491,7 +495,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (shouldRun)
                     {
-                        Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule((Base<Absolute>.S)this, next, (@this, a) => DrainQueue(a)));
+                        DrainQueue(next);
                     }
                 }
             }
@@ -508,14 +512,14 @@ namespace System.Reactive.Linq.ObservableImpl
                     // ScheduleDrain might have already set a newer disposable
                     // using TrySetSerial would cancel it, stopping the emission
                     // and hang the consumer
-                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start()));
+                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, static @this => @this.Start()));
                 }
 
                 private void Start()
                 {
                     lock (_gate)
                     {
-                        _delay = _watch.Elapsed;
+                        _delay = Elapsed;
 
                         var oldQueue = _queue;
                         _queue = new Queue<Reactive.TimeInterval<TSource>>();
@@ -603,7 +607,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
 
                 private bool _atEnd;
-                private IDisposable _subscription;
+                private IDisposable? _subscription;
 
                 public void Run(TParent parent)
                 {
@@ -778,7 +782,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     private readonly _ _parent;
                     private readonly IObservable<TSource> _source;
-                    private IDisposable _subscription;
+                    private IDisposable? _subscription;
 
                     public SubscriptionDelayObserver(_ parent, IObservable<TSource> source)
                     {

+ 12 - 11
Rx.NET/Source/src/System.Reactive/Linq/Observable/Merge.cs

@@ -2,9 +2,8 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
+using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Threading;
 using System.Threading.Tasks;
@@ -30,7 +29,10 @@ namespace System.Reactive.Linq.ObservableImpl
 
             internal sealed class _ : Sink<IObservable<TSource>, TSource>
             {
+                private readonly object _gate = new object();
                 private readonly int _maxConcurrent;
+                private readonly Queue<IObservable<TSource>> _q = new Queue<IObservable<TSource>>();
+                private readonly CompositeDisposable _group = new CompositeDisposable();
 
                 public _(int maxConcurrent, IObserver<TSource> observer)
                     : base(observer)
@@ -38,10 +40,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     _maxConcurrent = maxConcurrent;
                 }
 
-                private readonly object _gate = new object();
-                private readonly Queue<IObservable<TSource>> _q = new Queue<IObservable<TSource>>();
                 private volatile bool _isStopped;
-                private readonly CompositeDisposable _group = new CompositeDisposable();
                 private int _activeCount;
 
                 public override void OnNext(IObservable<TSource> value)
@@ -165,14 +164,15 @@ namespace System.Reactive.Linq.ObservableImpl
 
             internal sealed class _ : Sink<IObservable<TSource>, TSource>
             {
+                private readonly object _gate = new object();
+                private readonly CompositeDisposable _group = new CompositeDisposable();
+
                 public _(IObserver<TSource> observer)
                     : base(observer)
                 {
                 }
 
-                private readonly object _gate = new object();
                 private volatile bool _isStopped;
-                private readonly CompositeDisposable _group = new CompositeDisposable();
 
                 public override void OnNext(IObservable<TSource> value)
                 {
@@ -284,13 +284,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
             internal sealed class _ : Sink<Task<TSource>, TSource>
             {
+                private readonly object _gate = new object();
+                private readonly CancellationTokenSource _cts = new CancellationTokenSource();
+
                 public _(IObserver<TSource> observer)
                     : base(observer)
                 {
                 }
 
-                private readonly object _gate = new object();
-                private readonly CancellationTokenSource _cts = new CancellationTokenSource();
                 private volatile int _count = 1;
 
                 public override void OnNext(Task<TSource> value)
@@ -302,7 +303,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        value.ContinueWith((t, thisObject) => ((_)thisObject).OnCompletedTask(t), this, _cts.Token);
+                        value.ContinueWith((t, thisObject) => ((_)thisObject!).OnCompletedTask(t), this, _cts.Token);
                     }
                 }
 
@@ -324,7 +325,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         {
                             lock (_gate)
                             {
-                                ForwardOnError(task.Exception.InnerException);
+                                ForwardOnError(TaskHelpers.GetSingleException(task));
                             }
                         }
                         break;

+ 10 - 10
Rx.NET/Source/src/System.Reactive/Linq/Observable/Sample.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 
@@ -33,11 +31,11 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            private IDisposable _sourceDisposable;
-            private IDisposable _samplerDisposable;
+            private IDisposable? _sourceDisposable;
+            private IDisposable? _samplerDisposable;
 
             private bool _hasValue;
-            private TSource _value;
+            private TSource? _value;
             private bool _sourceAtEnd;
             private bool _samplerAtEnd;
 
@@ -55,6 +53,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     Disposable.Dispose(ref _sourceDisposable);
                     Disposable.Dispose(ref _samplerDisposable);
                 }
+
                 base.Dispose(disposing);
             }
 
@@ -108,7 +107,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         if (_parent._hasValue)
                         {
                             _parent._hasValue = false;
-                            _parent.ForwardOnNext(_parent._value);
+                            _parent.ForwardOnNext(_parent._value!);
                         }
 
                         if (_parent._sourceAtEnd)
@@ -136,7 +135,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         if (_parent._hasValue)
                         {
                             _parent._hasValue = false;
-                            _parent.ForwardOnNext(_parent._value);
+                            _parent.ForwardOnNext(_parent._value!);
                         }
 
                         if (_parent._sourceAtEnd)
@@ -179,10 +178,10 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            private IDisposable _sourceDisposable;
+            private IDisposable? _sourceDisposable;
 
             private bool _hasValue;
-            private TSource _value;
+            private TSource? _value;
             private bool _atEnd;
 
             public void Run(Sample<TSource> parent)
@@ -198,6 +197,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     Disposable.Dispose(ref _sourceDisposable);
                 }
+
                 base.Dispose(disposing);
             }
 
@@ -208,7 +208,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     if (_hasValue)
                     {
                         _hasValue = false;
-                        ForwardOnNext(_value);
+                        ForwardOnNext(_value!);
                     }
 
                     if (_atEnd)

+ 4 - 6
Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Disposables;
 using System.Threading;
@@ -419,7 +417,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                             if (hasNext)
                             {
-                                ForwardOnNext(current);
+                                ForwardOnNext(current!);
                             }
                         }
                     }
@@ -510,7 +508,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                             if (hasNext)
                             {
-                                ForwardOnNext(current);
+                                ForwardOnNext(current!);
                             }
                         }
                     }
@@ -1373,7 +1371,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                             if (hasNext)
                             {
-                                ForwardOnNext(current);
+                                ForwardOnNext(current!);
                             }
                         }
                     }
@@ -1456,7 +1454,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                             if (hasNext)
                             {
-                                ForwardOnNext(current);
+                                ForwardOnNext(current!);
                             }
                         }
                     }

+ 1 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Switch.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
@@ -30,7 +28,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            private IDisposable _innerSerialDisposable;
+            private IDisposable? _innerSerialDisposable;
             private bool _isStopped;
             private ulong _latest;
             private bool _hasLatest;

+ 10 - 11
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 
@@ -28,6 +26,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TSource>
         {
+            private readonly object _gate = new object();
             private readonly TimeSpan _dueTime;
             private readonly IScheduler _scheduler;
 
@@ -38,10 +37,9 @@ namespace System.Reactive.Linq.ObservableImpl
                 _scheduler = parent._scheduler;
             }
 
-            private readonly object _gate = new object();
-            private TSource _value;
+            private TSource? _value;
             private bool _hasValue;
-            private IDisposable _serialCancelable;
+            private IDisposable? _serialCancelable;
             private ulong _id;
 
             protected override void Dispose(bool disposing)
@@ -76,7 +74,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     if (_hasValue && _id == currentid)
                     {
-                        ForwardOnNext(_value);
+                        ForwardOnNext(_value!);
                     }
 
                     _hasValue = false;
@@ -104,7 +102,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     if (_hasValue)
                     {
-                        ForwardOnNext(_value);
+                        ForwardOnNext(_value!);
                     }
 
                     ForwardOnCompleted();
@@ -133,6 +131,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TSource>
         {
+            private readonly object _gate = new object();
             private readonly Func<TSource, IObservable<TThrottle>> _throttleSelector;
 
             public _(Throttle<TSource, TThrottle> parent, IObserver<TSource> observer)
@@ -141,10 +140,9 @@ namespace System.Reactive.Linq.ObservableImpl
                 _throttleSelector = parent._throttleSelector;
             }
 
-            private readonly object _gate = new object();
-            private TSource _value;
+            private TSource? _value;
             private bool _hasValue;
-            private IDisposable _serialCancelable;
+            private IDisposable? _serialCancelable;
             private ulong _id;
 
             protected override void Dispose(bool disposing)
@@ -153,6 +151,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     Disposable.Dispose(ref _serialCancelable);
                 }
+
                 base.Dispose(disposing);
             }
 
@@ -212,7 +211,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     if (_hasValue)
                     {
-                        ForwardOnNext(_value);
+                        ForwardOnNext(_value!);
                     }
 
                     ForwardOnCompleted();