Przeglądaj źródła

Adding IScheduler overloads for ToObservable on Task.

Bart De Smet 10 lat temu
rodzic
commit
627539549c

+ 1 - 1
Rx.NET/Source/System.Reactive.Experimental/Reactive/Linq/QbservableEx.Generated.cs

@@ -1,5 +1,5 @@
 /*
 /*
- * WARNING: Auto-generated file (11/21/2013 11:07:25 AM)
+ * WARNING: Auto-generated file (4/16/2015 23:47:35)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
  */
 
 

+ 8 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs

@@ -206,6 +206,8 @@ namespace System.Reactive.Linq
 #if !NO_TPL
 #if !NO_TPL
         IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync);
         IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync);
         IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync);
         IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync);
+        IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync, IScheduler scheduler);
+        IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler scheduler);
 #endif
 #endif
 
 
         IObservable<Unit> Start(Action action);
         IObservable<Unit> Start(Action action);
@@ -214,6 +216,8 @@ namespace System.Reactive.Linq
 #if !NO_TPL
 #if !NO_TPL
         IObservable<Unit> StartAsync(Func<Task> actionAsync);
         IObservable<Unit> StartAsync(Func<Task> actionAsync);
         IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync);
         IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync);
+        IObservable<Unit> StartAsync(Func<Task> actionAsync, IScheduler scheduler);
+        IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler);
 #endif
 #endif
 
 
 #if !NO_TPL
 #if !NO_TPL
@@ -221,6 +225,10 @@ namespace System.Reactive.Linq
         IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync);
         IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync);
         IObservable<Unit> FromAsync(Func<Task> actionAsync);
         IObservable<Unit> FromAsync(Func<Task> actionAsync);
         IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync);
         IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync);
+        IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, IScheduler scheduler);
+        IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, IScheduler scheduler);
+        IObservable<Unit> FromAsync(Func<Task> actionAsync, IScheduler scheduler);
+        IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler);
 #endif
 #endif
 
 
         Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function);
         Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function);

+ 196 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Async.cs

@@ -953,6 +953,30 @@ namespace System.Reactive.Linq
             return s_impl.StartAsync<TResult>(functionAsync);
             return s_impl.StartAsync<TResult>(functionAsync);
         }
         }
 
 
+        /// <summary>
+        /// Invokes the asynchronous function, surfacing the result through an observable sequence.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to run.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <returns>An observable sequence exposing the function's result value, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null or <paramref name="scheduler"/> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item><description>The function is started immediately, not during the subscription of the resulting sequence.</description></item>
+        /// <item><description>Multiple subscriptions to the resulting sequence can observe the function's result.</description></item>
+        /// </list>
+        /// </remarks>
+        public static IObservable<TResult> StartAsync<TResult>(Func<Task<TResult>> functionAsync, IScheduler scheduler)
+        {
+            if (functionAsync == null)
+                throw new ArgumentNullException("functionAsync");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return s_impl.StartAsync<TResult>(functionAsync, scheduler);
+        }
+
         /// <summary>
         /// <summary>
         /// Invokes the asynchronous function, surfacing the result through an observable sequence.
         /// Invokes the asynchronous function, surfacing the result through an observable sequence.
         /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
         /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
@@ -981,6 +1005,38 @@ namespace System.Reactive.Linq
 
 
             return s_impl.StartAsync<TResult>(functionAsync);
             return s_impl.StartAsync<TResult>(functionAsync);
         }
         }
+
+        /// <summary>
+        /// Invokes the asynchronous function, surfacing the result through an observable sequence.
+        /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to run.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <returns>An observable sequence exposing the function's result value, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null or <paramref name="scheduler"/> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item><description>The function is started immediately, not during the subscription of the resulting sequence.</description></item>
+        /// <item><description>Multiple subscriptions to the resulting sequence can observe the function's result.</description></item>
+        /// <item><description>
+        /// If any subscription to the resulting sequence is disposed, the CancellationToken is set. The observer associated to the disposed
+        /// subscription won't see the TaskCanceledException, but other observers will. You can protect against this using the Catch operator.
+        /// Be careful when handing out the resulting sequence because of this behavior. The most common use is to have a single subscription
+        /// to the resulting sequence, which controls the CancellationToken state. Alternatively, you can control subscription behavior using
+        /// multicast operators.
+        /// </description></item>
+        /// </list>
+        /// </remarks>
+        public static IObservable<TResult> StartAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, IScheduler scheduler)
+        {
+            if (functionAsync == null)
+                throw new ArgumentNullException("functionAsync");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return s_impl.StartAsync<TResult>(functionAsync, scheduler);
+        }
 #endif
 #endif
 
 
         #endregion
         #endregion
@@ -1051,6 +1107,29 @@ namespace System.Reactive.Linq
             return s_impl.StartAsync(actionAsync);
             return s_impl.StartAsync(actionAsync);
         }
         }
 
 
