Bladeren bron

Merge pull request #1107 from akarnokd/ToTaskScheduler

Add ToTask overloads with IScheduler
Daniel C. Weber 5 jaren geleden
bovenliggende
commit
7d680cbba6

+ 85 - 0
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -327,6 +327,20 @@ namespace System.Reactive.Threading.Tasks
             return observable.ToTask(new CancellationToken(), state: null);
         }
 
+
+        /// <summary>
+        /// Returns a task that will receive the last value or the exception produced by the observable sequence.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
+        /// <param name="observable">Observable sequence to convert to a task.</param>
+        /// <param name="scheduler">The scheduler used for overriding where the task completion signals will be issued.</param>
+        /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="observable"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
+        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, IScheduler scheduler)
+        {
+            return observable.ToTask().ContinueOnScheduler(scheduler);
+        }
+
         /// <summary>
         /// Returns a task that will receive the last value or the exception produced by the observable sequence.
         /// </summary>
@@ -345,6 +359,20 @@ namespace System.Reactive.Threading.Tasks
             return observable.ToTask(new CancellationToken(), state);
         }
 
+        /// <summary>
+        /// Returns a task that will receive the last value or the exception produced by the observable sequence.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
+        /// <param name="observable">Observable sequence to convert to a task.</param>
+        /// <param name="state">The state to use as the underlying task's AsyncState.</param>
+        /// <param name="scheduler">The scheduler used for overriding where the task completion signals will be issued.</param>
+        /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="observable"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
+        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, object state, IScheduler scheduler)
+        {
+            return observable.ToTask(new CancellationToken(), state).ContinueOnScheduler(scheduler);
+        }
+
         /// <summary>
         /// Returns a task that will receive the last value or the exception produced by the observable sequence.
         /// </summary>
@@ -363,6 +391,48 @@ namespace System.Reactive.Threading.Tasks
             return observable.ToTask(cancellationToken, state: null);
         }
 
+        /// <summary>
+        /// Returns a task that will receive the last value or the exception produced by the observable sequence.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
+        /// <param name="observable">Observable sequence to convert to a task.</param>
+        /// <param name="cancellationToken">Cancellation token that can be used to cancel the task, causing unsubscription from the observable sequence.</param>
+        /// <param name="scheduler">The scheduler used for overriding where the task completion signals will be issued.</param>
+        /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="observable"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
+        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, IScheduler scheduler)
+        {
+            return observable.ToTask(cancellationToken, state: null).ContinueOnScheduler(scheduler);
+        }
+
+        internal static Task<TResult> ContinueOnScheduler<TResult>(this Task<TResult> task, IScheduler scheduler)
+        {
+            if (scheduler == null)
+            {
+                throw new ArgumentNullException(nameof(scheduler));
+            }
+            var tcs = new TaskCompletionSource<TResult>(task.AsyncState);
+            task.ContinueWith(t =>
+            {
+                scheduler.Schedule(() =>
+                {
+                    if (t.IsCanceled)
+                    {
+                        tcs.TrySetCanceled(new TaskCanceledException(t).CancellationToken);
+                    }
+                    else if (t.IsFaulted)
+                    {
+                        tcs.TrySetException(t.Exception.InnerExceptions);
+                    }
+                    else
+                    {
+                        tcs.TrySetResult(t.Result);
+                    }
+                });
+            }, TaskContinuationOptions.ExecuteSynchronously);
+            return tcs.Task;
+        }
+
         private sealed class ToTaskObserver<TResult> : SafeObserver<TResult>
         {
             private readonly CancellationToken _ct;
@@ -466,5 +536,20 @@ namespace System.Reactive.Threading.Tasks
 
             return tcs.Task;
         }
