|  | @@ -5,6 +5,7 @@
 | 
	
		
			
				|  |  |  using System;
 | 
	
		
			
				|  |  |  using System.Reactive;
 | 
	
		
			
				|  |  |  using System.Reactive.Concurrency;
 | 
	
		
			
				|  |  | +using System.Reactive.Disposables;
 | 
	
		
			
				|  |  |  using System.Reactive.Linq;
 | 
	
		
			
				|  |  |  using System.Reactive.Threading.Tasks;
 | 
	
		
			
				|  |  |  using System.Threading;
 | 
	
	
		
			
				|  | @@ -756,6 +757,11 @@ namespace ReactiveTests.Tests
 | 
	
		
			
				|  |  |              ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null, new CancellationToken()));
 | 
	
		
			
				|  |  |              ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null, new object()));
 | 
	
		
			
				|  |  |              ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask<int>(null, new CancellationToken(), new object()));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask(Observable.Never<int>(), scheduler: null));
 | 
	
		
			
				|  |  | +            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask(Observable.Never<int>(), new CancellationToken(), scheduler: null));
 | 
	
		
			
				|  |  | +            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask(Observable.Never<int>(), new object(), scheduler: null));
 | 
	
		
			
				|  |  | +            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToTask(Observable.Never<int>(), new CancellationToken(), new object(), scheduler: null));
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          [Fact]
 | 
	
	
		
			
				|  | @@ -884,6 +890,314 @@ namespace ReactiveTests.Tests
 | 
	
		
			
				|  |  |              Assert.Equal(1, scheduler.Clock);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Success()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = Observable.Return(1).ToTask(scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsCompleted);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Failure()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = Observable.Throw<int>(new InvalidOperationException("failure")).ToTask(scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsFaulted);
 | 
	
		
			
				|  |  | +            Assert.True(task.Exception.InnerException is InvalidOperationException);
 | 
	
		
			
				|  |  | +            Assert.Equal("failure", task.Exception.InnerException.Message);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Cancel()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var tcs = new TaskCompletionSource<int>();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = tcs.Task.ContinueOnScheduler(scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            tcs.TrySetCanceled();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsCanceled);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Success_With_State()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = Observable.Return(1).ToTask("state", scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsCompleted);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.Equal("state", task.AsyncState);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Failure_With_State()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = Observable.Throw<int>(new InvalidOperationException("failure")).ToTask("state", scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsFaulted);
 | 
	
		
			
				|  |  | +            Assert.True(task.Exception.InnerException is InvalidOperationException);
 | 
	
		
			
				|  |  | +            Assert.Equal("failure", task.Exception.InnerException.Message);
 | 
	
		
			
				|  |  | +            Assert.Equal("state", task.AsyncState);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Cancel_With_State()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var tcs = new TaskCompletionSource<int>("state");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = tcs.Task.ContinueOnScheduler(scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            tcs.TrySetCanceled();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsCanceled);
 | 
	
		
			
				|  |  | +            Assert.Equal("state", task.AsyncState);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Success_With_Cancellation()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = Observable.Return(1).ToTask(cancellationToken: default, scheduler: scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsCompleted);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Failure_With_Cancellation()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = Observable.Throw<int>(new InvalidOperationException("failure")).ToTask(cancellationToken: default, scheduler: scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsFaulted);
 | 
	
		
			
				|  |  | +            Assert.True(task.Exception.InnerException is InvalidOperationException);
 | 
	
		
			
				|  |  | +            Assert.Equal("failure", task.Exception.InnerException.Message);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Cancel_With_Cancellation()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var cts = new CancellationTokenSource();
 | 
	
		
			
				|  |  | +            var task = Observable.Never<int>().ToTask(cts.Token, scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            cts.Cancel();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsCanceled);
 | 
	
		
			
				|  |  | +            Assert.Equal(new TaskCanceledException(task).CancellationToken, cts.Token);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Success_With_State_And_Cancellation()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = Observable.Return(1).ToTask(default, "state", scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsCompleted);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.Equal("state", task.AsyncState);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Failure_With_State_And_Cancellation()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var task = Observable.Throw<int>(new InvalidOperationException("failure")).ToTask(default, "state", scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsFaulted);
 | 
	
		
			
				|  |  | +            Assert.True(task.Exception.InnerException is InvalidOperationException);
 | 
	
		
			
				|  |  | +            Assert.Equal("failure", task.Exception.InnerException.Message);
 | 
	
		
			
				|  |  | +            Assert.Equal("state", task.AsyncState);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public void ToTask_Scheduler_Resumed_On_Thread_Cancel_With_State_And_Cancellation()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var scheduler = new OneshotScheduler();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var cts = new CancellationTokenSource();
 | 
	
		
			
				|  |  | +            var task = Observable.Never<int>().ToTask(cts.Token, "state", scheduler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            cts.Cancel();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            scheduler.Run();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.False(scheduler.HasTask);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.True(task.IsCanceled);
 | 
	
		
			
				|  |  | +            Assert.Equal("state", task.AsyncState);
 | 
	
		
			
				|  |  | +            Assert.Equal(new TaskCanceledException(task).CancellationToken, cts.Token);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        sealed class OneshotScheduler : IScheduler
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            public DateTimeOffset Now => DateTimeOffset.Now;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            private volatile Action _task;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                var task = new Work<TState> { State = state, Action = action };
 | 
	
		
			
				|  |  | +                _task = () =>
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    task.Action?.Invoke(this, task.State);
 | 
	
		
			
				|  |  | +                };
 | 
	
		
			
				|  |  | +                return task;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                throw new NotImplementedException("Not supported by this scheduler");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                throw new NotImplementedException("Not supported by this scheduler");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            public void Run()
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                Interlocked.Exchange(ref _task, null)?.Invoke();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            public bool HasTask => _task != null;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            sealed class Work<TState> : IDisposable
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                internal TState State;
 | 
	
		
			
				|  |  | +                internal Func<IScheduler, TState, IDisposable> Action;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                public void Dispose()
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    Action = null;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        [Fact]
 | 
	
		
			
				|  |  | +        public async Task ToTask_Scheduler_Dispose_Can_Propagate()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            async Task asyncMethod()
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                await Task.Delay(500);
 | 
	
		
			
				|  |  | +                Console.WriteLine("Done");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var count = 0;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var observable = Observable.Create<long>(observer =>
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                var d = Observable.Interval(TimeSpan.FromMilliseconds(200)).Subscribe(observer);
 | 
	
		
			
				|  |  | +                return new CompositeDisposable(d, Disposable.Create(() =>
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    Interlocked.Increment(ref count);
 | 
	
		
			
				|  |  | +                }));
 | 
	
		
			
				|  |  | +            })
 | 
	
		
			
				|  |  | +            .Select(_ => Observable.FromAsync(asyncMethod))
 | 
	
		
			
				|  |  | +            .Concat()
 | 
	
		
			
				|  |  | +            .Take(1);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            await observable.ToTask(Scheduler.Default).ConfigureAwait(false);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Thread.Sleep(500);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Assert.Equal(1, Volatile.Read(ref count));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          #endregion
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  }
 |