+        /// <summary>
+        /// Invokes the asynchronous action, surfacing the result through an observable sequence.
+        /// </summary>
+        /// <param name="actionAsync">Asynchronous action to run.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null or <paramref name="scheduler"/> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item><description>The action is started immediately, not during the subscription of the resulting sequence.</description></item>
+        /// <item><description>Multiple subscriptions to the resulting sequence can observe the action's outcome.</description></item>
+        /// </list>
+        /// </remarks>
+        public static IObservable<Unit> StartAsync(Func<Task> actionAsync, IScheduler scheduler)
+        {
+            if (actionAsync == null)
+                throw new ArgumentNullException("actionAsync");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return s_impl.StartAsync(actionAsync, scheduler);
+        }
+
         /// <summary>
         /// <summary>
         /// Invokes the asynchronous action, surfacing the result through an observable sequence.
         /// Invokes the asynchronous action, surfacing the result through an observable sequence.
         /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
         /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
@@ -1078,6 +1157,37 @@ namespace System.Reactive.Linq
 
 
             return s_impl.StartAsync(actionAsync);
             return s_impl.StartAsync(actionAsync);
         }
         }
+
+        /// <summary>
+        /// Invokes the asynchronous action, surfacing the result through an observable sequence.
+        /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
+        /// </summary>
+        /// <param name="actionAsync">Asynchronous action to run.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null or <paramref name="scheduler"/> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item><description>The action is started immediately, not during the subscription of the resulting sequence.</description></item>
+        /// <item><description>Multiple subscriptions to the resulting sequence can observe the action's outcome.</description></item>
+        /// <item><description>
+        /// If any subscription to the resulting sequence is disposed, the CancellationToken is set. The observer associated to the disposed
+        /// subscription won't see the TaskCanceledException, but other observers will. You can protect against this using the Catch operator.
+        /// Be careful when handing out the resulting sequence because of this behavior. The most common use is to have a single subscription
+        /// to the resulting sequence, which controls the CancellationToken state. Alternatively, you can control subscription behavior using
+        /// multicast operators.
+        /// </description></item>
+        /// </list>
+        /// </remarks>
+        public static IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
+        {
+            if (actionAsync == null)
+                throw new ArgumentNullException("actionAsync");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return s_impl.StartAsync(actionAsync, scheduler);
+        }
 #endif
 #endif
 
 
         #endregion
         #endregion
@@ -1099,9 +1209,30 @@ namespace System.Reactive.Linq
         /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null.</exception>
         public static IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync)
         public static IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync)
         {
         {
+            if (functionAsync == null)
+                throw new ArgumentNullException("functionAsync");
+
             return s_impl.FromAsync<TResult>(functionAsync);
             return s_impl.FromAsync<TResult>(functionAsync);
         }
         }
 
 
+        /// <summary>
+        /// Converts to asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to convert.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <returns>An observable sequence exposing the result of invoking the function, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null or <paramref name="scheduler"/> is null.</exception>
+        public static IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, IScheduler scheduler)
+        {
+            if (functionAsync == null)
+                throw new ArgumentNullException("functionAsync");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return s_impl.FromAsync<TResult>(functionAsync, scheduler);
+        }
+
         /// <summary>
         /// <summary>
         /// Converts to asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
         /// Converts to asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
         /// The CancellationToken passed to the asynchronous function is tied to the observable sequence's subscription that triggered the function's invocation and can be used for best-effort cancellation.
         /// The CancellationToken passed to the asynchronous function is tied to the observable sequence's subscription that triggered the function's invocation and can be used for best-effort cancellation.
@@ -1113,9 +1244,32 @@ namespace System.Reactive.Linq
         /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous function will be signaled.</remarks>
         /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous function will be signaled.</remarks>
         public static IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync)
         public static IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync)
         {
         {
+            if (functionAsync == null)
+                throw new ArgumentNullException("functionAsync");
+
             return s_impl.FromAsync<TResult>(functionAsync);
             return s_impl.FromAsync<TResult>(functionAsync);
         }
         }
 
 
+        /// <summary>
+        /// Converts to asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// The CancellationToken passed to the asynchronous function is tied to the observable sequence's subscription that triggered the function's invocation and can be used for best-effort cancellation.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to convert.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <returns>An observable sequence exposing the result of invoking the function, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null or <paramref name="scheduler"/> is null.</exception>
+        /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous function will be signaled.</remarks>
+        public static IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, IScheduler scheduler)
+        {
+            if (functionAsync == null)
+                throw new ArgumentNullException("functionAsync");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return s_impl.FromAsync<TResult>(functionAsync, scheduler);
+        }
+
         #endregion
         #endregion
 
 
         #region Action
         #region Action