+
+        /// <summary>
+        /// Returns a task that will receive the last value or the exception produced by the observable sequence.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the elements in the source sequence.</typeparam>
+        /// <param name="observable">Observable sequence to convert to a task.</param>
+        /// <param name="cancellationToken">Cancellation token that can be used to cancel the task, causing unsubscription from the observable sequence.</param>
+        /// <param name="state">The state to use as the underlying task's <see cref="Task.AsyncState"/>.</param>
+        /// <param name="scheduler">The scheduler used for overriding where the task completion signals will be issued.</param>
+        /// <returns>A task that will receive the last element or the exception produced by the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="observable"/> or <paramref name="scheduler"/> is <c>null</c>.</exception>
+        public static Task<TResult> ToTask<TResult>(this IObservable<TResult> observable, CancellationToken cancellationToken, object state, IScheduler scheduler)
+        {
+            return observable.ToTask(cancellationToken, state).ContinueOnScheduler(scheduler);
+        }
     }
 }

+ 4 - 0
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.cs

@@ -2527,8 +2527,12 @@ namespace System.Reactive.Threading.Tasks
         public static System.IObservable<TResult> ToObservable<TResult>(this System.Threading.Tasks.Task<TResult> task, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable) { }
         public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable, object state) { }
+        public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable, System.Threading.CancellationToken cancellationToken) { }
+        public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable, object state, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable, System.Threading.CancellationToken cancellationToken, object state) { }
+        public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable, System.Threading.CancellationToken cancellationToken, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable, System.Threading.CancellationToken cancellationToken, object state, System.Reactive.Concurrency.IScheduler scheduler) { }
     }
 }
 namespace System.Runtime.CompilerServices

+ 314 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/TaskObservableExtensionsTest.cs

@@ -5,6 +5,7 @@
 using System;
 using System.Reactive;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 using System.Reactive.Linq;
 using System.Reactive.Threading.Tasks;
 using System.Threading;
