Browse Source

Merge pull request #1346 from dotnet/dev/bartde/rx_nullable_step10

Enable #nullable in scheduler code.
Bart J.F. De Smet 5 years ago
parent
commit
62c0f40f98
23 changed files with 83 additions and 112 deletions
  1. 2 4
      Rx.NET/Source/src/System.Reactive/Concurrency/CatchScheduler.cs
  2. 4 6
      Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs
  3. 1 3
      Rx.NET/Source/src/System.Reactive/Concurrency/DisableOptimizationsScheduler.cs
  4. 4 6
      Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs
  5. 3 5
      Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs
  6. 2 4
      Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.cs
  7. 0 2
      Rx.NET/Source/src/System.Reactive/Concurrency/NewThreadScheduler.cs
  8. 5 7
      Rx.NET/Source/src/System.Reactive/Concurrency/ScheduledItem.cs
  9. 2 4
      Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs
  10. 0 2
      Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs
  11. 9 11
      Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.Emulation.cs
  12. 4 6
      Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.cs
  13. 0 2
      Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs
  14. 0 2
      Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.cs
  15. 2 4
      Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerWrapper.cs
  16. 0 2
      Rx.NET/Source/src/System.Reactive/Concurrency/SynchronizationContextScheduler.cs
  17. 7 9
      Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs
  18. 4 6
      Rx.NET/Source/src/System.Reactive/Concurrency/UserWorkItem.cs
  19. 12 9
      Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.cs
  20. 1 1
      Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs
  21. 4 0
      Rx.NET/Source/src/System.Reactive/System.Reactive.csproj
  22. 4 4
      Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs
  23. 13 13
      Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.verified.cs

+ 2 - 4
Rx.NET/Source/src/System.Reactive/Concurrency/CatchScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Runtime.CompilerServices;
 
@@ -46,7 +44,7 @@ namespace System.Reactive.Concurrency
             return new CatchScheduler<TException>(scheduler, _handler, cache);
         }
 
-        protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object service)
+        protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object? service)
         {
             service = provider.GetService(serviceType);
 
@@ -111,7 +109,7 @@ namespace System.Reactive.Concurrency
                     _action = action;
 
                     // Note that avoiding closure allocation here would introduce infinite generic recursion over the TState argument
-                    Disposable.SetSingle(ref _cancel, scheduler._scheduler.SchedulePeriodic(state, period, state1 => this.Tick(state1).state ?? default));
+                    Disposable.SetSingle(ref _cancel, scheduler._scheduler.SchedulePeriodic(state, period, state1 => Tick(state1).state));
                 }
 
                 public void Dispose()

+ 4 - 6
Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Concurrency
@@ -14,13 +12,13 @@ namespace System.Reactive.Concurrency
     /// <seealso cref="Scheduler.Default">Singleton instance of this type exposed through this static property.</seealso>
     public sealed class DefaultScheduler : LocalScheduler, ISchedulerPeriodic
     {
-        private static readonly Lazy<DefaultScheduler> _instance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());
+        private static readonly Lazy<DefaultScheduler> DefaultInstance = new Lazy<DefaultScheduler>(() => new DefaultScheduler());
         private static readonly IConcurrencyAbstractionLayer Cal = ConcurrencyAbstractionLayer.Current;
 
         /// <summary>
         /// Gets the singleton instance of the default scheduler.
         /// </summary>