@@ -1128,9 +1282,29 @@ namespace System.Reactive.Linq
         /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null.</exception>
         public static IObservable<Unit> FromAsync(Func<Task> actionAsync)
         public static IObservable<Unit> FromAsync(Func<Task> actionAsync)
         {
         {
+            if (actionAsync == null)
+                throw new ArgumentNullException("actionAsync");
+
             return s_impl.FromAsync(actionAsync);
             return s_impl.FromAsync(actionAsync);
         }
         }
 
 
+        /// <summary>
+        /// Converts to asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// </summary>
+        /// <param name="actionAsync">Asynchronous action to convert.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null or <paramref name="scheduler"/> is null.</exception>
+        public static IObservable<Unit> FromAsync(Func<Task> actionAsync, IScheduler scheduler)
+        {
+            if (actionAsync == null)
+                throw new ArgumentNullException("actionAsync");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return s_impl.FromAsync(actionAsync, scheduler);
+        }
+
         /// <summary>
         /// <summary>
         /// Converts to asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
         /// Converts to asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
         /// The CancellationToken passed to the asynchronous action is tied to the observable sequence's subscription that triggered the action's invocation and can be used for best-effort cancellation.
         /// The CancellationToken passed to the asynchronous action is tied to the observable sequence's subscription that triggered the action's invocation and can be used for best-effort cancellation.
@@ -1141,9 +1315,31 @@ namespace System.Reactive.Linq
         /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null.</exception>
         public static IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync)
         public static IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync)
         {
         {
+            if (actionAsync == null)
+                throw new ArgumentNullException("actionAsync");
+
             return s_impl.FromAsync(actionAsync);
             return s_impl.FromAsync(actionAsync);
         }
         }
 
 
+        /// <summary>
+        /// Converts to asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// The CancellationToken passed to the asynchronous action is tied to the observable sequence's subscription that triggered the action's invocation and can be used for best-effort cancellation.
+        /// </summary>
+        /// <param name="actionAsync">Asynchronous action to convert.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous function will be signaled.</remarks>
+        /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null or <paramref name="scheduler"/> is null.</exception>
+        public static IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
+        {
+            if (actionAsync == null)
+                throw new ArgumentNullException("actionAsync");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return s_impl.FromAsync(actionAsync, scheduler);
+        }
+
         #endregion
         #endregion
 
 
 #endif
 #endif

+ 8 - 8
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs

