Sfoglia il codice sorgente

Nullable for TaskObservableExtensions.

Bart De Smet 5 anni fa
parent
commit
84015bd5a7

+ 13 - 2
Rx.NET/Source/src/System.Reactive/Concurrency/TaskHelpers.cs

@@ -2,8 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
+using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -29,5 +28,17 @@ namespace System.Reactive.Concurrency
 
             return Task.Delay(delay, token);
         }
+
+        public static Exception GetSingleException(this Task t)
+        {
+            Debug.Assert(t.IsFaulted && t.Exception != null);
+
+            if (t.Exception.InnerException != null)
+            {
+                return t.Exception.InnerException;
+            }
+
+            return t.Exception;
+        }
     }
 }

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throw.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 

+ 23 - 25
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Reactive.Linq;
@@ -21,9 +19,9 @@ namespace System.Reactive.Threading.Tasks
         private sealed class SlowTaskObservable : IObservable<Unit>
         {
             private readonly Task _task;
-            private readonly IScheduler _scheduler;
+            private readonly IScheduler? _scheduler;
 
-            public SlowTaskObservable(Task task, IScheduler scheduler)
+            public SlowTaskObservable(Task task, IScheduler? scheduler)
             {
                 _task = task;
                 _scheduler = scheduler;
@@ -41,7 +39,7 @@ namespace System.Reactive.Threading.Tasks
 
                 if (_scheduler == null)
                 {
-                    _task.ContinueWith(static (t, subjectObject) => t.EmitTaskResult((IObserver<Unit>)subjectObject), observer, cts.Token, options, TaskScheduler.Current);
+                    _task.ContinueWith(static (t, subjectObject) => t.EmitTaskResult((IObserver<Unit>)subjectObject!), observer, cts.Token, options, TaskScheduler.Current);
                 }
                 else
                 {
@@ -61,9 +59,9 @@ namespace System.Reactive.Threading.Tasks
         private sealed class SlowTaskObservable<TResult> : IObservable<TResult>
         {
             private readonly Task<TResult> _task;
-            private readonly IScheduler _scheduler;
+            private readonly IScheduler? _scheduler;
 
-            public SlowTaskObservable(Task<TResult> task, IScheduler scheduler)
+            public SlowTaskObservable(Task<TResult> task, IScheduler? scheduler)
             {
                 _task = task;
                 _scheduler = scheduler;
@@ -81,7 +79,7 @@ namespace System.Reactive.Threading.Tasks
 
                 if (_scheduler == null)
                 {
-                    _task.ContinueWith(static (t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject), observer, cts.Token, options, TaskScheduler.Current);
+                    _task.ContinueWith(static (t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject!), observer, cts.Token, options, TaskScheduler.Current);
                 }
                 else
                 {
@@ -137,7 +135,7 @@ namespace System.Reactive.Threading.Tasks
             return ToObservableImpl(task, scheduler);
         }
 
-        private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
+        private static IObservable<Unit> ToObservableImpl(Task task, IScheduler? scheduler)
         {
             if (task.IsCompleted)
             {
@@ -145,7 +143,7 @@ namespace System.Reactive.Threading.Tasks
 
                 return task.Status switch
                 {
-                    TaskStatus.Faulted => new Throw<Unit>(task.Exception.InnerException, scheduler),
+                    TaskStatus.Faulted => new Throw<Unit>(task.GetSingleException(), scheduler),
                     TaskStatus.Canceled => new Throw<Unit>(new TaskCanceledException(task), scheduler),
                     _ => new Return<Unit>(Unit.Default, scheduler)
                 };
@@ -163,7 +161,7 @@ namespace System.Reactive.Threading.Tasks
                     subject.OnCompleted();
                     break;
                 case TaskStatus.Faulted:
-                    subject.OnError(task.Exception.InnerException);
+                    subject.OnError(task.GetSingleException());
                     break;
                 case TaskStatus.Canceled:
                     subject.OnError(new TaskCanceledException(task));
@@ -182,7 +180,7 @@ namespace System.Reactive.Threading.Tasks
             var cts = new CancellationDisposable();
 
             task.ContinueWith(
-                static (t, observerObject) => t.EmitTaskResult((IObserver<Unit>)observerObject), 
+                static (t, observerObject) => t.EmitTaskResult((IObserver<Unit>)observerObject!), 
                 observer, 
                 cts.Token, 
                 TaskContinuationOptions.ExecuteSynchronously, 
@@ -233,7 +231,7 @@ namespace System.Reactive.Threading.Tasks
             return ToObservableImpl(task, scheduler);
         }
 
-        private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
+        private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler? scheduler)
         {
             if (task.IsCompleted)
             {
@@ -241,7 +239,7 @@ namespace System.Reactive.Threading.Tasks
 
                 return task.Status switch
                 {
-                    TaskStatus.Faulted => new Throw<TResult>(task.Exception.InnerException, scheduler),
+                    TaskStatus.Faulted => new Throw<TResult>(task.GetSingleException(), scheduler),
                     TaskStatus.Canceled => new Throw<TResult>(new TaskCanceledException(task), scheduler),
                     _ => new Return<TResult>(task.Result, scheduler)
                 };
@@ -259,7 +257,7 @@ namespace System.Reactive.Threading.Tasks
                     subject.OnCompleted();
                     break;
                 case TaskStatus.Faulted:
-                    subject.OnError(task.Exception.InnerException);
+                    subject.OnError(task.GetSingleException());
                     break;
                 case TaskStatus.Canceled:
                     subject.OnError(new TaskCanceledException(task));
@@ -267,7 +265,7 @@ namespace System.Reactive.Threading.Tasks
             }
         }
 
-        private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler)
+        private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler? scheduler)
         {
             var options = TaskContinuationOptions.None;
 
@@ -300,7 +298,7 @@ namespace System.Reactive.Threading.Tasks
             var cts = new CancellationDisposable();
 
             task.ContinueWith(
-                static (t, observerObject) => t.EmitTaskResult((IObserver<TResult>)observerObject), 
+                static (t, observerObject) => t.EmitTaskResult((IObserver<TResult>)observerObject!), 
                 observer, 
                 cts.Token, 
                 TaskContinuationOptions.ExecuteSynchronously, 
@@ -348,7 +346,7 @@ namespace System.Reactive.Threading.Tasks
         /// <param name="state">The state to use as the underlying task's AsyncState.</param>
         /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="observable"/> is <c>null</c>.</exception>
-        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object state)
+        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object? state)
         {
             if (observable == null)
             {
@@ -367,7 +365,7 @@ namespace System.Reactive.Threading.Tasks
         /// <param name="scheduler">The scheduler used for overriding where the task completion signals will be issued.</param>
         /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="observable"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
-        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object state, IScheduler scheduler)
+        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object? state, IScheduler scheduler)
         {
             return observable.ToTask(new CancellationToken(), state).ContinueOnScheduler(scheduler);
         }
@@ -414,7 +412,7 @@ namespace System.Reactive.Threading.Tasks
             task.ContinueWith(
                 static (t, o) =>
                 {
-                    var (scheduler, tcs) = ((IScheduler, TaskCompletionSource<TResult>))o;
+                    var (scheduler, tcs) = ((IScheduler, TaskCompletionSource<TResult>))o!;
 
                     scheduler.ScheduleAction((t, tcs), static state =>
                     {
@@ -424,7 +422,7 @@ namespace System.Reactive.Threading.Tasks
                         }
                         else if (state.t.IsFaulted)
                         {
-                            state.tcs.TrySetException(state.t.Exception.InnerExceptions);
+                            state.tcs.TrySetException(state.t.GetSingleException());
                         }
                         else
                         {
@@ -444,7 +442,7 @@ namespace System.Reactive.Threading.Tasks
             private readonly CancellationTokenRegistration _ctr;
 
             private bool _hasValue;
-            private TResult _lastValue;
+            private TResult? _lastValue;
 
             public ToTaskObserver(TaskCompletionSource<TResult> tcs, CancellationToken ct)
             {
@@ -453,7 +451,7 @@ namespace System.Reactive.Threading.Tasks
 
                 if (ct.CanBeCanceled)
                 {
-                    _ctr = ct.Register(static @this => ((ToTaskObserver<TResult>)@this).Cancel(), this);
+                    _ctr = ct.Register(static @this => ((ToTaskObserver<TResult>)@this!).Cancel(), this);
                 }
             }
 
@@ -475,7 +473,7 @@ namespace System.Reactive.Threading.Tasks
             {
                 if (_hasValue)
                 {
-                    _tcs.TrySetResult(_lastValue);
+                    _tcs.TrySetResult(_lastValue!);
                 }
                 else
                 {
@@ -509,7 +507,7 @@ namespace System.Reactive.Threading.Tasks
         /// <param name="state">The state to use as the underlying task's <see cref="Task.AsyncState"/>.</param>
         /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="observable"/> is <c>null</c>.</exception>
-        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object state)
+        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object? state)
         {
             if (observable == null)
             {