-        public static DefaultScheduler Instance => _instance.Value;
+        public static DefaultScheduler Instance => DefaultInstance.Value;
 
         private DefaultScheduler()
         {
@@ -143,7 +141,7 @@ namespace System.Reactive.Concurrency
         /// </summary>
         /// <param name="serviceType">Scheduler service interface type to discover.</param>
         /// <returns>Object implementing the requested service, if available; null otherwise.</returns>
-        protected override object GetService(Type serviceType)
+        protected override object? GetService(Type serviceType)
         {
             if (serviceType == typeof(ISchedulerLongRunning))
             {
@@ -163,7 +161,7 @@ namespace System.Reactive.Concurrency
                 private readonly TState _state;
                 private readonly Action<TState, ICancelable> _action;
 
-                private IDisposable _cancel;
+                private IDisposable? _cancel;
 
                 public LongScheduledWorkItem(TState state, Action<TState, ICancelable> action)
                 {

+ 1 - 3
Rx.NET/Source/src/System.Reactive/Concurrency/DisableOptimizationsScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Linq;
 using System.Runtime.CompilerServices;
 
@@ -36,7 +34,7 @@ namespace System.Reactive.Concurrency
             return new DisableOptimizationsScheduler(scheduler, _optimizationInterfaces, cache);
         }
 
-        protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object service)
+        protected override bool TryGetService(IServiceProvider provider, Type serviceType, out object? service)
         {
             service = null;
             return _optimizationInterfaces.Contains(serviceType);

+ 4 - 6
Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Disposables;
 using System.Threading;
@@ -36,7 +34,7 @@ namespace System.Reactive.Concurrency
         /// Thread used by the event loop to run work items on. No work should be run on any other thread.
         /// If ExitIfEmpty is set, the thread can quit and a new thread will be created when new work is scheduled.
         /// </summary>
-        private Thread _thread;
+        private Thread? _thread;
 
         /// <summary>
         /// Gate to protect data structures, including the work queue and the ready list.
@@ -63,12 +61,12 @@ namespace System.Reactive.Concurrency
         /// Work item that will be scheduled next. Used upon reevaluation of the queue to check whether the next
         /// item is still the same. If not, a new timer needs to be started (see below).
         /// </summary>
-        private ScheduledItem<TimeSpan> _nextItem;
+        private ScheduledItem<TimeSpan>? _nextItem;
 
         /// <summary>
         /// Disposable that always holds the timer to dispatch the first element in the queue.
         /// </summary>
-        private IDisposable _nextTimer;
+        private IDisposable? _nextTimer;
 
         /// <summary>
         /// Flag indicating whether the event loop should quit. When set, the event should be signaled as well to
@@ -299,7 +297,7 @@ namespace System.Reactive.Concurrency
             {
                 _evt.Wait();
 
-                ScheduledItem<TimeSpan>[] ready = null;
+                ScheduledItem<TimeSpan>[]? ready = null;
 
                 lock (_gate)
                 {

+ 3 - 5
Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Diagnostics;
 
@@ -109,7 +107,7 @@ namespace System.Reactive.Concurrency
         /// Gets the next scheduled item to be executed.
         /// </summary>
         /// <returns>The next scheduled item.</returns>
-        protected override IScheduledItem<DateTimeOffset> GetNext()
+        protected override IScheduledItem<DateTimeOffset>? GetNext()
         {
             while (_queue.Count > 0)
             {
@@ -144,11 +142,11 @@ namespace System.Reactive.Concurrency
                 throw new ArgumentNullException(nameof(action));
             }
 
-            ScheduledItem<DateTimeOffset, TState> si = null;
+            ScheduledItem<DateTimeOffset, TState>? si = null;
 
             var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) =>
             {
-                _queue.Remove(si);
+                _queue.Remove(si!); // NB: Assigned before function is invoked.
                 return action(scheduler, state1);
             });
 

+ 2 - 4
Rx.NET/Source/src/System.Reactive/Concurrency/LocalScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 namespace System.Reactive.Concurrency
 {
     /// <summary>
@@ -73,7 +71,7 @@ namespace System.Reactive.Concurrency
         /// </remarks>
         public virtual IStopwatch StartStopwatch() => ConcurrencyAbstractionLayer.Current.StartStopwatch();
 
-        object IServiceProvider.GetService(Type serviceType) => GetService(serviceType);
+        object? IServiceProvider.GetService(Type serviceType) => GetService(serviceType);
 
         /// <summary>
         /// Discovers scheduler services by interface type. The base class implementation returns
@@ -82,7 +80,7 @@ namespace System.Reactive.Concurrency
         /// </summary>
         /// <param name="serviceType">Scheduler service interface type to discover.</param>
         /// <returns>Object implementing the requested service, if available; <c>null</c> otherwise.</returns>
-        protected virtual object GetService(Type serviceType)
+        protected virtual object? GetService(Type serviceType)
         {
             if (serviceType == typeof(IStopwatchProvider))
             {

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Concurrency/NewThreadScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Threading;
 

+ 5 - 7
Rx.NET/Source/src/System.Reactive/Concurrency/ScheduledItem.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Reactive.Disposables;
 
@@ -16,7 +14,7 @@ namespace System.Reactive.Concurrency
     public abstract class ScheduledItem<TAbsolute> : IScheduledItem<TAbsolute>, IComparable<ScheduledItem<TAbsolute>>, IDisposable
         where TAbsolute : IComparable<TAbsolute>
     {
-        private IDisposable _disposable;
+        private IDisposable? _disposable;
         private readonly IComparer<TAbsolute> _comparer;
 
         /// <summary>
@@ -61,7 +59,7 @@ namespace System.Reactive.Concurrency
         /// <param name="other">Work item to compare the current work item to.</param>
         /// <returns>Relative ordering between this and the specified work item.</returns>
         /// <remarks>The inequality operators are overloaded to provide results consistent with the <see cref="IComparable"/> implementation. Equality operators implement traditional reference equality semantics.</remarks>
-        public int CompareTo(ScheduledItem<TAbsolute> other)
+        public int CompareTo(ScheduledItem<TAbsolute>? other)
         {
             // MSDN: By definition, any object compares greater than null, and two null references compare equal to each other.
             if (other is null)
@@ -119,7 +117,7 @@ namespace System.Reactive.Concurrency
         /// <param name="right">The second object to compare.</param>
         /// <returns><c>true</c> if both <see cref="ScheduledItem{TAbsolute, TValue}" /> are equal; otherwise, <c>false</c>.</returns>
         /// <remarks>This operator does not provide results consistent with the IComparable implementation. Instead, it implements reference equality.</remarks>
-        public static bool operator ==(ScheduledItem<TAbsolute> left, ScheduledItem<TAbsolute> right) => ReferenceEquals(left, right);
+        public static bool operator ==(ScheduledItem<TAbsolute>? left, ScheduledItem<TAbsolute>? right) => ReferenceEquals(left, right);
 
         /// <summary>
         /// Determines whether two specified <see cref="ScheduledItem{TAbsolute, TValue}" /> objects are inequal.
@@ -128,14 +126,14 @@ namespace System.Reactive.Concurrency
         /// <param name="right">The second object to compare.</param>
         /// <returns><c>true</c> if both <see cref="ScheduledItem{TAbsolute, TValue}" /> are inequal; otherwise, <c>false</c>.</returns>
         /// <remarks>This operator does not provide results consistent with the IComparable implementation. Instead, it implements reference equality.</remarks>
-        public static bool operator !=(ScheduledItem<TAbsolute> left, ScheduledItem<TAbsolute> right) => !(left == right);
+        public static bool operator !=(ScheduledItem<TAbsolute>? left, ScheduledItem<TAbsolute>? right) => !(left == right);
 
         /// <summary>
         /// Determines whether a <see cref="ScheduledItem{TAbsolute}" /> object is equal to the specified object.
         /// </summary>
         /// <param name="obj">The object to compare to the current <see cref="ScheduledItem{TAbsolute}" /> object.</param>
         /// <returns><c>true</c> if the obj parameter is a <see cref="ScheduledItem{TAbsolute}" /> object and is equal to the current <see cref="ScheduledItem{TAbsolute}" /> object; otherwise, <c>false</c>.</returns>
-        public override bool Equals(object obj) => ReferenceEquals(this, obj);
+        public override bool Equals(object? obj) => ReferenceEquals(this, obj);
 
         /// <summary>
         /// Returns the hash code for the current <see cref="ScheduledItem{TAbsolute}" /> object.

+ 2 - 4
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Async.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Threading;
 using System.Threading.Tasks;
@@ -15,7 +13,7 @@ namespace System.Reactive.Concurrency
         private sealed class AsyncInvocation<TState> : IDisposable
         {
             private readonly CancellationTokenSource _cts = new CancellationTokenSource();
-            private IDisposable _run;
+            private IDisposable? _run;
 
             public IDisposable Run(IScheduler self, TState s, Func<IScheduler, TState, CancellationToken, Task<IDisposable>> action)
             {
@@ -25,7 +23,7 @@ namespace System.Reactive.Concurrency
                 action(new CancelableScheduler(self, _cts.Token), s, _cts.Token).ContinueWith(
                     static (t, thisObject) =>
                     {
-                        var @this = (AsyncInvocation<TState>)thisObject;
+                        var @this = (AsyncInvocation<TState>)thisObject!;
 
                         t.Exception?.Handle(static e => e is OperationCanceledException);
 

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Concurrency

+ 9 - 11
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.Emulation.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Diagnostics;
 using System.Reactive.Disposables;
 using System.Reactive.PlatformServices;
@@ -340,7 +338,7 @@ namespace System.Reactive.Concurrency
             private readonly object _gate = new object();
             private readonly AutoResetEvent _resumeEvent = new AutoResetEvent(false);
             private volatile int _runState;
-            private IStopwatch _stopwatch;
+            private IStopwatch? _stopwatch;
             private TimeSpan _nextDue;
             private TimeSpan _suspendedAt;
             private TimeSpan _inactiveTime;
@@ -369,7 +367,7 @@ namespace System.Reactive.Concurrency
             private const int Suspended = 2;
             private const int Disposed = 3;
 
-            private IDisposable _task;
+            private IDisposable? _task;
 
             public IDisposable Start()
             {
@@ -408,7 +406,7 @@ namespace System.Reactive.Concurrency
                             // recorded as inactive based on cumulative deltas computed in
                             // the suspend and resume event handlers.
                             //
-                            next = Normalize(_nextDue - (_stopwatch.Elapsed - _inactiveTime));
+                            next = Normalize(_nextDue - (_stopwatch!.Elapsed - _inactiveTime));
                             break;
                         }
 
@@ -460,7 +458,7 @@ namespace System.Reactive.Concurrency
                 }
             }
 
-            private void Suspending(object sender, HostSuspendingEventArgs args)
+            private void Suspending(object? sender, HostSuspendingEventArgs args)
             {
                 //
                 // The host is telling us we're about to be suspended. At this point, time
@@ -480,7 +478,7 @@ namespace System.Reactive.Concurrency
                 {
                     if (_runState == Running)
                     {
-                        _suspendedAt = _stopwatch.Elapsed;
+                        _suspendedAt = _stopwatch!.Elapsed; // NB: Non-null when >= Running.
                         _runState = Suspended;
 
                         if (!Environment.HasShutdownStarted)
@@ -491,7 +489,7 @@ namespace System.Reactive.Concurrency
                 }
             }
 
-            private void Resuming(object sender, HostResumingEventArgs args)
+            private void Resuming(object? sender, HostResumingEventArgs args)
             {
                 //
                 // The host is telling us we're being resumed. At this point, code will
@@ -514,7 +512,7 @@ namespace System.Reactive.Concurrency
                 {
                     if (_runState == Suspended)
                     {
-                        _inactiveTime += _stopwatch.Elapsed - _suspendedAt;
+                        _inactiveTime += _stopwatch!.Elapsed - _suspendedAt; // NB: Non-null when >= Running.
                         _runState = Running;
 
                         if (!Environment.HasShutdownStarted)
@@ -557,7 +555,7 @@ namespace System.Reactive.Concurrency
 
             private TState _state;
             private int _pendingTickCount;
-            private IDisposable _cancel;
+            private IDisposable? _cancel;
 
             public IDisposable Start()
             {
@@ -610,7 +608,7 @@ namespace System.Reactive.Concurrency
                         }
                         catch (Exception e)
                         {
-                            _cancel.Dispose();
+                            _cancel!.Dispose(); // NB: Non-null after Start is called.
                             e.Throw();
                         }
 

+ 4 - 6
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Services.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 namespace System.Reactive.Concurrency
 {
     //
@@ -28,7 +26,7 @@ namespace System.Reactive.Concurrency
         /// This helper method is made available for query operator authors in order to discover scheduler services by using the required
         /// IServiceProvider pattern, which allows for interception or redefinition of scheduler services.
         /// </remarks>
-        public static ISchedulerLongRunning AsLongRunning(this IScheduler scheduler) => As<ISchedulerLongRunning>(scheduler);
+        public static ISchedulerLongRunning? AsLongRunning(this IScheduler scheduler) => As<ISchedulerLongRunning>(scheduler);
 
         /// <summary>
         /// Returns the <see cref="IStopwatchProvider"/> implementation of the specified scheduler, or <c>null</c> if no such implementation is available.
@@ -46,7 +44,7 @@ namespace System.Reactive.Concurrency
         /// scheduler service, where the caller falls back to not using stopwatches if this facility wasn't found.
         /// </para>
         /// </remarks>
-        public static IStopwatchProvider AsStopwatchProvider(this IScheduler scheduler) => As<IStopwatchProvider>(scheduler);
+        public static IStopwatchProvider? AsStopwatchProvider(this IScheduler scheduler) => As<IStopwatchProvider>(scheduler);
 
         /// <summary>
         /// Returns the <see cref="ISchedulerPeriodic"/> implementation of the specified scheduler, or <c>null</c> if no such implementation is available.
@@ -65,9 +63,9 @@ namespace System.Reactive.Concurrency
         /// facility wasn't found.
         /// </para>
         /// </remarks>
-        public static ISchedulerPeriodic AsPeriodic(this IScheduler scheduler) => As<ISchedulerPeriodic>(scheduler);
+        public static ISchedulerPeriodic? AsPeriodic(this IScheduler scheduler) => As<ISchedulerPeriodic>(scheduler);
 
-        private static T As<T>(IScheduler scheduler)
+        private static T? As<T>(IScheduler scheduler)
             where T : class
         {
             if (scheduler is IServiceProvider svc)

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Concurrency

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Globalization;
 using System.Reactive.PlatformServices;
 

+ 2 - 4
Rx.NET/Source/src/System.Reactive/Concurrency/SchedulerWrapper.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Runtime.CompilerServices;
 
 namespace System.Reactive.Concurrency
@@ -69,7 +67,7 @@ namespace System.Reactive.Concurrency
 
         protected abstract SchedulerWrapper Clone(IScheduler scheduler, ConditionalWeakTable<IScheduler, IScheduler> cache);
 
-        public object GetService(Type serviceType)
+        public object? GetService(Type serviceType)
         {
             if (!(_scheduler is IServiceProvider serviceProvider))
             {
@@ -84,6 +82,6 @@ namespace System.Reactive.Concurrency
             return serviceProvider.GetService(serviceType);
         }
 
-        protected abstract bool TryGetService(IServiceProvider provider, Type serviceType, out object service);
+        protected abstract bool TryGetService(IServiceProvider provider, Type serviceType, out object? service);
     }
 }

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Concurrency/SynchronizationContextScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Threading;
 

+ 7 - 9
Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Threading;
 using System.Threading.Tasks;
@@ -37,7 +35,7 @@ namespace System.Reactive.Concurrency
                 scheduler._taskFactory.StartNew(
                     thisObject =>
                     {
-                        var @this = (ScheduledWorkItem<TState>)thisObject;
+                        var @this = (ScheduledWorkItem<TState>)thisObject!;
                         //
                         // BREAKING CHANGE v2.0 > v1.x - No longer escalating exceptions using a throwing
                         //                               helper thread.
@@ -94,7 +92,7 @@ namespace System.Reactive.Concurrency
                 TaskHelpers.Delay(dueTime, ct.Token).ContinueWith(
                     (_, thisObject) =>
                     {
-                        var @this = (SlowlyScheduledWorkItem<TState>)thisObject;
+                        var @this = (SlowlyScheduledWorkItem<TState>)thisObject!;
 
                         if (!Disposable.GetIsDisposed(ref @this._cancel))
                         {
@@ -104,7 +102,7 @@ namespace System.Reactive.Concurrency
                     this,
                     CancellationToken.None,
                     TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
-                    scheduler._taskFactory.Scheduler);
+                    scheduler._taskFactory.Scheduler ?? TaskScheduler.Default);
             }
 
             public void Dispose()
@@ -118,7 +116,7 @@ namespace System.Reactive.Concurrency
             private readonly TState _state;
             private readonly Action<TState, ICancelable> _action;
 
-            private IDisposable _cancel;
+            private IDisposable? _cancel;
 
             public LongScheduledWorkItem(TaskPoolScheduler scheduler, TState state, Action<TState, ICancelable> action)
             {
@@ -128,7 +126,7 @@ namespace System.Reactive.Concurrency
                 scheduler._taskFactory.StartNew(
                     thisObject =>
                     {
-                        var @this = (LongScheduledWorkItem<TState>)thisObject;
+                        var @this = (LongScheduledWorkItem<TState>)thisObject!;
 
                         //
                         // Notice we don't check _cancel.IsDisposed. The contract for ISchedulerLongRunning
@@ -298,7 +296,7 @@ namespace System.Reactive.Concurrency
                 TaskHelpers.Delay(_period, _cts.Token).ContinueWith(
                     static (_, thisObject) =>
                     {
-                        var @this = (PeriodicallyScheduledWorkItem<TState>)thisObject;
+                        var @this = (PeriodicallyScheduledWorkItem<TState>)thisObject!;
 
                         @this.MoveNext();
 
@@ -309,7 +307,7 @@ namespace System.Reactive.Concurrency
                     this,
                     CancellationToken.None,
                     TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
-                    _taskFactory.Scheduler
+                    _taskFactory.Scheduler ?? TaskScheduler.Default
                 );
             }
         }

+ 4 - 6
Rx.NET/Source/src/System.Reactive/Concurrency/UserWorkItem.cs

@@ -1,17 +1,15 @@
-// Licensed to the .NET Foundation under one or more agreements.
+// Licensed to the .NET Foundation under one or more agreements.
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information.
 
-#nullable disable
-
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Concurrency
 {
     internal sealed class UserWorkItem<TState> : IDisposable
     {
-        private IDisposable _cancelRunDisposable;
-        private IDisposable _cancelQueueDisposable;
+        private IDisposable? _cancelRunDisposable;
+        private IDisposable? _cancelQueueDisposable;
 
         private readonly TState _state;
         private readonly IScheduler _scheduler;
@@ -44,4 +42,4 @@ namespace System.Reactive.Concurrency
             Disposable.TryDispose(ref _cancelRunDisposable);
         }
     }
-}
+}

+ 12 - 9
Rx.NET/Source/src/System.Reactive/Concurrency/VirtualTimeScheduler.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 using System.Globalization;
 
@@ -21,8 +19,13 @@ namespace System.Reactive.Concurrency
         /// Creates a new virtual time scheduler with the default value of TAbsolute as the initial clock value.
         /// </summary>
         protected VirtualTimeSchedulerBase()
-            : this(default, Comparer<TAbsolute>.Default)
+            : this(default!, Comparer<TAbsolute>.Default)
         {
+            //
+            // NB: We allow a default value for TAbsolute here, which typically is a struct. For compat reasons, we can't
+            //     add a generic constraint (either struct or, better, new()), and maybe a derived class has handled null
+            //     in all abstract methods.
+            //
         }
 
         /// <summary>
@@ -307,9 +310,9 @@ namespace System.Reactive.Concurrency
         /// </summary>
         /// <returns>The next scheduled item.</returns>
         [Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "By design. Side-effecting operation to retrieve the next element.")]
-        protected abstract IScheduledItem<TAbsolute> GetNext();
+        protected abstract IScheduledItem<TAbsolute>? GetNext();
 
-        object IServiceProvider.GetService(Type serviceType) => GetService(serviceType);
+        object? IServiceProvider.GetService(Type serviceType) => GetService(serviceType);
 
         /// <summary>
         /// Discovers scheduler services by interface type. The base class implementation supports
@@ -318,7 +321,7 @@ namespace System.Reactive.Concurrency
         /// </summary>
         /// <param name="serviceType">Scheduler service interface type to discover.</param>
         /// <returns>Object implementing the requested service, if available; null otherwise.</returns>
-        protected virtual object GetService(Type serviceType)
+        protected virtual object? GetService(Type serviceType)
         {
             if (serviceType == typeof(IStopwatchProvider))
             {
@@ -387,7 +390,7 @@ namespace System.Reactive.Concurrency
         /// Gets the next scheduled item to be executed.
         /// </summary>
         /// <returns>The next scheduled item.</returns>
-        protected override IScheduledItem<TAbsolute> GetNext()
+        protected override IScheduledItem<TAbsolute>? GetNext()
         {
             lock (_queue)
             {
@@ -424,13 +427,13 @@ namespace System.Reactive.Concurrency
                 throw new ArgumentNullException(nameof(action));
             }
 
-            ScheduledItem<TAbsolute, TState> si = null;
+            ScheduledItem<TAbsolute, TState>? si = null;
 
             var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) =>
             {
                 lock (_queue)
                 {
-                    _queue.Remove(si);
+                    _queue.Remove(si!); // NB: Assigned before function is invoked.
                 }
 
                 return action(scheduler, state1);

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

@@ -68,7 +68,7 @@ namespace System.Reactive
                 {
                     if (_dispatcherJob == null)
                     {
-                        _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
+                        _dispatcherJob = _longRunning!.ScheduleLongRunning(Dispatch); // NB: Only reachable when long-running.
 
                         Disposable.TrySetSerial(ref _disposable, StableCompositeDisposable.Create
                         (

+ 4 - 0
Rx.NET/Source/src/System.Reactive/System.Reactive.csproj

@@ -181,4 +181,8 @@
     <None Include="build\System.Reactive.targets" PackagePath="build\netcoreapp3.1" Pack="true" />
   </ItemGroup>
 
+  <ItemGroup>
+    <Service Include="{508349b6-6b84-4df5-91f0-309beebad82d}" />
+  </ItemGroup>
+
 </Project>

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -44,10 +44,10 @@ namespace System.Reactive.Threading.Tasks
                 else
                 {
                     _task.ContinueWithState(
-                        static (task, tuple) => tuple.@this._scheduler.ScheduleAction(
+                        static (task, tuple) => tuple.scheduler.ScheduleAction(
                             (task, tuple.observer),
                             static tuple2 => tuple2.task.EmitTaskResult(tuple2.observer)),
-                        (@this: this, observer),
+                        (scheduler: _scheduler, observer),
                         cts.Token,
                         options);
                 }
@@ -84,10 +84,10 @@ namespace System.Reactive.Threading.Tasks
                 else
                 {
                     _task.ContinueWithState(
-                        static (task, tuple) => tuple.@this._scheduler.ScheduleAction(
+                        static (task, tuple) => tuple.scheduler.ScheduleAction(
                             (task, tuple.observer),
                             static tuple2 => tuple2.task.EmitTaskResult(tuple2.observer)),
-                        (@this: this, observer),
+                        (scheduler: _scheduler, observer),
                         cts.Token,
                         options);
                 }

+ 13 - 13
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.verified.cs

@@ -253,7 +253,7 @@ namespace System.Reactive.Concurrency
     public sealed class DefaultScheduler : System.Reactive.Concurrency.LocalScheduler, System.Reactive.Concurrency.ISchedulerPeriodic
     {
         public static System.Reactive.Concurrency.DefaultScheduler Instance { get; }
-        protected override object GetService(System.Type serviceType) { }
+        protected override object? GetService(System.Type serviceType) { }
         public override System.IDisposable Schedule<TState>(TState state, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
         public override System.IDisposable Schedule<TState>(TState state, System.TimeSpan dueTime, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
         public System.IDisposable SchedulePeriodic<TState>(TState state, System.TimeSpan period, System.Func<TState, TState> action) { }
@@ -287,7 +287,7 @@ namespace System.Reactive.Concurrency
         public HistoricalScheduler() { }
         public HistoricalScheduler(System.DateTimeOffset initialClock) { }
         public HistoricalScheduler(System.DateTimeOffset initialClock, System.Collections.Generic.IComparer<System.DateTimeOffset> comparer) { }
-        protected override System.Reactive.Concurrency.IScheduledItem<System.DateTimeOffset> GetNext() { }
+        protected override System.Reactive.Concurrency.IScheduledItem<System.DateTimeOffset>? GetNext() { }
         public override System.IDisposable ScheduleAbsolute<TState>(TState state, System.DateTimeOffset dueTime, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
     }
     public abstract class HistoricalSchedulerBase : System.Reactive.Concurrency.VirtualTimeSchedulerBase<System.DateTimeOffset, System.TimeSpan>
@@ -347,7 +347,7 @@ namespace System.Reactive.Concurrency
     {
         protected LocalScheduler() { }
         public virtual System.DateTimeOffset Now { get; }
-        protected virtual object GetService(System.Type serviceType) { }
+        protected virtual object? GetService(System.Type serviceType) { }
         public virtual System.IDisposable Schedule<TState>(TState state, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
         public virtual System.IDisposable Schedule<TState>(TState state, System.DateTimeOffset dueTime, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
         public abstract System.IDisposable Schedule<TState>(TState state, System.TimeSpan dueTime, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action);
@@ -370,15 +370,15 @@ namespace System.Reactive.Concurrency
         public TAbsolute DueTime { get; }
         public bool IsCanceled { get; }
         public void Cancel() { }
-        public int CompareTo(System.Reactive.Concurrency.ScheduledItem<TAbsolute> other) { }
-        public override bool Equals(object obj) { }
+        public int CompareTo(System.Reactive.Concurrency.ScheduledItem<TAbsolute>? other) { }
+        public override bool Equals(object? obj) { }
         public override int GetHashCode() { }
         public void Invoke() { }
         protected abstract System.IDisposable InvokeCore();
-        public static bool operator !=(System.Reactive.Concurrency.ScheduledItem<TAbsolute> left, System.Reactive.Concurrency.ScheduledItem<TAbsolute> right) { }
+        public static bool operator !=(System.Reactive.Concurrency.ScheduledItem<TAbsolute>? left, System.Reactive.Concurrency.ScheduledItem<TAbsolute>? right) { }
         public static bool operator <(System.Reactive.Concurrency.ScheduledItem<TAbsolute> left, System.Reactive.Concurrency.ScheduledItem<TAbsolute> right) { }
         public static bool operator <=(System.Reactive.Concurrency.ScheduledItem<TAbsolute> left, System.Reactive.Concurrency.ScheduledItem<TAbsolute> right) { }
-        public static bool operator ==(System.Reactive.Concurrency.ScheduledItem<TAbsolute> left, System.Reactive.Concurrency.ScheduledItem<TAbsolute> right) { }
+        public static bool operator ==(System.Reactive.Concurrency.ScheduledItem<TAbsolute>? left, System.Reactive.Concurrency.ScheduledItem<TAbsolute>? right) { }
         public static bool operator >(System.Reactive.Concurrency.ScheduledItem<TAbsolute> left, System.Reactive.Concurrency.ScheduledItem<TAbsolute> right) { }
         public static bool operator >=(System.Reactive.Concurrency.ScheduledItem<TAbsolute> left, System.Reactive.Concurrency.ScheduledItem<TAbsolute> right) { }
     }
@@ -405,9 +405,9 @@ namespace System.Reactive.Concurrency
         public static System.Reactive.Concurrency.IScheduler TaskPool { get; }
         [System.Obsolete(@"This property is no longer supported due to refactoring of the API surface and elimination of platform-specific dependencies. Consider using Scheduler.Default to obtain the platform's most appropriate pool-based scheduler. In order to access a specific pool-based scheduler, please add a reference to the System.Reactive.PlatformServices assembly for your target platform and use the appropriate scheduler in the System.Reactive.Concurrency namespace.")]
         public static System.Reactive.Concurrency.IScheduler ThreadPool { get; }
-        public static System.Reactive.Concurrency.ISchedulerLongRunning AsLongRunning(this System.Reactive.Concurrency.IScheduler scheduler) { }
-        public static System.Reactive.Concurrency.ISchedulerPeriodic AsPeriodic(this System.Reactive.Concurrency.IScheduler scheduler) { }
-        public static System.Reactive.Concurrency.IStopwatchProvider AsStopwatchProvider(this System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Concurrency.ISchedulerLongRunning? AsLongRunning(this System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Concurrency.ISchedulerPeriodic? AsPeriodic(this System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Concurrency.IStopwatchProvider? AsStopwatchProvider(this System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Concurrency.IScheduler Catch<TException>(this System.Reactive.Concurrency.IScheduler scheduler, System.Func<TException, bool> handler)
             where TException : System.Exception { }
         public static System.Reactive.Concurrency.IScheduler DisableOptimizations(this System.Reactive.Concurrency.IScheduler scheduler) { }
@@ -515,8 +515,8 @@ namespace System.Reactive.Concurrency
         protected abstract TAbsolute Add(TAbsolute absolute, TRelative relative);
         public void AdvanceBy(TRelative time) { }
         public void AdvanceTo(TAbsolute time) { }
-        protected abstract System.Reactive.Concurrency.IScheduledItem<TAbsolute> GetNext();
-        protected virtual object GetService(System.Type serviceType) { }
+        protected abstract System.Reactive.Concurrency.IScheduledItem<TAbsolute>? GetNext();
+        protected virtual object? GetService(System.Type serviceType) { }
         public System.IDisposable Schedule<TState>(TState state, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
         public System.IDisposable Schedule<TState>(TState state, System.DateTimeOffset dueTime, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
         public System.IDisposable Schedule<TState>(TState state, System.TimeSpan dueTime, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
@@ -541,7 +541,7 @@ namespace System.Reactive.Concurrency
     {
         protected VirtualTimeScheduler() { }
         protected VirtualTimeScheduler(TAbsolute initialClock, System.Collections.Generic.IComparer<TAbsolute> comparer) { }
-        protected override System.Reactive.Concurrency.IScheduledItem<TAbsolute> GetNext() { }
+        protected override System.Reactive.Concurrency.IScheduledItem<TAbsolute>? GetNext() { }
         public override System.IDisposable ScheduleAbsolute<TState>(TState state, TAbsolute dueTime, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
     }
 }