@@ -820,7 +820,7 @@ namespace System.Reactive.Linq
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
         public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
         {
         {
@@ -840,7 +840,7 @@ namespace System.Reactive.Linq
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
         public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
         {
         {
@@ -860,7 +860,7 @@ namespace System.Reactive.Linq
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
         public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
         {
         {
@@ -880,7 +880,7 @@ namespace System.Reactive.Linq
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
         public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
         {
         {
@@ -951,7 +951,7 @@ namespace System.Reactive.Linq
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
-        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
         public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
         {
         {
             if (source == null)
             if (source == null)
@@ -975,7 +975,7 @@ namespace System.Reactive.Linq
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
-        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
         public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
         {
         {
             if (source == null)
             if (source == null)
@@ -999,7 +999,7 @@ namespace System.Reactive.Linq
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
-        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
         public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
         {
         {
             if (source == null)
             if (source == null)
@@ -1023,7 +1023,7 @@ namespace System.Reactive.Linq
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
-        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
         public static IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
         {
         {
             if (source == null)
             if (source == null)

+ 96 - 4
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs

@@ -665,6 +665,16 @@ namespace System.Reactive.Linq
 
 
 #if !NO_TPL
 #if !NO_TPL
         public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync)
         public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync)
+        {
+            return StartAsyncImpl(functionAsync, null);
+        }
+
+        public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync, IScheduler scheduler)
+        {
+            return StartAsyncImpl(functionAsync, scheduler);
+        }
+
+        private IObservable<TSource> StartAsyncImpl<TSource>(Func<Task<TSource>> functionAsync, IScheduler scheduler)
         {
         {
             var task = default(Task<TSource>);
             var task = default(Task<TSource>);
             try
             try
@@ -676,10 +686,27 @@ namespace System.Reactive.Linq
                 return Throw<TSource>(exception);
                 return Throw<TSource>(exception);
             }
             }
 
 
-            return task.ToObservable();
+            if (scheduler != null)
+            {
+                return task.ToObservable(scheduler);
+            }
+            else
+            {
+                return task.ToObservable();
+            }
         }
         }
 
 
         public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync)
         public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync)
+        {
+            return StartAsyncImpl(functionAsync, null);
+        }
+
+        public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler scheduler)
+        {
+            return StartAsyncImpl(functionAsync, scheduler);
+        }
+
+        private IObservable<TSource> StartAsyncImpl<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler scheduler)
         {
         {
             var cancellable = new CancellationDisposable();
             var cancellable = new CancellationDisposable();
 
 
@@ -693,7 +720,16 @@ namespace System.Reactive.Linq
                 return Throw<TSource>(exception);
                 return Throw<TSource>(exception);
             }
             }
 
 
-            var result = task.ToObservable();
+            var result = default(IObservable<TSource>);
+
+            if (scheduler != null)
+            {
+                result = task.ToObservable(scheduler);
+            }
+            else
+            {
+                result = task.ToObservable();
+            }
 
 
             return new AnonymousObservable<TSource>(observer =>
             return new AnonymousObservable<TSource>(observer =>
             {
             {
@@ -722,6 +758,16 @@ namespace System.Reactive.Linq
 
 
 #if !NO_TPL
 #if !NO_TPL
         public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync)
         public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync)
+        {
+            return StartAsyncImpl(actionAsync, null);
+        }
+
+        public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync, IScheduler scheduler)
+        {
+            return StartAsyncImpl(actionAsync, scheduler);
+        }
+
+        private IObservable<Unit> StartAsyncImpl(Func<Task> actionAsync, IScheduler scheduler)
         {
         {
             var task = default(Task);
             var task = default(Task);
             try
             try
@@ -733,10 +779,27 @@ namespace System.Reactive.Linq
                 return Throw<Unit>(exception);
                 return Throw<Unit>(exception);
             }
             }
 
 
-            return task.ToObservable();
+            if (scheduler != null)
+            {
+                return task.ToObservable(scheduler);
+            }
+            else
+            {
+                return task.ToObservable();
+            }
         }
         }
 
 
         public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync)
         public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync)
+        {
+            return StartAsyncImpl(actionAsync, null);
+        }
+
+        public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
+        {
+            return StartAsyncImpl(actionAsync, scheduler);
+        }
+
+        private IObservable<Unit> StartAsyncImpl(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
         {
         {
             var cancellable = new CancellationDisposable();
             var cancellable = new CancellationDisposable();
 
 
@@ -750,7 +813,16 @@ namespace System.Reactive.Linq
                 return Throw<Unit>(exception);
                 return Throw<Unit>(exception);
             }
             }
 
 
-            var result = task.ToObservable();
+            var result = default(IObservable<Unit>);
+
+            if (scheduler != null)
+            {
+                result = task.ToObservable(scheduler);
+            }
+            else
+            {
+                result = task.ToObservable();
+            }
 
 
             return new AnonymousObservable<Unit>(observer =>
             return new AnonymousObservable<Unit>(observer =>
             {
             {
@@ -783,6 +855,16 @@ namespace System.Reactive.Linq
             return Defer(() => StartAsync(functionAsync));
             return Defer(() => StartAsync(functionAsync));
         }
         }
 
 
+        public virtual IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, IScheduler scheduler)
+        {
+            return Defer(() => StartAsync(functionAsync, scheduler));
+        }
+
+        public virtual IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, IScheduler scheduler)
+        {
+            return Defer(() => StartAsync(functionAsync, scheduler));
+        }
+
         #endregion
         #endregion
 
 
         #region Action
         #region Action
@@ -797,6 +879,16 @@ namespace System.Reactive.Linq
             return Defer(() => StartAsync(actionAsync));
             return Defer(() => StartAsync(actionAsync));
         }
         }
 
 
+        public virtual IObservable<Unit> FromAsync(Func<Task> actionAsync, IScheduler scheduler)
+        {
+            return Defer(() => StartAsync(actionAsync, scheduler));
+        }
+
+        public virtual IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
+        {
+            return Defer(() => StartAsync(actionAsync, scheduler));
+        }
+
         #endregion
         #endregion
 
 
 #endif
 #endif

+ 141 - 20
Rx.NET/Source/System.Reactive.Linq/Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -6,6 +6,8 @@ using System.Threading.Tasks;
 using System.Threading;
 using System.Threading;
 using System.Reactive.Linq;
 using System.Reactive.Linq;
 using System.Reactive.Subjects;
 using System.Reactive.Subjects;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq.ObservableImpl;
 
 
 namespace System.Reactive.Threading.Tasks
 namespace System.Reactive.Threading.Tasks
 {
 {
@@ -26,29 +28,71 @@ namespace System.Reactive.Threading.Tasks
             if (task == null)
             if (task == null)
                 throw new ArgumentNullException("task");
                 throw new ArgumentNullException("task");
 
 
-            var subject = new AsyncSubject<Unit>();
+            return ToObservableImpl(task, null);
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that signals when the task completes.
+        /// </summary>
+        /// <param name="task">Task to convert to an observable sequence.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers about completion, cancellation or failure.</param>
+        /// <returns>An observable sequence that produces a unit value when the task completes, or propagates the exception produced by the task.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null or <paramref name="scheduler"/> is null.</exception>
+        /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
+        public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler)
+        {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return ToObservableImpl(task, scheduler);
+        }
+
+        private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
+        {
+            var res = default(IObservable<Unit>);
 
 
             if (task.IsCompleted)
             if (task.IsCompleted)
             {
             {
-                ToObservableDone(task, subject);
+                scheduler = scheduler ?? ImmediateScheduler.Instance;
+
+                switch (task.Status)
+                {
+                    case TaskStatus.RanToCompletion:
+                        res = new Return<Unit>(Unit.Default, scheduler);
+                        break;
+                    case TaskStatus.Faulted:
+                        res = new Throw<Unit>(task.Exception.InnerException, scheduler);
+                        break;
+                    case TaskStatus.Canceled:
+                        res = new Throw<Unit>(new TaskCanceledException(task), scheduler);
+                        break;
+                }
             }
             }
             else
             else
             {
             {
-                ToObservableSlow(task, subject);
+                //
+                // Separate method to avoid closure in synchronous completion case.
+                //
+                res = ToObservableSlow(task, scheduler);
             }
             }
 
 
-            return subject.AsObservable();
+            return res;
         }
         }
 
 
-        private static void ToObservableSlow(Task task, AsyncSubject<Unit> subject)
+        private static IObservable<Unit> ToObservableSlow(Task task, IScheduler scheduler)
         {
         {
-            //
-            // Separate method to avoid closure in synchronous completion case.
-            //
-            task.ContinueWith(t => ToObservableDone(task, subject));
+            var subject = new AsyncSubject<Unit>();
+
+            var options = GetTaskContinuationOptions(scheduler);
+
+            task.ContinueWith(t => ToObservableDone(task, subject), options);
+
+            return ToObservableResult(subject, scheduler);
         }
         }
 
 
-        private static void ToObservableDone(Task task, AsyncSubject<Unit> subject)
+        private static void ToObservableDone(Task task, IObserver<Unit> subject)
         {
         {
             switch (task.Status)
             switch (task.Status)
             {
             {
@@ -78,29 +122,72 @@ namespace System.Reactive.Threading.Tasks
             if (task == null)
             if (task == null)
                 throw new ArgumentNullException("task");
                 throw new ArgumentNullException("task");
 
 
-            var subject = new AsyncSubject<TResult>();
+            return ToObservableImpl(task, null);
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that propagates the result of the task.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result produced by the task.</typeparam>
+        /// <param name="task">Task to convert to an observable sequence.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers about completion, cancellation or failure.</param>
+        /// <returns>An observable sequence that produces the task's result, or propagates the exception produced by the task.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null or <paramref name="scheduler"/> is null.</exception>
+        /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
+        public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler)
+        {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (scheduler == null)
+                throw new ArgumentNullException("scheduler");
+
+            return ToObservableImpl(task, scheduler);
+        }
+
+        private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
+        {
+            var res = default(IObservable<TResult>);
 
 
             if (task.IsCompleted)
             if (task.IsCompleted)
             {
             {
-                ToObservableDone<TResult>(task, subject);
+                scheduler = scheduler ?? ImmediateScheduler.Instance;
+
+                switch (task.Status)
+                {
+                    case TaskStatus.RanToCompletion:
+                        res = new Return<TResult>(task.Result, scheduler);
+                        break;
+                    case TaskStatus.Faulted:
+                        res = new Throw<TResult>(task.Exception.InnerException, scheduler);
+                        break;
+                    case TaskStatus.Canceled:
+                        res = new Throw<TResult>(new TaskCanceledException(task), scheduler);
+                        break;
+                }
             }
             }
             else
             else
             {
             {
-                ToObservableSlow<TResult>(task, subject);
+                //
+                // Separate method to avoid closure in synchronous completion case.
+                //
+                res = ToObservableSlow(task, scheduler);
             }
             }
 
 
-            return subject.AsObservable();
+            return res;
         }
         }
 
 
-        private static void ToObservableSlow<TResult>(Task<TResult> task, AsyncSubject<TResult> subject)
+        private static IObservable<TResult> ToObservableSlow<TResult>(Task<TResult> task, IScheduler scheduler)
         {
         {
-            //
-            // Separate method to avoid closure in synchronous completion case.
-            //
-            task.ContinueWith(t => ToObservableDone(t, subject));
+            var subject = new AsyncSubject<TResult>();
+
+            var options = GetTaskContinuationOptions(scheduler);
+
+            task.ContinueWith(t => ToObservableDone(task, subject), options);
+
+            return ToObservableResult(subject, scheduler);
         }
         }
 
 
-        private static void ToObservableDone<TResult>(Task<TResult> task, AsyncSubject<TResult> subject)
+        private static void ToObservableDone<TResult>(Task<TResult> task, IObserver<TResult> subject)
         {
         {
             switch (task.Status)
             switch (task.Status)
             {
             {
@@ -117,6 +204,40 @@ namespace System.Reactive.Threading.Tasks
             }
             }
         }
         }
 
 
+        private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler scheduler)
+        {
+            var options = TaskContinuationOptions.None;
+
+            if (scheduler != null)
+            {
+                //
+                // We explicitly don't special-case the immediate scheduler here. If the user asks for a
+                // synchronous completion, we'll try our best. However, there's no guarantee due to the
+                // internal stack probing in the TPL, which may cause asynchronous completion on a thread
+                // pool thread in order to avoid stack overflows. Therefore we can only attempt to be more
+                // efficient in the case where the user specified a scheduler, hence we know that the
+                // continuation will trigger a scheduling operation. In case of the immediate scheduler,
+                // it really becomes "immediate scheduling" wherever the TPL decided to run the continuation,
+                // i.e. not necessarily where the task was completed from.
+                //
+                options |= TaskContinuationOptions.ExecuteSynchronously;
+            }
+
+            return options;
+        }
+
+        private static IObservable<TResult> ToObservableResult<TResult>(AsyncSubject<TResult> subject, IScheduler scheduler)
+        {
+            if (scheduler != null)
+            {
+                return subject.ObserveOn(scheduler);
+            }
+            else
+            {
+                return subject.AsObservable();
+            }
+        }
+
         /// <summary>
         /// <summary>
         /// Returns a task that will receive the last value or the exception produced by the observable sequence.
         /// Returns a task that will receive the last value or the exception produced by the observable sequence.
         /// </summary>
         /// </summary>

+ 8 - 8
Rx.NET/Source/System.Reactive.Observable.Aliases/Observable.Aliases.cs

@@ -116,7 +116,7 @@ namespace System.Reactive.Observable.Aliases
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         public static IObservable<TResult> FlatMap<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
         public static IObservable<TResult> FlatMap<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
         {
         {
@@ -132,7 +132,7 @@ namespace System.Reactive.Observable.Aliases
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         public static IObservable<TResult> FlatMap<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
         public static IObservable<TResult> FlatMap<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
         {
         {
@@ -148,7 +148,7 @@ namespace System.Reactive.Observable.Aliases
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         public static IObservable<TResult> FlatMap<TSource, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
         public static IObservable<TResult> FlatMap<TSource, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
         {
         {
@@ -164,7 +164,7 @@ namespace System.Reactive.Observable.Aliases
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
         /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
         public static IObservable<TResult> FlatMap<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
         public static IObservable<TResult> FlatMap<TSource, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
         {
         {
@@ -219,7 +219,7 @@ namespace System.Reactive.Observable.Aliases
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
-        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         public static IObservable<TResult> FlatMap<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
         public static IObservable<TResult> FlatMap<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
         {
         {
             return source.SelectMany<TSource, TTaskResult, TResult>(taskSelector, resultSelector);
             return source.SelectMany<TSource, TTaskResult, TResult>(taskSelector, resultSelector);
@@ -237,7 +237,7 @@ namespace System.Reactive.Observable.Aliases
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
-        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         public static IObservable<TResult> FlatMap<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
         public static IObservable<TResult> FlatMap<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
         {
         {
             return source.SelectMany<TSource, TTaskResult, TResult>(taskSelector, resultSelector);
             return source.SelectMany<TSource, TTaskResult, TResult>(taskSelector, resultSelector);
@@ -255,7 +255,7 @@ namespace System.Reactive.Observable.Aliases
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
-        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         public static IObservable<TResult> FlatMap<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
         public static IObservable<TResult> FlatMap<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
         {
         {
             return source.SelectMany<TSource, TTaskResult, TResult>(taskSelector, resultSelector);
             return source.SelectMany<TSource, TTaskResult, TResult>(taskSelector, resultSelector);
@@ -273,7 +273,7 @@ namespace System.Reactive.Observable.Aliases
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <param name="resultSelector">A transform function to apply to each element of the intermediate sequence; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <returns>An observable sequence whose elements are the result of obtaining a task for each element of the input sequence and then mapping the task's result and its corresponding source element to a result element.</returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="taskSelector"/> or <paramref name="resultSelector"/> is null.</exception>
-        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable&lt;TResult&gt;"/>.</remarks>
+        /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="TaskObservableExtensions.ToObservable{TResult}(Task{TResult})"/>.</remarks>
         public static IObservable<TResult> FlatMap<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
         public static IObservable<TResult> FlatMap<TSource, TTaskResult, TResult>(this IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
         {
         {
             return source.SelectMany<TSource, TTaskResult, TResult>(taskSelector, resultSelector);
             return source.SelectMany<TSource, TTaskResult, TResult>(taskSelector, resultSelector);

+ 1 - 1
Rx.NET/Source/System.Reactive.Observable.Aliases/Qbservable.Aliases.Generated.cs

@@ -1,5 +1,5 @@
 /*
 /*
- * WARNING: Auto-generated file (11/21/2013 11:07:25 AM)
+ * WARNING: Auto-generated file (4/16/2015 23:47:35)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
  */
 
 

Plik diff jest za duży
+ 458 - 112
Rx.NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs


+ 239 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableAsyncTest.cs

@@ -1291,8 +1291,16 @@ namespace ReactiveTests.Tests
         [TestMethod]
         [TestMethod]
         public void StartAsync_Func_ArgumentChecking()
         public void StartAsync_Func_ArgumentChecking()
         {
         {
+            var s = Scheduler.Immediate;
+
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync<int>(default(Func<Task<int>>)));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync<int>(default(Func<Task<int>>)));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync<int>(default(Func<CancellationToken, Task<int>>)));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync<int>(default(Func<CancellationToken, Task<int>>)));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync<int>(default(Func<Task<int>>), s));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync<int>(default(Func<CancellationToken, Task<int>>), s));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync<int>(() => Task.FromResult(1), default(IScheduler)));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync<int>(ct => Task.FromResult(1), default(IScheduler)));
         }
         }
 
 
         [TestMethod]
         [TestMethod]
@@ -1427,6 +1435,58 @@ namespace ReactiveTests.Tests
             }
             }
         }
         }
 
 
+#if DESKTOPCLR
+        [TestMethod]
+        public void StartAsync_Func_Scheduler1()
+        {
+            var tcs = new TaskCompletionSource<int>();
+
+            var e = new ManualResetEvent(false);
+            var x = default(int);
+            var t = default(int);
+
+            var xs = Observable.StartAsync(() => tcs.Task, Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                x = res;
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            tcs.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(42, x);
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+
+        [TestMethod]
+        public void StartAsync_Func_Scheduler2()
+        {
+            var tcs = new TaskCompletionSource<int>();
+
+            var e = new ManualResetEvent(false);
+            var x = default(int);
+            var t = default(int);
+
+            var xs = Observable.StartAsync(ct => tcs.Task, Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                x = res;
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            tcs.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(42, x);
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+#endif
+
         #endregion
         #endregion
 
 
         #region Action
         #region Action
@@ -1434,8 +1494,15 @@ namespace ReactiveTests.Tests
         [TestMethod]
         [TestMethod]
         public void StartAsync_Action_ArgumentChecking()
         public void StartAsync_Action_ArgumentChecking()
         {
         {
+            var s = Scheduler.Immediate;
+
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<Task>)));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<Task>)));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<CancellationToken, Task>)));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<CancellationToken, Task>)));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<Task>), s));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<CancellationToken, Task>), s));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(() => (Task)Task.FromResult(1), default(IScheduler)));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(ct => (Task)Task.FromResult(1), default(IScheduler)));
         }
         }
 
 
         [TestMethod]
         [TestMethod]