@@ -756,6 +757,11 @@ namespace ReactiveTests.Tests
             ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null, new CancellationToken()));
             ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null, new object()));
             ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null, new CancellationToken(), new object()));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask(Observable.Never<int>(), scheduler: null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask(Observable.Never<int>(), new CancellationToken(), scheduler: null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask(Observable.Never<int>(), new object(), scheduler: null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask(Observable.Never<int>(), new CancellationToken(), new object(), scheduler: null));
         }
 
         [Fact]
@@ -884,6 +890,314 @@ namespace ReactiveTests.Tests
             Assert.Equal(1, scheduler.Clock);
         }
 
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Success()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var task = Observable.Return(1).ToTask(scheduler);
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsCompleted);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Failure()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var task = Observable.Throw<int>(new InvalidOperationException("failure")).ToTask(scheduler);
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsFaulted);
+            Assert.True(task.Exception.InnerException is InvalidOperationException);
+            Assert.Equal("failure", task.Exception.InnerException.Message);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Cancel()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var tcs = new TaskCompletionSource<int>();
+
+            var task = tcs.Task.ContinueOnScheduler(scheduler);
+
+            Assert.False(scheduler.HasTask);
+
+            tcs.TrySetCanceled();
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsCanceled);
+        }
+
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Success_With_State()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var task = Observable.Return(1).ToTask("state", scheduler);
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsCompleted);
+
+            Assert.Equal("state", task.AsyncState);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Failure_With_State()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var task = Observable.Throw<int>(new InvalidOperationException("failure")).ToTask("state", scheduler);
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsFaulted);
+            Assert.True(task.Exception.InnerException is InvalidOperationException);
+            Assert.Equal("failure", task.Exception.InnerException.Message);
+            Assert.Equal("state", task.AsyncState);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Cancel_With_State()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var tcs = new TaskCompletionSource<int>("state");
+
+            var task = tcs.Task.ContinueOnScheduler(scheduler);
+
+            Assert.False(scheduler.HasTask);
+
+            tcs.TrySetCanceled();
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsCanceled);
+            Assert.Equal("state", task.AsyncState);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Success_With_Cancellation()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var task = Observable.Return(1).ToTask(cancellationToken: default, scheduler: scheduler);
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsCompleted);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Failure_With_Cancellation()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var task = Observable.Throw<int>(new InvalidOperationException("failure")).ToTask(cancellationToken: default, scheduler: scheduler);
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsFaulted);
+            Assert.True(task.Exception.InnerException is InvalidOperationException);
+            Assert.Equal("failure", task.Exception.InnerException.Message);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Cancel_With_Cancellation()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var cts = new CancellationTokenSource();
+            var task = Observable.Never<int>().ToTask(cts.Token, scheduler);
+
+            Assert.False(scheduler.HasTask);
+
+            cts.Cancel();
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsCanceled);
+            Assert.Equal(new TaskCanceledException(task).CancellationToken, cts.Token);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Success_With_State_And_Cancellation()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var task = Observable.Return(1).ToTask(default, "state", scheduler);
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsCompleted);
+
+            Assert.Equal("state", task.AsyncState);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Failure_With_State_And_Cancellation()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var task = Observable.Throw<int>(new InvalidOperationException("failure")).ToTask(default, "state", scheduler);
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsFaulted);
+            Assert.True(task.Exception.InnerException is InvalidOperationException);
+            Assert.Equal("failure", task.Exception.InnerException.Message);
+            Assert.Equal("state", task.AsyncState);
+        }
+
+        [Fact]
+        public void ToTask_Scheduler_Resumed_On_Thread_Cancel_With_State_And_Cancellation()
+        {
+            var scheduler = new OneshotScheduler();
+
+            var cts = new CancellationTokenSource();
+            var task = Observable.Never<int>().ToTask(cts.Token, "state", scheduler);
+
+            Assert.False(scheduler.HasTask);
+
+            cts.Cancel();
+
+            Assert.True(scheduler.HasTask);
+
+            scheduler.Run();
+
+            Assert.False(scheduler.HasTask);
+
+            Assert.True(task.IsCanceled);
+            Assert.Equal("state", task.AsyncState);
+            Assert.Equal(new TaskCanceledException(task).CancellationToken, cts.Token);
+        }
+
+        sealed class OneshotScheduler : IScheduler
+        {
+            public DateTimeOffset Now => DateTimeOffset.Now;
+
+            private volatile Action _task;
+
+            public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
+            {
+                var task = new Work<TState> { State = state, Action = action };
+                _task = () =>
+                {
+                    task.Action?.Invoke(this, task.State);
+                };
+                return task;
+            }
+
+            public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
+            {
+                throw new NotImplementedException("Not supported by this scheduler");
+            }
+
+            public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
+            {
+                throw new NotImplementedException("Not supported by this scheduler");
+            }
+
+            public void Run()
+            {
+                Interlocked.Exchange(ref _task, null)?.Invoke();
+            }
+
+            public bool HasTask => _task != null;
+
+            sealed class Work<TState> : IDisposable
+            {
+                internal TState State;
+                internal Func<IScheduler, TState, IDisposable> Action;
+
+                public void Dispose()
+                {
+                    Action = null;
+                }
+            }
+        }
+
+        [Fact]
+        public async Task ToTask_Scheduler_Dispose_Can_Propagate()
+        {
+            async Task asyncMethod()
+            {
+                await Task.Delay(500);
+                Console.WriteLine("Done");
+            }
+
+            var count = 0;
+
+            var observable = Observable.Create<long>(observer =>
+            {
+                var d = Observable.Interval(TimeSpan.FromMilliseconds(200)).Subscribe(observer);
+                return new CompositeDisposable(d, Disposable.Create(() =>
+                {
+                    Interlocked.Increment(ref count);
+                }));
+            })
+            .Select(_ => Observable.FromAsync(asyncMethod))
+            .Concat()
+            .Take(1);
+
+            await observable.ToTask(Scheduler.Default).ConfigureAwait(false);
+
+            Thread.Sleep(500);
+
+            Assert.Equal(1, Volatile.Read(ref count));
+        }
+
         #endregion
     }
 }