@@ -1562,6 +1629,52 @@ namespace ReactiveTests.Tests
             }
             }
         }
         }
 
 
+#if DESKTOPCLR
+        [TestMethod]
+        public void StartAsync_Action_Scheduler1()
+        {
+            var tcs = new TaskCompletionSource<int>();
+
+            var e = new ManualResetEvent(false);
+            var t = default(int);
+
+            var xs = Observable.StartAsync(() => (Task)tcs.Task, Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            tcs.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+
+        [TestMethod]
+        public void StartAsync_Action_Scheduler2()
+        {
+            var tcs = new TaskCompletionSource<int>();
+
+            var e = new ManualResetEvent(false);
+            var t = default(int);
+
+            var xs = Observable.StartAsync(ct => (Task)tcs.Task, Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            tcs.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+#endif
+
         #endregion
         #endregion
 
 
 #endif
 #endif
@@ -1574,6 +1687,20 @@ namespace ReactiveTests.Tests
 
 
         #region Func
         #region Func
 
 
+        [TestMethod]
+        public void FromAsync_Func_ArgumentChecking()
+        {
+            var s = Scheduler.Immediate;
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(default(Func<Task<int>>)));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(default(Func<CancellationToken, Task<int>>)));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(default(Func<Task<int>>), s));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(() => Task.FromResult(42), default(IScheduler)));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(default(Func<CancellationToken, Task<int>>), s));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(ct => Task.FromResult(42), default(IScheduler)));
+        }
+
         [TestMethod]
         [TestMethod]
         public void FromAsync_Func_Success()
         public void FromAsync_Func_Success()
         {
         {
@@ -1694,10 +1821,76 @@ namespace ReactiveTests.Tests
                 ;
                 ;
         }
         }
 
 
+#if DESKTOPCLR
+        [TestMethod]
+        public void FromAsync_Func_Scheduler1()
+        {
+            var e = new ManualResetEvent(false);
+            var x = default(int);
+            var t = default(int);
+
+            var tcs = new TaskCompletionSource<int>();
+
+            var xs = Observable.FromAsync(() => tcs.Task, Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                x = res;
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            tcs.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(42, x);
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+
+        [TestMethod]
+        public void FromAsync_Func_Scheduler2()
+        {
+            var e = new ManualResetEvent(false);
+            var x = default(int);
+            var t = default(int);
+
+            var tcs = new TaskCompletionSource<int>();
+
+            var xs = Observable.FromAsync(ct => tcs.Task, Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                x = res;
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            tcs.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(42, x);
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+#endif
+
         #endregion
         #endregion
 
 
         #region Action
         #region Action
 
 
+        [TestMethod]
+        public void FromAsync_Action_ArgumentChecking()
+        {
+            var s = Scheduler.Immediate;
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task>)));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>)));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task>), s));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(() => (Task)Task.FromResult(42), default(IScheduler)));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>), s));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => (Task)Task.FromResult(42), default(IScheduler)));
+        }
+
         [TestMethod]
         [TestMethod]
         public void FromAsync_Action_Success()
         public void FromAsync_Action_Success()
         {
         {
@@ -1814,6 +2007,52 @@ namespace ReactiveTests.Tests
                 ;
                 ;
         }
         }
 
 
+#if DESKTOPCLR
+        [TestMethod]
+        public void FromAsync_Action_Scheduler1()
+        {
+            var e = new ManualResetEvent(false);
+            var t = default(int);
+
+            var tcs = new TaskCompletionSource<int>();
+
+            var xs = Observable.FromAsync(() => (Task)tcs.Task, Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            tcs.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+
+        [TestMethod]
+        public void FromAsync_Action_Scheduler2()
+        {
+            var e = new ManualResetEvent(false);
+            var t = default(int);
+
+            var tcs = new TaskCompletionSource<int>();
+
+            var xs = Observable.FromAsync(ct => (Task)tcs.Task, Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            tcs.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+#endif
+
         #endregion
         #endregion
 
 
 #endif
 #endif

+ 65 - 2
Rx.NET/Source/Tests.System.Reactive/Tests/TaskObservableExtensionsTest.cs

@@ -22,7 +22,13 @@ namespace ReactiveTests.Tests
         [TestMethod]
         [TestMethod]
         public void TaskToObservable_NonVoid_ArgumentChecking()
         public void TaskToObservable_NonVoid_ArgumentChecking()
         {
         {
-            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((System.Threading.Tasks.Task<int>)null));
+            var s = Scheduler.Immediate;
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task<int>)null));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task<int>)null, s));
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable(Task.FromResult(42), default(IScheduler)));
+
             var tcs = new System.Threading.Tasks.TaskCompletionSource<int>();
             var tcs = new System.Threading.Tasks.TaskCompletionSource<int>();
             var task = tcs.Task;
             var task = tcs.Task;
             ReactiveAssert.Throws<ArgumentNullException>(() => task.ToObservable().Subscribe(null));
             ReactiveAssert.Throws<ArgumentNullException>(() => task.ToObservable().Subscribe(null));
@@ -346,10 +352,43 @@ namespace ReactiveTests.Tests
             );
             );
         }
         }
 
 
+#if DESKTOPCLR
+        [TestMethod]
+        public void TaskToObservable_NonVoid_Scheduler()
+        {
+            var e = new ManualResetEvent(false);
+            var x = default(int);
+            var t = default(int);
+
+            var cts = new TaskCompletionSource<int>();
+
+            var xs = cts.Task.ToObservable(Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                x = res;
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            cts.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(42, x);
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+#endif
+
         [TestMethod]
         [TestMethod]
         public void TaskToObservable_Void_ArgumentChecking()
         public void TaskToObservable_Void_ArgumentChecking()
         {
         {
-            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((System.Threading.Tasks.Task)null));
+            var s = Scheduler.Immediate;
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task)null));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task)null, s));
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task)Task.FromResult(42), default(IScheduler)));
+
             var tcs = new System.Threading.Tasks.TaskCompletionSource<int>();
             var tcs = new System.Threading.Tasks.TaskCompletionSource<int>();
             System.Threading.Tasks.Task task = tcs.Task;
             System.Threading.Tasks.Task task = tcs.Task;
             ReactiveAssert.Throws<ArgumentNullException>(() => task.ToObservable().Subscribe(null));
             ReactiveAssert.Throws<ArgumentNullException>(() => task.ToObservable().Subscribe(null));
@@ -673,6 +712,30 @@ namespace ReactiveTests.Tests
             );
             );
         }
         }
 
 
+#if DESKTOPCLR
+        [TestMethod]
+        public void TaskToObservable_Void_Scheduler()
+        {
+            var e = new ManualResetEvent(false);
+            var t = default(int);
+
+            var tcs = new TaskCompletionSource<int>();
+
+            var xs = ((Task)tcs.Task).ToObservable(Scheduler.Immediate);
+            xs.Subscribe(res =>
+            {
+                t = Thread.CurrentThread.ManagedThreadId;
+                e.Set();
+            });
+
+            tcs.SetResult(42);
+
+            e.WaitOne();
+
+            Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, t);
+        }
+#endif
+
         #endregion
         #endregion
 
 
         #region ToTask
         #region ToTask

Niektóre pliki nie zostały wyświetlone z powodu dużej ilości zmienionych plików