浏览代码

Allow opt-out from TaskScheduler.UnobservedExceptions (#1914)

Ian Griffiths 2 年之前
父节点
当前提交
3507d5e628

+ 5 - 0
.gitignore

@@ -189,3 +189,8 @@ nuget.exe
 
 # JetBrains Rider adds these
 .idea/
+
+# Local NCrunch settings
+*.v3.ncrunchproject
+*.v3.ncrunchsolution
+/Rx.NET/Source/.NCrunch_*/StoredText

+ 1 - 1
Rx.NET/Source/src/Microsoft.Reactive.Testing/Microsoft.Reactive.Testing.csproj

@@ -8,7 +8,7 @@
     <PackageTags>Rx;Reactive;Extensions;Observable;LINQ;Events</PackageTags>    
     <Description>Reactive Extensions (Rx) for .NET - Testing Library</Description>
     <!-- NB: A lot of CA warnings are disabled because of the .cs files included from xunit.assert.source. -->
-    <NoWarn>$(NoWarn);IDE0054;IDE0066;CA1305;CA1307;CA1032;CA1064;CA1822;CA1812;CA1823</NoWarn>
+    <NoWarn>$(NoWarn);IDE0054;IDE0066;CA1305;CA1307;CA1032;CA1064;CA1822;CA1812;CA1820;CA1823</NoWarn>
   </PropertyGroup>
 
   <ItemGroup>

+ 132 - 0
Rx.NET/Source/src/System.Reactive/Concurrency/TaskObservationOptions.cs

@@ -0,0 +1,132 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Concurrency
+{
+    /// <summary>
+    /// Controls how completion or failure is handled when a <see cref="Task"/> or
+    /// <see cref="Task{TResult}"/> is wrapped as an <see cref="IObservable{T}"/> and observed by
+    /// an <see cref="IObserver{T}"/>.
+    /// </summary>
+    /// <remarks>
+    /// <para>
+    /// This type can be passed to overloads of the various method that adapt a TPL task as an
+    /// <see cref="IObservable{T}"/>. It deals with two concerns that arise whenever this is done:
+    /// the scheduler through which notifications are delivered, and the handling of exceptions
+    /// that occur after all observers have unsubscribed.
+    /// </para>
+    /// <para>
+    /// If the <see cref="Scheduler"/> property is non-null, it will be used to deliver all
+    /// notifications to observers, whether those notifications occur immediately (because the task
+    /// had already finished by the time it was observed) or they happen later.
+    /// </para>
+    /// <para>
+    /// The <see cref="IgnoreExceptionsAfterUnsubscribe"/> property determines how to deal with tasks
+    /// that fail after unsubscription (i.e., if an application calls <see cref="IObservable{T}.Subscribe(IObserver{T})"/>
+    /// on an observable wrapping, then calls Dispose on the result before that task completes, and
+    /// the task subsequently enters a faulted state). Overloads that don't take a <see cref="TaskObservationOptions"/>
+    /// argument do not observe the <see cref="Task.Exception"/> in this case, with the result that
+    /// the exception will then emerge from <see cref="TaskScheduler.UnobservedTaskException"/>
+    /// (which could terminate the process, depending on how the .NET application has been
+    /// configured). This is consistent with how unobserved <see cref="Task"/> failures are
+    /// normally handled, but it is not consistent with how Rx handles post-unsubcription failures
+    /// in general. For example, if the projection callback for Select is in progress at the moment
+    /// an observer unsubscribes, and that callback then goes on to throw an exception, that
+    /// exception is simply swallowed. (One could argue that it should instead be sent to some
+    /// application-level unhandled exception handler, but the current behaviour has been in place
+    /// for well over a decade, so it's not something we can change.) So there is an argument that
+    /// post-unsubscribe failures in <see cref="IObservable{T}"/>-wrapped tasks should be
+    /// ignored in exactly the same way: the default behaviour for post-unsubscribe failures in
+    /// tasks is inconsistent with the handling of all other post-unsubscribe failures. This has
+    /// also been the case for over a decade, so that inconsistency of defaults cannot be changed,
+    /// but the <see cref="IgnoreExceptionsAfterUnsubscribe"/> property enables applications to
+    /// ask for task-originated post-unsubscribe exceptions to be ignored in the same way as
+    /// non-task-originated post-unsubscribe exceptions are. (Where possible, applications should
+    /// avoid getting into situations where they throw exceptions in scenarios where nothing is
+    /// able to observe them is. This setting is a last resort for situations in which this is
+    /// truly unavoidable.)
+    /// </para>
+    /// </remarks>
+    public sealed class TaskObservationOptions
+    {
+        public TaskObservationOptions(
+            IScheduler? scheduler,
+            bool ignoreExceptionsAfterUnsubscribe)
+        {
+            Scheduler = scheduler;
+            IgnoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
+        }
+
+        /// <summary>
+        /// Gets the optional scheduler to use when delivering notifications of the tasks's
+        /// progress.
+        /// </summary>
+        /// <remarks>
+        /// If this is null, the behaviour depends on whether the task has already completed. If
+        /// the task has finished, the relevant completion or error notifications will be delivered
+        /// via <see cref="ImmediateScheduler.Instance"/>. If the task is still running (or not yet
+        /// started) at the instant at which it is observed through Rx, no scheduler will be used
+        /// if this property is null.
+        /// </remarks>
+        public IScheduler? Scheduler { get; }
+
+        /// <summary>
+        /// Gets a flag controlling handling of exceptions that occur after cancellation
+        /// has been initiated by unsubscribing from the observable representing the task's
+        /// progress.
+        /// </summary>
+        /// <remarks>
+        /// If this is <c>true</c>, exceptions that occur after all observers have unsubscribed
+        /// will be handled and silently ignored. If <c>false</c>, they will go unobserved, meaning
+        /// they will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
+        /// </remarks>
+        public bool IgnoreExceptionsAfterUnsubscribe { get; }
+
+        internal Value ToValue() => new Value(this.Scheduler, this.IgnoreExceptionsAfterUnsubscribe);
+
+        /// <summary>
+        /// Value-type representation.
+        /// </summary>
+        /// <remarks>
+        /// <para>
+        /// The public API surface area for <see cref="TaskObservationOptions"/> is a class because
+        /// using a value type would run into various issues. The type might appear in expression
+        /// trees due to use of <see cref="System.Reactive.Linq.IQbservable{T}"/>, which limits us
+        /// to a fairly old subset of C#. It means we can't use the <c>in</c> modifier on
+        /// parameters, which in turn prevents us from passing options by reference, increasing the
+        /// overhead of each method call. Also, options types such as this aren't normally value
+        /// types, so it would be a curious design choice.
+        /// </para>
+        /// <para>
+        /// The downside of using a class is that it entails an extra allocation. Since the feature
+        /// for which this is designed (the ability to swallow unhandled exceptions thrown by tasks
+        /// after unsubscription) is one we don't expect most applications to use, that shouldn't
+        /// be a problem. However, to accommodate this feature, common code paths shared by various
+        /// overloads need the information that a <see cref="TaskObservationOptions"/> holds. The
+        /// easy approach would be to construct an instance of this type in overloads that don't
+        /// take one as an argument. But that would be impose an additional allocation on code that
+        /// doesn't want this new feature.
+        /// </para>
+        /// <para>
+        /// So although we can't use a value type with <c>in</c> in public APIs dues to constraints
+        /// on expression trees, we can do so internally. This type is a value-typed version of
+        /// <see cref="TaskObservationOptions"/> enabling us to share code paths without forcing
+        /// new allocations on existing code.
+        /// </para>
+        /// </remarks>
+        internal readonly struct Value
+        {
+            internal Value(IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
+            {
+                Scheduler = scheduler;
+                IgnoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
+            }
+
+            public IScheduler? Scheduler { get; }
+            public bool IgnoreExceptionsAfterUnsubscribe { get; }
+        }
+    }
+}

+ 10 - 10
Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

@@ -195,25 +195,25 @@ namespace System.Reactive.Linq
 
         IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync);
         IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync);
-        IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync, IScheduler scheduler);
-        IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler scheduler);
+        IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync, in TaskObservationOptions.Value options);
+        IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, in TaskObservationOptions.Value options);
 
         IObservable<Unit> Start(Action action);
         IObservable<Unit> Start(Action action, IScheduler scheduler);
 
         IObservable<Unit> StartAsync(Func<Task> actionAsync);
         IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync);
-        IObservable<Unit> StartAsync(Func<Task> actionAsync, IScheduler scheduler);
-        IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler);
+        IObservable<Unit> StartAsync(Func<Task> actionAsync, in TaskObservationOptions.Value options);
+        IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, in TaskObservationOptions.Value options);
 
         IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync);
         IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync);
         IObservable<Unit> FromAsync(Func<Task> actionAsync);
         IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync);
-        IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, IScheduler scheduler);
-        IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, IScheduler scheduler);
-        IObservable<Unit> FromAsync(Func<Task> actionAsync, IScheduler scheduler);
-        IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler);
+        IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, TaskObservationOptions.Value options);
+        IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, TaskObservationOptions.Value options);
+        IObservable<Unit> FromAsync(Func<Task> actionAsync, TaskObservationOptions.Value options);
+        IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, TaskObservationOptions.Value options);
 
         Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function);
         Func<IObservable<TResult>> ToAsync<TResult>(Func<TResult> function, IScheduler scheduler);
@@ -398,8 +398,8 @@ namespace System.Reactive.Linq
 
         IObservable<TValue> Defer<TValue>(Func<IObservable<TValue>> observableFactory);
 
-        IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync);
-        IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync);
+        IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe);
+        IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe);
 
         IObservable<TResult> Empty<TResult>();
         IObservable<TResult> Empty<TResult>(IScheduler scheduler);

+ 231 - 8
Rx.NET/Source/src/System.Reactive/Linq/Observable.Async.cs

@@ -1072,7 +1072,36 @@ namespace System.Reactive.Linq
                 throw new ArgumentNullException(nameof(scheduler));
             }
 
-            return s_impl.StartAsync(functionAsync, scheduler);
+            return s_impl.StartAsync(functionAsync, new TaskObservationOptions.Value(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Invokes the asynchronous function, surfacing the result through an observable sequence.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to run.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing the function's result value, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item><description>The function is started immediately, not during the subscription of the resulting sequence.</description></item>
+        /// <item><description>Multiple subscriptions to the resulting sequence can observe the function's result.</description></item>
+        /// </list>
+        /// </remarks>
+        public static IObservable<TResult> StartAsync<TResult>(Func<Task<TResult>> functionAsync, TaskObservationOptions options)
+        {
+            if (functionAsync == null)
+            {
+                throw new ArgumentNullException(nameof(functionAsync));
+            }
+
+            if (options == null)
+            {
+                throw new ArgumentNullException(nameof(options));
+            }
+
+            return s_impl.StartAsync(functionAsync, options.ToValue());
         }
 
         /// <summary>
@@ -1140,7 +1169,43 @@ namespace System.Reactive.Linq
                 throw new ArgumentNullException(nameof(scheduler));
             }
 
-            return s_impl.StartAsync(functionAsync, scheduler);
+           return s_impl.StartAsync(functionAsync, new TaskObservationOptions.Value(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Invokes the asynchronous function, surfacing the result through an observable sequence.
+        /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to run.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing the function's result value, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item><description>The function is started immediately, not during the subscription of the resulting sequence.</description></item>
+        /// <item><description>Multiple subscriptions to the resulting sequence can observe the function's result.</description></item>
+        /// <item><description>
+        /// If any subscription to the resulting sequence is disposed, the CancellationToken is set. The observer associated to the disposed
+        /// subscription won't see the TaskCanceledException, but other observers will. You can protect against this using the Catch operator.
+        /// Be careful when handing out the resulting sequence because of this behavior. The most common use is to have a single subscription
+        /// to the resulting sequence, which controls the CancellationToken state. Alternatively, you can control subscription behavior using
+        /// multicast operators.
+        /// </description></item>
+        /// </list>
+        /// </remarks>
+        public static IObservable<TResult> StartAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, TaskObservationOptions options)
+        {
+            if (functionAsync == null)
+            {
+                throw new ArgumentNullException(nameof(functionAsync));
+            }
+            if (options == null)
+            {
+                throw new ArgumentNullException(nameof(options));
+            }
+
+            return s_impl.StartAsync(functionAsync, options.ToValue());
         }
 
         #endregion
@@ -1244,7 +1309,35 @@ namespace System.Reactive.Linq
                 throw new ArgumentNullException(nameof(scheduler));
             }
 
-            return s_impl.StartAsync(actionAsync, scheduler);
+            return s_impl.StartAsync(actionAsync, new TaskObservationOptions.Value(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Invokes the asynchronous action, surfacing the result through an observable sequence.
+        /// </summary>
+        /// <param name="actionAsync">Asynchronous action to run.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item><description>The action is started immediately, not during the subscription of the resulting sequence.</description></item>
+        /// <item><description>Multiple subscriptions to the resulting sequence can observe the action's outcome.</description></item>
+        /// </list>
+        /// </remarks>
+        public static IObservable<Unit> StartAsync(Func<Task> actionAsync, TaskObservationOptions options)
+        {
+            if (actionAsync == null)
+            {
+                throw new ArgumentNullException(nameof(actionAsync));
+            }
+
+            if (options == null)
+            {
+                throw new ArgumentNullException(nameof(options));
+            }
+
+            return s_impl.StartAsync(actionAsync, options.ToValue());
         }
 
         /// <summary>
@@ -1310,7 +1403,43 @@ namespace System.Reactive.Linq
                 throw new ArgumentNullException(nameof(scheduler));
             }
 
-            return s_impl.StartAsync(actionAsync, scheduler);
+            return s_impl.StartAsync(actionAsync, new TaskObservationOptions.Value(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Invokes the asynchronous action, surfacing the result through an observable sequence.
+        /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
+        /// </summary>
+        /// <param name="actionAsync">Asynchronous action to run.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item><description>The action is started immediately, not during the subscription of the resulting sequence.</description></item>
+        /// <item><description>Multiple subscriptions to the resulting sequence can observe the action's outcome.</description></item>
+        /// <item><description>
+        /// If any subscription to the resulting sequence is disposed, the CancellationToken is set. The observer associated to the disposed
+        /// subscription won't see the TaskCanceledException, but other observers will. You can protect against this using the Catch operator.
+        /// Be careful when handing out the resulting sequence because of this behavior. The most common use is to have a single subscription
+        /// to the resulting sequence, which controls the CancellationToken state. Alternatively, you can control subscription behavior using
+        /// multicast operators.
+        /// </description></item>
+        /// </list>
+        /// </remarks>
+        public static IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, TaskObservationOptions options)
+        {
+            if (actionAsync == null)
+            {
+                throw new ArgumentNullException(nameof(actionAsync));
+            }
+
+            if (options == null)
+            {
+                throw new ArgumentNullException(nameof(options));
+            }
+
+            return s_impl.StartAsync(actionAsync, options.ToValue());
         }
 
         #endregion
@@ -1359,7 +1488,30 @@ namespace System.Reactive.Linq
                 throw new ArgumentNullException(nameof(scheduler));
             }
 
-            return s_impl.FromAsync(functionAsync, scheduler);
+            return s_impl.FromAsync(functionAsync, new TaskObservationOptions.Value(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Converts an asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to convert.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing the result of invoking the function, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null.</exception>
+        public static IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, TaskObservationOptions options)
+        {
+            if (functionAsync == null)
+            {
+                throw new ArgumentNullException(nameof(functionAsync));
+            }
+
+            if (options == null)
+            {
+                throw new ArgumentNullException(nameof(options));
+            }
+
+            return s_impl.FromAsync(functionAsync, options.ToValue());
         }
 
         /// <summary>
@@ -1403,7 +1555,32 @@ namespace System.Reactive.Linq
                 throw new ArgumentNullException(nameof(scheduler));
             }
 
-            return s_impl.FromAsync(functionAsync, scheduler);
+            return s_impl.FromAsync(functionAsync, new TaskObservationOptions.Value(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Converts an asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// The CancellationToken passed to the asynchronous function is tied to the observable sequence's subscription that triggered the function's invocation and can be used for best-effort cancellation.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to convert.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing the result of invoking the function, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="functionAsync"/> is null.</exception>
+        /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous function will be signaled.</remarks>
+        public static IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, TaskObservationOptions options)
+        {
+            if (functionAsync == null)
+            {
+                throw new ArgumentNullException(nameof(functionAsync));
+            }
+
+            if (options == null)
+            {
+                throw new ArgumentNullException(nameof(options));
+            }
+
+            return s_impl.FromAsync(functionAsync, options.ToValue());
         }
 
         #endregion
@@ -1445,7 +1622,29 @@ namespace System.Reactive.Linq
                 throw new ArgumentNullException(nameof(scheduler));
             }
 
-            return s_impl.FromAsync(actionAsync, scheduler);
+            return s_impl.FromAsync(actionAsync, new TaskObservationOptions.Value(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Converts an asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// </summary>
+        /// <param name="actionAsync">Asynchronous action to convert.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null.</exception>
+        public static IObservable<Unit> FromAsync(Func<Task> actionAsync, TaskObservationOptions options)
+        {
+            if (actionAsync == null)
+            {
+                throw new ArgumentNullException(nameof(actionAsync));
+            }
+
+            if (options == null)
+            {
+                throw new ArgumentNullException(nameof(options));
+            }
+
+            return s_impl.FromAsync(actionAsync, options.ToValue());
         }
 
         /// <summary>
@@ -1487,7 +1686,31 @@ namespace System.Reactive.Linq
                 throw new ArgumentNullException(nameof(scheduler));
             }
 
-            return s_impl.FromAsync(actionAsync, scheduler);
+            return s_impl.FromAsync(actionAsync, new TaskObservationOptions.Value(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Converts an asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// The CancellationToken passed to the asynchronous action is tied to the observable sequence's subscription that triggered the action's invocation and can be used for best-effort cancellation.
+        /// </summary>
+        /// <param name="actionAsync">Asynchronous action to convert.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous function will be signaled.</remarks>
+        /// <exception cref="ArgumentNullException"><paramref name="actionAsync"/> is null.</exception>
+        public static IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, TaskObservationOptions options)
+        {
+            if (actionAsync == null)
+            {
+                throw new ArgumentNullException(nameof(actionAsync));
+            }
+
+            if (options == null)
+            {
+                throw new ArgumentNullException(nameof(options));
+            }
+
+            return s_impl.FromAsync(actionAsync, options.ToValue());
         }
 
         #endregion

+ 40 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable.Creation.cs

@@ -208,13 +208,31 @@ namespace System.Reactive.Linq
         /// <exception cref="ArgumentNullException"><paramref name="observableFactoryAsync"/> is null.</exception>
         /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
         public static IObservable<TResult> Defer<TResult>(Func<Task<IObservable<TResult>>> observableFactoryAsync)
+        {
+            return Defer(observableFactoryAsync, ignoreExceptionsAfterUnsubscribe: false);
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that starts the specified asynchronous factory function whenever a new observer subscribes.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the elements in the sequence returned by the factory function, and in the resulting sequence.</typeparam>
+        /// <param name="observableFactoryAsync">Asynchronous factory function to start for each observer that subscribes to the resulting sequence.</param>
+        /// <param name="ignoreExceptionsAfterUnsubscribe">
+        /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
+        /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
+        /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
+        /// </param>
+        /// <returns>An observable sequence whose observers trigger the given asynchronous observable factory function to be started.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="observableFactoryAsync"/> is null.</exception>
+        /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
+        public static IObservable<TResult> Defer<TResult>(Func<Task<IObservable<TResult>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe)
         {
             if (observableFactoryAsync == null)
             {
                 throw new ArgumentNullException(nameof(observableFactoryAsync));
             }
 
-            return s_impl.Defer(observableFactoryAsync);
+            return s_impl.Defer(observableFactoryAsync, ignoreExceptionsAfterUnsubscribe);
         }
 
         /// <summary>
@@ -228,13 +246,33 @@ namespace System.Reactive.Linq
         /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
         /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous observable factory function will be signaled.</remarks>
         public static IObservable<TResult> DeferAsync<TResult>(Func<CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
+        {
+            return DeferAsync(observableFactoryAsync, ignoreExceptionsAfterUnsubscribe: false);
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that starts the specified cancellable asynchronous factory function whenever a new observer subscribes.
+        /// The CancellationToken passed to the asynchronous factory function is tied to the returned disposable subscription, allowing best-effort cancellation.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the elements in the sequence returned by the factory function, and in the resulting sequence.</typeparam>
+        /// <param name="observableFactoryAsync">Asynchronous factory function to start for each observer that subscribes to the resulting sequence.</param>
+        /// <param name="ignoreExceptionsAfterUnsubscribe">
+        /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
+        /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
+        /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
+        /// </param>
+        /// <returns>An observable sequence whose observers trigger the given asynchronous observable factory function to be started.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="observableFactoryAsync"/> is null.</exception>
+        /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
+        /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous observable factory function will be signaled.</remarks>
+        public static IObservable<TResult> DeferAsync<TResult>(Func<CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe)
         {
             if (observableFactoryAsync == null)
             {
                 throw new ArgumentNullException(nameof(observableFactoryAsync));
             }
 
-            return s_impl.Defer(observableFactoryAsync);
+            return s_impl.Defer(observableFactoryAsync, ignoreExceptionsAfterUnsubscribe);
         }
 
         #endregion

+ 425 - 17
Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

@@ -1,5 +1,5 @@
 /*
- * WARNING: Auto-generated file (merged on 06/13/2018)
+ * WARNING: Auto-generated file (merged on 04/19/2023)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 #nullable enable
@@ -3600,6 +3600,43 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Returns an observable sequence that starts the specified asynchronous factory function whenever a new observer subscribes.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TResult">The type of the elements in the sequence returned by the factory function, and in the resulting sequence.</typeparam>
+        /// <param name="observableFactoryAsync">Asynchronous factory function to start for each observer that subscribes to the resulting sequence.</param>
+        /// <param name="ignoreExceptionsAfterUnsubscribe">
+        /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
+        /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
+        /// will eventually emerge through <see cref="E:System.Threading.Tasks.TaskScheduler.UnobservedTaskException" />.
+        /// </param>
+        /// <returns>An observable sequence whose observers trigger the given asynchronous observable factory function to be started.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="observableFactoryAsync" /> is null.</exception>
+        /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
+        public static IQbservable<TResult> Defer<TResult>(this IQbservableProvider provider, Expression<Func<Task<IObservable<TResult>>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (observableFactoryAsync == null)
+                throw new ArgumentNullException(nameof(observableFactoryAsync));
+
+            return provider.CreateQuery<TResult>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.Defer<TResult>(default(IQbservableProvider), default(Expression<Func<Task<IObservable<TResult>>>>), default(bool))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TResult)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    observableFactoryAsync,
+                    Expression.Constant(ignoreExceptionsAfterUnsubscribe, typeof(bool))
+                )
+            );
+        }
+
         /// <summary>
         /// Returns an observable sequence that starts the specified cancellable asynchronous factory function whenever a new observer subscribes.
         /// The CancellationToken passed to the asynchronous factory function is tied to the returned disposable subscription, allowing best-effort cancellation.
@@ -3633,6 +3670,45 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Returns an observable sequence that starts the specified cancellable asynchronous factory function whenever a new observer subscribes.
+        /// The CancellationToken passed to the asynchronous factory function is tied to the returned disposable subscription, allowing best-effort cancellation.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TResult">The type of the elements in the sequence returned by the factory function, and in the resulting sequence.</typeparam>
+        /// <param name="observableFactoryAsync">Asynchronous factory function to start for each observer that subscribes to the resulting sequence.</param>
+        /// <param name="ignoreExceptionsAfterUnsubscribe">
+        /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
+        /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
+        /// will eventually emerge through <see cref="E:System.Threading.Tasks.TaskScheduler.UnobservedTaskException" />.
+        /// </param>
+        /// <returns>An observable sequence whose observers trigger the given asynchronous observable factory function to be started.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="observableFactoryAsync" /> is null.</exception>
+        /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
+        /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous observable factory function will be signaled.</remarks>
+        public static IQbservable<TResult> DeferAsync<TResult>(this IQbservableProvider provider, Expression<Func<CancellationToken, Task<IObservable<TResult>>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (observableFactoryAsync == null)
+                throw new ArgumentNullException(nameof(observableFactoryAsync));
+
+            return provider.CreateQuery<TResult>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.DeferAsync<TResult>(default(IQbservableProvider), default(Expression<Func<CancellationToken, Task<IObservable<TResult>>>>), default(bool))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TResult)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    observableFactoryAsync,
+                    Expression.Constant(ignoreExceptionsAfterUnsubscribe, typeof(bool))
+                )
+            );
+        }
+
         /// <summary>
         /// Time shifts the observable sequence to start propagating notifications at the specified absolute time.
         /// The relative time intervals between the values are preserved.
@@ -4862,7 +4938,7 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Converts to asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// Converts an asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
         /// </summary>
         /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
         /// <param name="actionAsync">Asynchronous action to convert.</param>
@@ -4891,7 +4967,40 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Converts to asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// Converts an asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <param name="actionAsync">Asynchronous action to convert.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="actionAsync" /> is null.</exception>
+        public static IQbservable<Unit> FromAsync(this IQbservableProvider provider, Expression<Func<Task>> actionAsync, TaskObservationOptions options)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (actionAsync == null)
+                throw new ArgumentNullException(nameof(actionAsync));
+            if (options == null)
+                throw new ArgumentNullException(nameof(options));
+
+            return provider.CreateQuery<Unit>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.FromAsync(default(IQbservableProvider), default(Expression<Func<Task>>), default(TaskObservationOptions))),
+#else
+                    (MethodInfo)MethodInfo.GetCurrentMethod()!,
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    actionAsync,
+                    Expression.Constant(options, typeof(TaskObservationOptions))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Converts an asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
         /// </summary>
         /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
         /// <param name="actionAsync">Asynchronous action to convert.</param>
@@ -4924,7 +5033,7 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Converts to asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// Converts an asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
         /// The CancellationToken passed to the asynchronous action is tied to the observable sequence's subscription that triggered the action's invocation and can be used for best-effort cancellation.
         /// </summary>
         /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
@@ -4955,7 +5064,42 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Converts to asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// Converts an asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
+        /// The CancellationToken passed to the asynchronous action is tied to the observable sequence's subscription that triggered the action's invocation and can be used for best-effort cancellation.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <param name="actionAsync">Asynchronous action to convert.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous function will be signaled.</remarks>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="actionAsync" /> is null.</exception>
+        public static IQbservable<Unit> FromAsync(this IQbservableProvider provider, Expression<Func<CancellationToken, Task>> actionAsync, TaskObservationOptions options)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (actionAsync == null)
+                throw new ArgumentNullException(nameof(actionAsync));
+            if (options == null)
+                throw new ArgumentNullException(nameof(options));
+
+            return provider.CreateQuery<Unit>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.FromAsync(default(IQbservableProvider), default(Expression<Func<CancellationToken, Task>>), default(TaskObservationOptions))),
+#else
+                    (MethodInfo)MethodInfo.GetCurrentMethod()!,
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    actionAsync,
+                    Expression.Constant(options, typeof(TaskObservationOptions))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Converts an asynchronous action into an observable sequence. Each subscription to the resulting sequence causes the action to be started.
         /// The CancellationToken passed to the asynchronous action is tied to the observable sequence's subscription that triggered the action's invocation and can be used for best-effort cancellation.
         /// </summary>
         /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
@@ -4990,7 +5134,7 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Converts to asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// Converts an asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
         /// </summary>
         /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
         /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
@@ -5020,7 +5164,7 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Converts to asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// Converts an asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
         /// The CancellationToken passed to the asynchronous function is tied to the observable sequence's subscription that triggered the function's invocation and can be used for best-effort cancellation.
         /// </summary>
         /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
@@ -5052,7 +5196,77 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Converts to asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// Converts an asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to convert.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing the result of invoking the function, or an exception.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="functionAsync" /> is null.</exception>
+        public static IQbservable<TResult> FromAsync<TResult>(this IQbservableProvider provider, Expression<Func<Task<TResult>>> functionAsync, TaskObservationOptions options)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (functionAsync == null)
+                throw new ArgumentNullException(nameof(functionAsync));
+            if (options == null)
+                throw new ArgumentNullException(nameof(options));
+
+            return provider.CreateQuery<TResult>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.FromAsync<TResult>(default(IQbservableProvider), default(Expression<Func<Task<TResult>>>), default(TaskObservationOptions))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TResult)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    functionAsync,
+                    Expression.Constant(options, typeof(TaskObservationOptions))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Converts an asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// The CancellationToken passed to the asynchronous function is tied to the observable sequence's subscription that triggered the function's invocation and can be used for best-effort cancellation.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to convert.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing the result of invoking the function, or an exception.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="functionAsync" /> is null.</exception>
+        /// <remarks>When a subscription to the resulting sequence is disposed, the CancellationToken that was fed to the asynchronous function will be signaled.</remarks>
+        public static IQbservable<TResult> FromAsync<TResult>(this IQbservableProvider provider, Expression<Func<CancellationToken, Task<TResult>>> functionAsync, TaskObservationOptions options)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (functionAsync == null)
+                throw new ArgumentNullException(nameof(functionAsync));
+            if (options == null)
+                throw new ArgumentNullException(nameof(options));
+
+            return provider.CreateQuery<TResult>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.FromAsync<TResult>(default(IQbservableProvider), default(Expression<Func<CancellationToken, Task<TResult>>>), default(TaskObservationOptions))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TResult)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    functionAsync,
+                    Expression.Constant(options, typeof(TaskObservationOptions))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Converts an asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
         /// </summary>
         /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
         /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
@@ -5086,7 +5300,7 @@ namespace System.Reactive.Linq
         }
 
         /// <summary>
-        /// Converts to asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
+        /// Converts an asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started.
         /// The CancellationToken passed to the asynchronous function is tied to the observable sequence's subscription that triggered the function's invocation and can be used for best-effort cancellation.
         /// </summary>
         /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
@@ -11175,7 +11389,7 @@ namespace System.Reactive.Linq
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TSignal">The arbitrary element type signaled by the handler observable.</typeparam>
         /// <param name="source">Observable sequence to keep repeating when it successfully terminates.</param>
-        /// <param name="handler">The function that is called for each observer and takes an observable sequence objects.
+        /// <param name="handler">The function that is called for each observer and takes an observable sequence of objects.
         /// It should return an observable of arbitrary items that should signal that arbitrary item in
         /// response to receiving the completion signal from the source observable. If this observable signals
         /// a terminal event, the sequence is terminated with that signal instead.</param>
@@ -13391,6 +13605,49 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Invokes the asynchronous action, surfacing the result through an observable sequence.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <param name="actionAsync">Asynchronous action to run.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="actionAsync" /> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item>
+        /// <description>The action is started immediately, not during the subscription of the resulting sequence.</description>
+        /// </item>
+        /// <item>
+        /// <description>Multiple subscriptions to the resulting sequence can observe the action's outcome.</description>
+        /// </item>
+        /// </list>
+        /// </remarks>
+        public static IQbservable<Unit> StartAsync(this IQbservableProvider provider, Expression<Func<Task>> actionAsync, TaskObservationOptions options)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (actionAsync == null)
+                throw new ArgumentNullException(nameof(actionAsync));
+            if (options == null)
+                throw new ArgumentNullException(nameof(options));
+
+            return provider.CreateQuery<Unit>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.StartAsync(default(IQbservableProvider), default(Expression<Func<Task>>), default(TaskObservationOptions))),
+#else
+                    (MethodInfo)MethodInfo.GetCurrentMethod()!,
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    actionAsync,
+                    Expression.Constant(options, typeof(TaskObservationOptions))
+                )
+            );
+        }
+
         /// <summary>
         /// Invokes the asynchronous action, surfacing the result through an observable sequence.
         /// </summary>
@@ -13483,6 +13740,59 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Invokes the asynchronous action, surfacing the result through an observable sequence.
+        /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <param name="actionAsync">Asynchronous action to run.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing a Unit value upon completion of the action, or an exception.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="actionAsync" /> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item>
+        /// <description>The action is started immediately, not during the subscription of the resulting sequence.</description>
+        /// </item>
+        /// <item>
+        /// <description>Multiple subscriptions to the resulting sequence can observe the action's outcome.</description>
+        /// </item>
+        /// <item>
+        /// <description>
+        /// If any subscription to the resulting sequence is disposed, the CancellationToken is set. The observer associated to the disposed
+        /// subscription won't see the TaskCanceledException, but other observers will. You can protect against this using the Catch operator.
+        /// Be careful when handing out the resulting sequence because of this behavior. The most common use is to have a single subscription
+        /// to the resulting sequence, which controls the CancellationToken state. Alternatively, you can control subscription behavior using
+        /// multicast operators.
+        /// </description>
+        /// </item>
+        /// </list>
+        /// </remarks>
+        public static IQbservable<Unit> StartAsync(this IQbservableProvider provider, Expression<Func<CancellationToken, Task>> actionAsync, TaskObservationOptions options)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (actionAsync == null)
+                throw new ArgumentNullException(nameof(actionAsync));
+            if (options == null)
+                throw new ArgumentNullException(nameof(options));
+
+            return provider.CreateQuery<Unit>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.StartAsync(default(IQbservableProvider), default(Expression<Func<CancellationToken, Task>>), default(TaskObservationOptions))),
+#else
+                    (MethodInfo)MethodInfo.GetCurrentMethod()!,
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    actionAsync,
+                    Expression.Constant(options, typeof(TaskObservationOptions))
+                )
+            );
+        }
+
         /// <summary>
         /// Invokes the asynchronous action, surfacing the result through an observable sequence.
         /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
@@ -13628,14 +13938,68 @@ namespace System.Reactive.Linq
 
         /// <summary>
         /// Invokes the asynchronous function, surfacing the result through an observable sequence.
+        /// The CancellationToken is shared by all subscriptions on the resulting observable sequence. See the remarks section for more information.
         /// </summary>
         /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
         /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
         /// <param name="functionAsync">Asynchronous function to run.</param>
-        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
         /// <returns>An observable sequence exposing the function's result value, or an exception.</returns>
         /// <exception cref="ArgumentNullException">
-        /// <paramref name="functionAsync" /> is null or <paramref name="scheduler" /> is null.</exception>
+        /// <paramref name="functionAsync" /> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item>
+        /// <description>The function is started immediately, not during the subscription of the resulting sequence.</description>
+        /// </item>
+        /// <item>
+        /// <description>Multiple subscriptions to the resulting sequence can observe the function's result.</description>
+        /// </item>
+        /// <item>
+        /// <description>
+        /// If any subscription to the resulting sequence is disposed, the CancellationToken is set. The observer associated to the disposed
+        /// subscription won't see the TaskCanceledException, but other observers will. You can protect against this using the Catch operator.
+        /// Be careful when handing out the resulting sequence because of this behavior. The most common use is to have a single subscription
+        /// to the resulting sequence, which controls the CancellationToken state. Alternatively, you can control subscription behavior using
+        /// multicast operators.
+        /// </description>
+        /// </item>
+        /// </list>
+        /// </remarks>
+        public static IQbservable<TResult> StartAsync<TResult>(this IQbservableProvider provider, Expression<Func<CancellationToken, Task<TResult>>> functionAsync, TaskObservationOptions options)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (functionAsync == null)
+                throw new ArgumentNullException(nameof(functionAsync));
+            if (options == null)
+                throw new ArgumentNullException(nameof(options));
+
+            return provider.CreateQuery<TResult>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.StartAsync<TResult>(default(IQbservableProvider), default(Expression<Func<CancellationToken, Task<TResult>>>), default(TaskObservationOptions))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TResult)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    functionAsync,
+                    Expression.Constant(options, typeof(TaskObservationOptions))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Invokes the asynchronous function, surfacing the result through an observable sequence.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to run.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence exposing the function's result value, or an exception.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="functionAsync" /> is null.</exception>
         /// <remarks>
         /// <list type="bullet">
         /// <item>
@@ -13646,26 +14010,26 @@ namespace System.Reactive.Linq
         /// </item>
         /// </list>
         /// </remarks>
-        public static IQbservable<TResult> StartAsync<TResult>(this IQbservableProvider provider, Expression<Func<Task<TResult>>> functionAsync, IScheduler scheduler)
+        public static IQbservable<TResult> StartAsync<TResult>(this IQbservableProvider provider, Expression<Func<Task<TResult>>> functionAsync, TaskObservationOptions options)
         {
             if (provider == null)
                 throw new ArgumentNullException(nameof(provider));
             if (functionAsync == null)
                 throw new ArgumentNullException(nameof(functionAsync));
-            if (scheduler == null)
-                throw new ArgumentNullException(nameof(scheduler));
+            if (options == null)
+                throw new ArgumentNullException(nameof(options));
 
             return provider.CreateQuery<TResult>(
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.StartAsync<TResult>(default(IQbservableProvider), default(Expression<Func<Task<TResult>>>), default(IScheduler))),
+                    InfoOf(() => Qbservable.StartAsync<TResult>(default(IQbservableProvider), default(Expression<Func<Task<TResult>>>), default(TaskObservationOptions))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TResult)),
 #endif
                     Expression.Constant(provider, typeof(IQbservableProvider)),
                     functionAsync,
-                    Expression.Constant(scheduler, typeof(IScheduler))
+                    Expression.Constant(options, typeof(TaskObservationOptions))
                 )
             );
         }
@@ -13724,6 +14088,50 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Invokes the asynchronous function, surfacing the result through an observable sequence.
+        /// </summary>
+        /// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
+        /// <typeparam name="TResult">The type of the result returned by the asynchronous function.</typeparam>
+        /// <param name="functionAsync">Asynchronous function to run.</param>
+        /// <param name="scheduler">Scheduler on which to notify observers.</param>
+        /// <returns>An observable sequence exposing the function's result value, or an exception.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="functionAsync" /> is null or <paramref name="scheduler" /> is null.</exception>
+        /// <remarks>
+        /// <list type="bullet">
+        /// <item>
+        /// <description>The function is started immediately, not during the subscription of the resulting sequence.</description>
+        /// </item>
+        /// <item>
+        /// <description>Multiple subscriptions to the resulting sequence can observe the function's result.</description>
+        /// </item>
+        /// </list>
+        /// </remarks>
+        public static IQbservable<TResult> StartAsync<TResult>(this IQbservableProvider provider, Expression<Func<Task<TResult>>> functionAsync, IScheduler scheduler)
+        {
+            if (provider == null)
+                throw new ArgumentNullException(nameof(provider));
+            if (functionAsync == null)
+                throw new ArgumentNullException(nameof(functionAsync));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return provider.CreateQuery<TResult>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.StartAsync<TResult>(default(IQbservableProvider), default(Expression<Func<Task<TResult>>>), default(IScheduler))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TResult)),
+#endif
+                    Expression.Constant(provider, typeof(IQbservableProvider)),
+                    functionAsync,
+                    Expression.Constant(scheduler, typeof(IScheduler))
+                )
+            );
+        }
+
         /// <summary>
         /// Prepends a sequence of values to an observable sequence.
         /// </summary>

+ 28 - 56
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Async.cs

@@ -660,15 +660,15 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync)
         {
-            return StartAsyncImpl(functionAsync, null);
+            return StartAsyncImpl(functionAsync, new TaskObservationOptions.Value(null, ignoreExceptionsAfterUnsubscribe: false));
         }
 
-        public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync, IScheduler scheduler)
+        public virtual IObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync, in TaskObservationOptions.Value options)
         {
-            return StartAsyncImpl(functionAsync, scheduler);
+            return StartAsyncImpl(functionAsync, options);
         }
 
-        private IObservable<TSource> StartAsyncImpl<TSource>(Func<Task<TSource>> functionAsync, IScheduler? scheduler)
+        private IObservable<TSource> StartAsyncImpl<TSource>(Func<Task<TSource>> functionAsync, in TaskObservationOptions.Value options)
         {
             Task<TSource> task;
             try
@@ -680,25 +680,20 @@ namespace System.Reactive.Linq
                 return Throw<TSource>(exception);
             }
 
-            if (scheduler != null)
-            {
-                return task.ToObservable(scheduler);
-            }
-            
-            return task.ToObservable();
+            return task.ToObservable(options);
         }
 
         public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync)
         {
-            return StartAsyncImpl(functionAsync, null);
+            return StartAsyncImpl(functionAsync, new TaskObservationOptions.Value(null, false));
         }
 
-        public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler scheduler)
+        public virtual IObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, in TaskObservationOptions.Value options)
         {
-            return StartAsyncImpl(functionAsync, scheduler);
+            return StartAsyncImpl(functionAsync, options);
         }
 
-        private IObservable<TSource> StartAsyncImpl<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IScheduler? scheduler)
+        private IObservable<TSource> StartAsyncImpl<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, in TaskObservationOptions.Value options)
         {
             var cancellable = new CancellationDisposable();
 
@@ -712,16 +707,7 @@ namespace System.Reactive.Linq
                 return Throw<TSource>(exception);
             }
 
-            IObservable<TSource> result;
-
-            if (scheduler != null)
-            {
-                result = task.ToObservable(scheduler);
-            }
-            else
-            {
-                result = task.ToObservable();
-            }
+            var result = task.ToObservable(options);
 
             return new StartAsyncObservable<TSource>(cancellable, result);
         }
@@ -763,15 +749,15 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync)
         {
-            return StartAsyncImpl(actionAsync, null);
+            return StartAsyncImpl(actionAsync, new TaskObservationOptions.Value(null, ignoreExceptionsAfterUnsubscribe: false));
         }
 
-        public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync, IScheduler scheduler)
+        public virtual IObservable<Unit> StartAsync(Func<Task> actionAsync, in TaskObservationOptions.Value options)
         {
-            return StartAsyncImpl(actionAsync, scheduler);
+            return StartAsyncImpl(actionAsync, options);
         }
 
-        private IObservable<Unit> StartAsyncImpl(Func<Task> actionAsync, IScheduler? scheduler)
+        private IObservable<Unit> StartAsyncImpl(Func<Task> actionAsync, in TaskObservationOptions.Value options)
         {
             Task task;
             try
@@ -783,25 +769,20 @@ namespace System.Reactive.Linq
                 return Throw<Unit>(exception);
             }
 
-            if (scheduler != null)
-            {
-                return task.ToObservable(scheduler);
-            }
-            
-            return task.ToObservable();
+            return task.ToObservable(options);
         }
 
         public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync)
         {
-            return StartAsyncImpl(actionAsync, null);
+            return StartAsyncImpl(actionAsync, new TaskObservationOptions.Value(null, ignoreExceptionsAfterUnsubscribe: false));
         }
 
-        public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
+        public virtual IObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, in TaskObservationOptions.Value options)
         {
-            return StartAsyncImpl(actionAsync, scheduler);
+            return StartAsyncImpl(actionAsync, options);
         }
 
-        private IObservable<Unit> StartAsyncImpl(Func<CancellationToken, Task> actionAsync, IScheduler? scheduler)
+        private IObservable<Unit> StartAsyncImpl(Func<CancellationToken, Task> actionAsync, in TaskObservationOptions.Value options)
         {
             var cancellable = new CancellationDisposable();
 
@@ -815,16 +796,7 @@ namespace System.Reactive.Linq
                 return Throw<Unit>(exception);
             }
 
-            IObservable<Unit> result;
-
-            if (scheduler != null)
-            {
-                result = task.ToObservable(scheduler);
-            }
-            else
-            {
-                result = task.ToObservable();
-            }
+            var result = task.ToObservable(options);
 
             return new StartAsyncObservable<Unit>(cancellable, result);
         }
@@ -847,14 +819,14 @@ namespace System.Reactive.Linq
             return Defer(() => StartAsync(functionAsync));
         }
 
-        public virtual IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, IScheduler scheduler)
+        public virtual IObservable<TResult> FromAsync<TResult>(Func<Task<TResult>> functionAsync, TaskObservationOptions.Value options)
         {
-            return Defer(() => StartAsync(functionAsync, scheduler));
+            return Defer(() => StartAsync(functionAsync, options));
         }
 
-        public virtual IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, IScheduler scheduler)
+        public virtual IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync, TaskObservationOptions.Value options)
         {
-            return Defer(() => StartAsync(functionAsync, scheduler));
+            return Defer(() => StartAsync(functionAsync, options));
         }
 
         #endregion
@@ -871,14 +843,14 @@ namespace System.Reactive.Linq
             return Defer(() => StartAsync(actionAsync));
         }
 
-        public virtual IObservable<Unit> FromAsync(Func<Task> actionAsync, IScheduler scheduler)
+        public virtual IObservable<Unit> FromAsync(Func<Task> actionAsync, TaskObservationOptions.Value options)
         {
-            return Defer(() => StartAsync(actionAsync, scheduler));
+            return Defer(() => StartAsync(actionAsync, options));
         }
 
-        public virtual IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, IScheduler scheduler)
+        public virtual IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync, TaskObservationOptions.Value options)
         {
-            return Defer(() => StartAsync(actionAsync, scheduler));
+            return Defer(() => StartAsync(actionAsync, options));
         }
 
         #endregion

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -299,14 +299,14 @@ namespace System.Reactive.Linq
 
         #region + DeferAsync +
 
-        public virtual IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync)
+        public virtual IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe)
         {
-            return Defer(() => StartAsync(observableFactoryAsync).Merge());
+            return Defer(() => StartAsync(observableFactoryAsync, new TaskObservationOptions.Value(null, ignoreExceptionsAfterUnsubscribe)).Merge());
         }
 
-        public virtual IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync)
+        public virtual IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe)
         {
-            return Defer(() => StartAsync(observableFactoryAsync).Merge());
+            return Defer(() => StartAsync(observableFactoryAsync, new TaskObservationOptions.Value(null, ignoreExceptionsAfterUnsubscribe)).Merge());
         }
 
         #endregion

+ 87 - 14
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -20,11 +20,13 @@ namespace System.Reactive.Threading.Tasks
         {
             private readonly Task _task;
             private readonly IScheduler? _scheduler;
+            private readonly bool _ignoreExceptionsAfterUnsubscribe;
 
-            public SlowTaskObservable(Task task, IScheduler? scheduler)
+            public SlowTaskObservable(Task task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
             {
                 _task = task;
                 _scheduler = scheduler;
+                _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
             }
 
             public IDisposable Subscribe(IObserver<Unit> observer)
@@ -52,6 +54,11 @@ namespace System.Reactive.Threading.Tasks
                         options);
                 }
 
+                if (_ignoreExceptionsAfterUnsubscribe)
+                {
+                    _task.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted);
+                }
+
                 return cts;
             }
         }
@@ -60,11 +67,13 @@ namespace System.Reactive.Threading.Tasks
         {
             private readonly Task<TResult> _task;
             private readonly IScheduler? _scheduler;
+            private readonly bool _ignoreExceptionsAfterUnsubscribe;
 
-            public SlowTaskObservable(Task<TResult> task, IScheduler? scheduler)
+            public SlowTaskObservable(Task<TResult> task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
             {
                 _task = task;
                 _scheduler = scheduler;
+                _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
             }
 
             public IDisposable Subscribe(IObserver<TResult> observer)
@@ -92,9 +101,15 @@ namespace System.Reactive.Threading.Tasks
                         options);
                 }
 
+                if (_ignoreExceptionsAfterUnsubscribe)
+                {
+                    _task.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted);
+                }
+
                 return cts;
             }
         }
+
         /// <summary>
         /// Returns an observable sequence that signals when the task completes.
         /// </summary>
@@ -109,7 +124,7 @@ namespace System.Reactive.Threading.Tasks
                 throw new ArgumentNullException(nameof(task));
             }
 
-            return ToObservableImpl(task, scheduler: null);
+            return ToObservableImpl(task, scheduler: null, ignoreExceptionsAfterUnsubscribe: false);
         }
 
         /// <summary>
@@ -121,21 +136,50 @@ namespace System.Reactive.Threading.Tasks
         /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
         /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
         public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler)
+        {
+            if (scheduler == null)
+            {
+                throw new ArgumentNullException(nameof(scheduler));
+            }
+
+            return ToObservable(task, new TaskObservationOptions(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that signals when the task completes.
+        /// </summary>
+        /// <param name="task">Task to convert to an observable sequence.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence that produces a unit value when the task completes, or propagates the exception produced by the task.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
+        /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
+        public static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions options)
         {
             if (task == null)
             {
                 throw new ArgumentNullException(nameof(task));
             }
 
-            if (scheduler == null)
+            if (options == null)
             {
-                throw new ArgumentNullException(nameof(scheduler));
+                throw new ArgumentNullException(nameof(options));
             }
 
-            return ToObservableImpl(task, scheduler);
+            return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
         }
 
-        private static IObservable<Unit> ToObservableImpl(Task task, IScheduler? scheduler)
+        internal static IObservable<Unit> ToObservable(this Task task, TaskObservationOptions.Value options)
+        {
+            if (task == null)
+            {
+                throw new ArgumentNullException(nameof(task));
+            }
+
+            return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
+        }
+
+
+        private static IObservable<Unit> ToObservableImpl(Task task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
         {
             if (task.IsCompleted)
             {
@@ -149,7 +193,7 @@ namespace System.Reactive.Threading.Tasks
                 };
             }
 
-            return new SlowTaskObservable(task, scheduler);
+            return new SlowTaskObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe);
         }
 
         private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
@@ -204,7 +248,7 @@ namespace System.Reactive.Threading.Tasks
                 throw new ArgumentNullException(nameof(task));
             }
 
-            return ToObservableImpl(task, scheduler: null);
+            return ToObservableImpl(task, scheduler: null, ignoreExceptionsAfterUnsubscribe: false);
         }
 
         /// <summary>
@@ -217,21 +261,50 @@ namespace System.Reactive.Threading.Tasks
         /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
         /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
         public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler)
+        {
+            if (scheduler == null)
+            {
+                throw new ArgumentNullException(nameof(scheduler));
+            }
+
+            return ToObservable(task, new TaskObservationOptions(scheduler, ignoreExceptionsAfterUnsubscribe: false));
+        }
+
+        /// <summary>
+        /// Returns an observable sequence that propagates the result of the task.
+        /// </summary>
+        /// <typeparam name="TResult">The type of the result produced by the task.</typeparam>
+        /// <param name="task">Task to convert to an observable sequence.</param>
+        /// <param name="options">Controls how the tasks's progress is observed.</param>
+        /// <returns>An observable sequence that produces the task's result, or propagates the exception produced by the task.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
+        /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
+        public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, TaskObservationOptions options)
         {
             if (task == null)
             {
                 throw new ArgumentNullException(nameof(task));
             }
 
-            if (scheduler == null)
+            if (options == null)
             {
-                throw new ArgumentNullException(nameof(scheduler));
+                throw new ArgumentNullException(nameof(options));
+            }
+
+            return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
+        }
+
+        internal static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, TaskObservationOptions.Value options)
+        {
+            if (task == null)
+            {
+                throw new ArgumentNullException(nameof(task));
             }
 
-            return ToObservableImpl(task, scheduler);
+            return ToObservableImpl(task, options.Scheduler, options.IgnoreExceptionsAfterUnsubscribe);
         }
 
-        private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler? scheduler)
+        private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
         {
             if (task.IsCompleted)
             {
@@ -245,7 +318,7 @@ namespace System.Reactive.Threading.Tasks
                 };
             }
 
-            return new SlowTaskObservable<TResult>(task, scheduler);
+            return new SlowTaskObservable<TResult>(task, scheduler, ignoreExceptionsAfterUnsubscribe);
         }
 
         private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)

+ 28 - 0
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.verified.cs

@@ -487,6 +487,12 @@ namespace System.Reactive.Concurrency
         public override System.IDisposable Schedule<TState>(TState state, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
         public override System.IDisposable Schedule<TState>(TState state, System.TimeSpan dueTime, System.Func<System.Reactive.Concurrency.IScheduler, TState, System.IDisposable> action) { }
     }
+    public sealed class TaskObservationOptions
+    {
+        public TaskObservationOptions(System.Reactive.Concurrency.IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe) { }
+        public bool IgnoreExceptionsAfterUnsubscribe { get; }
+        public System.Reactive.Concurrency.IScheduler? Scheduler { get; }
+    }
     public sealed class TaskPoolScheduler : System.Reactive.Concurrency.LocalScheduler, System.Reactive.Concurrency.ISchedulerLongRunning, System.Reactive.Concurrency.ISchedulerPeriodic
     {
         public TaskPoolScheduler(System.Threading.Tasks.TaskFactory taskFactory) { }
@@ -990,7 +996,9 @@ namespace System.Reactive.Linq
         public static System.IObservable<TSource> DefaultIfEmpty<TSource>(this System.IObservable<TSource> source, TSource defaultValue) { }
         public static System.IObservable<TResult> Defer<TResult>(System.Func<System.IObservable<TResult>> observableFactory) { }
         public static System.IObservable<TResult> Defer<TResult>(System.Func<System.Threading.Tasks.Task<System.IObservable<TResult>>> observableFactoryAsync) { }
+        public static System.IObservable<TResult> Defer<TResult>(System.Func<System.Threading.Tasks.Task<System.IObservable<TResult>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe) { }
         public static System.IObservable<TResult> DeferAsync<TResult>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<System.IObservable<TResult>>> observableFactoryAsync) { }
+        public static System.IObservable<TResult> DeferAsync<TResult>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<System.IObservable<TResult>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe) { }
         public static System.IObservable<TSource> Delay<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset dueTime) { }
         public static System.IObservable<TSource> Delay<TSource>(this System.IObservable<TSource> source, System.TimeSpan dueTime) { }
         public static System.IObservable<TSource> Delay<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset dueTime, System.Reactive.Concurrency.IScheduler scheduler) { }
@@ -1049,11 +1057,15 @@ namespace System.Reactive.Linq
         public static System.IObservable<System.Reactive.Unit> FromAsync(System.Func<System.Threading.Tasks.Task> actionAsync) { }
         public static System.IObservable<System.Reactive.Unit> FromAsync(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task> actionAsync) { }
         public static System.IObservable<System.Reactive.Unit> FromAsync(System.Func<System.Threading.Tasks.Task> actionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<System.Reactive.Unit> FromAsync(System.Func<System.Threading.Tasks.Task> actionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.IObservable<System.Reactive.Unit> FromAsync(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task> actionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<System.Reactive.Unit> FromAsync(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task> actionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.IObservable<TResult> FromAsync<TResult>(System.Func<System.Threading.Tasks.Task<TResult>> functionAsync) { }
         public static System.IObservable<TResult> FromAsync<TResult>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>> functionAsync) { }
         public static System.IObservable<TResult> FromAsync<TResult>(System.Func<System.Threading.Tasks.Task<TResult>> functionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<TResult> FromAsync<TResult>(System.Func<System.Threading.Tasks.Task<TResult>> functionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.IObservable<TResult> FromAsync<TResult>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>> functionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<TResult> FromAsync<TResult>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>> functionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         [System.Obsolete(@"This conversion is no longer supported. Replace use of the Begin/End asynchronous method pair with a new Task-based async method, and convert the result using ToObservable. If no Task-based async method is available, use Task.Factory.FromAsync to obtain a Task object.")]
         public static System.Func<System.IObservable<System.Reactive.Unit>> FromAsyncPattern(System.Func<System.AsyncCallback, object?, System.IAsyncResult> begin, System.Action<System.IAsyncResult> end) { }
         [System.Obsolete(@"This conversion is no longer supported. Replace use of the Begin/End asynchronous method pair with a new Task-based async method, and convert the result using ToObservable. If no Task-based async method is available, use Task.Factory.FromAsync to obtain a Task object.")]
@@ -1379,11 +1391,15 @@ namespace System.Reactive.Linq
         public static System.IObservable<System.Reactive.Unit> StartAsync(System.Func<System.Threading.Tasks.Task> actionAsync) { }
         public static System.IObservable<System.Reactive.Unit> StartAsync(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task> actionAsync) { }
         public static System.IObservable<System.Reactive.Unit> StartAsync(System.Func<System.Threading.Tasks.Task> actionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<System.Reactive.Unit> StartAsync(System.Func<System.Threading.Tasks.Task> actionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.IObservable<System.Reactive.Unit> StartAsync(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task> actionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<System.Reactive.Unit> StartAsync(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task> actionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.IObservable<TResult> StartAsync<TResult>(System.Func<System.Threading.Tasks.Task<TResult>> functionAsync) { }
         public static System.IObservable<TResult> StartAsync<TResult>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>> functionAsync) { }
         public static System.IObservable<TResult> StartAsync<TResult>(System.Func<System.Threading.Tasks.Task<TResult>> functionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<TResult> StartAsync<TResult>(System.Func<System.Threading.Tasks.Task<TResult>> functionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.IObservable<TResult> StartAsync<TResult>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>> functionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<TResult> StartAsync<TResult>(System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>> functionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.IObservable<TSource> StartWith<TSource>(this System.IObservable<TSource> source, System.Collections.Generic.IEnumerable<TSource> values) { }
         public static System.IObservable<TSource> StartWith<TSource>(this System.IObservable<TSource> source, params TSource[] values) { }
         public static System.IObservable<TSource> StartWith<TSource>(this System.IObservable<TSource> source, System.Reactive.Concurrency.IScheduler scheduler, System.Collections.Generic.IEnumerable<TSource> values) { }
@@ -1899,7 +1915,9 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<TSource> DefaultIfEmpty<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, TSource defaultValue) { }
         public static System.Reactive.Linq.IQbservable<TResult> Defer<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.IObservable<TResult>>> observableFactory) { }
         public static System.Reactive.Linq.IQbservable<TResult> Defer<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task<System.IObservable<TResult>>>> observableFactoryAsync) { }
+        public static System.Reactive.Linq.IQbservable<TResult> Defer<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task<System.IObservable<TResult>>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe) { }
         public static System.Reactive.Linq.IQbservable<TResult> DeferAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<System.IObservable<TResult>>>> observableFactoryAsync) { }
+        public static System.Reactive.Linq.IQbservable<TResult> DeferAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<System.IObservable<TResult>>>> observableFactoryAsync, bool ignoreExceptionsAfterUnsubscribe) { }
         public static System.Reactive.Linq.IQbservable<TSource> Delay<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset dueTime) { }
         public static System.Reactive.Linq.IQbservable<TSource> Delay<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.TimeSpan dueTime) { }
         public static System.Reactive.Linq.IQbservable<TSource> Delay<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset dueTime, System.Reactive.Concurrency.IScheduler scheduler) { }
@@ -1940,11 +1958,15 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> FromAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task>> actionAsync) { }
         public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> FromAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>> actionAsync) { }
         public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> FromAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task>> actionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> FromAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task>> actionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> FromAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>> actionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> FromAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>> actionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.Reactive.Linq.IQbservable<TResult> FromAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task<TResult>>> functionAsync) { }
         public static System.Reactive.Linq.IQbservable<TResult> FromAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>>> functionAsync) { }
         public static System.Reactive.Linq.IQbservable<TResult> FromAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task<TResult>>> functionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<TResult> FromAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task<TResult>>> functionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.Reactive.Linq.IQbservable<TResult> FromAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>>> functionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<TResult> FromAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>>> functionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.Func<System.Reactive.Linq.IQbservable<System.Reactive.Unit>> FromAsyncPattern(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.AsyncCallback, object, System.IAsyncResult>> begin, System.Linq.Expressions.Expression<System.Action<System.IAsyncResult>> end) { }
         public static System.Func<System.Reactive.Linq.IQbservable<TResult>> FromAsyncPattern<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.AsyncCallback, object, System.IAsyncResult>> begin, System.Linq.Expressions.Expression<System.Func<System.IAsyncResult, TResult>> end) { }
         public static System.Func<TArg1, System.Reactive.Linq.IQbservable<System.Reactive.Unit>> FromAsyncPattern<TArg1>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<TArg1, System.AsyncCallback, object, System.IAsyncResult>> begin, System.Linq.Expressions.Expression<System.Action<System.IAsyncResult>> end) { }
@@ -2203,11 +2225,15 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> StartAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task>> actionAsync) { }
         public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> StartAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>> actionAsync) { }
         public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> StartAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task>> actionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> StartAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task>> actionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> StartAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>> actionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<System.Reactive.Unit> StartAsync(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>> actionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.Reactive.Linq.IQbservable<TResult> StartAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task<TResult>>> functionAsync) { }
         public static System.Reactive.Linq.IQbservable<TResult> StartAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>>> functionAsync) { }
         public static System.Reactive.Linq.IQbservable<TResult> StartAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task<TResult>>> functionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<TResult> StartAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.Tasks.Task<TResult>>> functionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.Reactive.Linq.IQbservable<TResult> StartAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>>> functionAsync, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.Reactive.Linq.IQbservable<TResult> StartAsync<TResult>(this System.Reactive.Linq.IQbservableProvider provider, System.Linq.Expressions.Expression<System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<TResult>>> functionAsync, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.Reactive.Linq.IQbservable<TSource> StartWith<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Collections.Generic.IEnumerable<TSource> values) { }
         public static System.Reactive.Linq.IQbservable<TSource> StartWith<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, params TSource[] values) { }
         public static System.Reactive.Linq.IQbservable<TSource> StartWith<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Reactive.Concurrency.IScheduler scheduler, System.Collections.Generic.IEnumerable<TSource> values) { }
@@ -3121,8 +3147,10 @@ namespace System.Reactive.Threading.Tasks
     {
         public static System.IObservable<System.Reactive.Unit> ToObservable(this System.Threading.Tasks.Task task) { }
         public static System.IObservable<System.Reactive.Unit> ToObservable(this System.Threading.Tasks.Task task, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<System.Reactive.Unit> ToObservable(this System.Threading.Tasks.Task task, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.IObservable<TResult> ToObservable<TResult>(this System.Threading.Tasks.Task<TResult> task) { }
         public static System.IObservable<TResult> ToObservable<TResult>(this System.Threading.Tasks.Task<TResult> task, System.Reactive.Concurrency.IScheduler scheduler) { }
+        public static System.IObservable<TResult> ToObservable<TResult>(this System.Threading.Tasks.Task<TResult> task, System.Reactive.Concurrency.TaskObservationOptions options) { }
         public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable) { }
         public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Threading.Tasks.Task<TResult> ToTask<TResult>(this System.IObservable<TResult> observable, System.Threading.CancellationToken cancellationToken) { }

+ 162 - 0
Rx.NET/Source/tests/Tests.System.Reactive/TaskErrorObservation.cs

@@ -0,0 +1,162 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+using Xunit;
+
+namespace Tests.System.Reactive
+{
+    /// <summary>
+    /// Verifies behavior around unobserved exceptions from tasks.
+    /// </summary>
+    /// <remarks>
+    /// Testing whether unhandled exceptions emerge from <see cref="TaskScheduler.UnobservedTaskException"/> is not
+    /// entirely straightforward. A few tests need to do this because we have some historical behaviour described in
+    /// https://github.com/dotnet/reactive/issues/1256 that needs to be preserved for backwards compatibility, along
+    /// with some new functionality enabling optional different behavior regarding unobserved exceptions. This provides
+    /// common mechanism to enable such testing.
+    /// </remarks>
+    internal class TaskErrorObservation : IDisposable
+    {
+        private ManualResetEventSlim _exceptionReportedAsUnobserved;
+        private WeakReference<Task> _taskWeakReference;
+
+        public TaskErrorObservation()
+        {
+            _exceptionReportedAsUnobserved = new(false);
+            TaskScheduler.UnobservedTaskException += HandleUnobservedException;
+        }
+
+        public Exception Exception { get; } = new();
+
+        public void Dispose()
+        {
+            if (_exceptionReportedAsUnobserved is not null)
+            {
+                _exceptionReportedAsUnobserved.Dispose();
+                _exceptionReportedAsUnobserved = null;
+                TaskScheduler.UnobservedTaskException -= HandleUnobservedException;
+            }
+        }
+
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        public IDisposable SuscribeWithoutKeepingSourceReachable(
+            Func<Func<Task, Task>, Exception, IDisposable> subscribe)
+        {
+            // We provide nested function because the temporary storage location that
+            // holds the value returned by a call to, say, Observable.StartAsync can end up keeping it reachable
+            // for GC purposes, which in turn keeps the task reachable. That stops the
+            // finalization-driven unobserved exception detection from working.
+            // By calling Subscribe in a method whose stack frame is then immediately torn
+            // down, we ensure that we don't hang onto anything other than the IDisposable
+            // it returns.
+
+            return subscribe(
+                t =>
+                {
+                    _taskWeakReference = new(t);
+                    return t;
+                },
+                this.Exception);
+        }
+
+        public IDisposable SuscribeWithoutKeepingSourceReachable<T>(
+            Func<Func<Task<T>, Task<T>>, Exception, IDisposable> subscribe)
+        {
+            return SuscribeWithoutKeepingSourceReachable(
+                (Func<Task, Task> setTask, Exception ex) => subscribe(
+                    t =>
+                    {
+                        setTask(t);
+                        return t;
+                    }, ex));
+        }
+
+
+        public void AssertExceptionReportedAsUnobserved()
+        {
+            var start = Environment.TickCount;
+            var firstIteration = true;
+            while (!_exceptionReportedAsUnobserved.Wait(TimeSpan.FromSeconds(firstIteration ? 0 : 0.001)) &&
+                ((Environment.TickCount - start) < 5000))
+            {
+                firstIteration = false;
+                GC.Collect();
+                GC.WaitForPendingFinalizers();
+            }
+
+            Assert.True(_exceptionReportedAsUnobserved.Wait(TimeSpan.FromSeconds(0.01)));
+        }
+
+        /// <summary>
+        /// Waits for the task to become unreachable, and then verifies that this did not result in
+        /// <see cref="TaskScheduler.UnobservedTaskException"/> reporting the failure.
+        /// </summary>
+        /// <exception cref="InvalidOperationException"></exception>
+        public void AssertExceptionNotReportedAsUnobserved()
+        {
+            if (_taskWeakReference is null)
+            {
+                throw new InvalidOperationException("Test did not supply task to " + nameof(TaskErrorObservation));
+            }
+
+            var start = Environment.TickCount;
+            var firstIteration = true;
+            do
+            {
+                // We try to get away without sleeping, to enable tests to run as quickly as
+                // possible, but if the object remains reachable after the initial attempt to
+                // force a GC and then immediately run finalizers, there's probably some deferred
+                // work waiting to happen somewhere, so we are better off backing off and giving
+                // that a chance to run.
+                if (firstIteration)
+                {
+                    firstIteration = false;
+                }
+                else
+                {
+                    Thread.Sleep(1);
+                }
+                GC.Collect();
+                GC.WaitForPendingFinalizers();
+            } while (IsTaskStillReachable() &&
+                     ((Environment.TickCount - start) < 5000));
+
+
+            // The task is now unreachable, but it's possible that this happened in between our
+            // last call to GC.WaitForPendingFinalizers and our test for reachability, in which
+            // case it might still be awaiting finalization, so we need one more of these to ensure
+            // it gets flushed through:
+            GC.WaitForPendingFinalizers();
+
+            Assert.False(_exceptionReportedAsUnobserved.IsSet);
+        }
+
+        // This needs to be done in a separate method to ensure that when the weak reference returns a task, we
+        // immediately destroy the stack frame containing the temporary variable into which it was returned,
+        // to avoid keeping the task reachable by accident.
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        private bool IsTaskStillReachable()
+        {
+            return _taskWeakReference.TryGetTarget(out _);
+        }
+
+
+        private void HandleUnobservedException(object sender, UnobservedTaskExceptionEventArgs e)
+        {
+            if (e.Exception.InnerException == this.Exception)
+            {
+                e.SetObserved();
+                _exceptionReportedAsUnobserved.Set();
+            }
+        }
+    }
+}

+ 3 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/ArgumentValidationTest.cs

@@ -67,6 +67,7 @@ namespace ReactiveTests.Tests
                 { "Object", new object() },
                 { "Exception", new Exception() },
                 { "String", "String" },
+                { "Boolean", false },
 
                 { "IDictionary`2[Int32, IObservable`1[Int32]]", new Dictionary<int, IObservable<int>>() },
 
@@ -92,6 +93,8 @@ namespace ReactiveTests.Tests
 
                 { "CancellationToken", new CancellationToken() },
 
+                { "TaskObservationOptions", new TaskObservationOptions(null, false) },
+
                 { "Action", new Action(() => { }) },
 
                 { "Action`1[Int32]", new Action<int>(v => { }) },

+ 225 - 4
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/FromAsyncTest.cs

@@ -12,6 +12,8 @@ using System.Threading.Tasks;
 using Microsoft.Reactive.Testing;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
+using Tests.System.Reactive;
+
 using Assert = Xunit.Assert;
 
 namespace ReactiveTests.Tests
@@ -39,9 +41,9 @@ namespace ReactiveTests.Tests
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task<int>>)));
 
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task<int>>), s));
-            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(() => _doneTask, default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(() => _doneTask, default(IScheduler)));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task<int>>), s));
-            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => _doneTask, default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => _doneTask, default(IScheduler)));
         }
 
         [TestMethod]
@@ -168,6 +170,94 @@ namespace ReactiveTests.Tests
             }
         }
 
+        [TestMethod]
+        public void FromAsync_Func_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(createTask),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_FuncWithCancel_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(_ => createTask()),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_Func_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(createTask, TaskPoolScheduler.Default),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_FuncWithCancel_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(_ => createTask(), TaskPoolScheduler.Default),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_Func_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(createTask, new TaskObservationOptions(null, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_FuncWithCancel_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(_ => createTask(), new TaskObservationOptions(null, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_Func_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(createTask, new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_FuncWithCancel_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(_ => createTask(), new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
 #if DESKTOPCLR
         [TestMethod]
         public void FromAsync_Func_Scheduler1()
@@ -233,9 +323,8 @@ namespace ReactiveTests.Tests
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>)));
 
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task>), s));
-            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(() => (Task)_doneTask, default));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>), s));
-            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => (Task)_doneTask, default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => (Task)_doneTask, default(IScheduler)));
         }
 
         [TestMethod]
@@ -358,6 +447,94 @@ namespace ReactiveTests.Tests
             }
         }
 
+
+        [TestMethod]
+        public void FromAsync_Action_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(createTask),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_ActionWithCancel_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(_ => createTask()),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_Action_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(createTask, TaskPoolScheduler.Default),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_ActionWithCancel_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(_ => createTask(), TaskPoolScheduler.Default),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_Action_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(createTask, new TaskObservationOptions(null, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_ActionWithCancel_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(_ => createTask(), new TaskObservationOptions(scheduler: null, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_Action_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(createTask, new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void FromAsync_ActionWithCancel_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.FromAsync(_ => createTask(), new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
 #if DESKTOPCLR
         [TestMethod]
         public void FromAsync_Action_Scheduler1()
@@ -406,5 +583,49 @@ namespace ReactiveTests.Tests
 
         #endregion
 
+        private void FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+            Func<Func<Task<int>>, IObservable<int>> createObservable,
+            Action<TaskErrorObservation> testResults)
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<int>(createObservable, testResults);
+        }
+
+        private void FromAsync_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+            Func<Func<Task>, IObservable<Unit>> createObservable,
+            Action<TaskErrorObservation> testResults)
+        {
+            FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<Unit>(createObservable, testResults);
+        }
+
+        private void FromAsync_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<T>(
+            Func<Func<Task<T>>, IObservable<T>> createObservable,
+            Action<TaskErrorObservation> testResults)
+        {
+            using Barrier gate = new(2);
+            using TaskErrorObservation errorObservation = new();
+
+            var sub = errorObservation.SuscribeWithoutKeepingSourceReachable<T>(
+                (setTask, exception) => createObservable(
+                    () => setTask(Task.Factory.StartNew<T>(
+                        () =>
+                        {
+                            // 1: Notify that task execution has begun
+                            gate.SignalAndWait();
+                            // 2: Wait until unsubscribe Dispose has returned
+                            gate.SignalAndWait();
+                            throw exception;
+                        })))
+                    .Subscribe());
+
+            // 1: wait until task execution has begun
+            gate.SignalAndWait();
+
+            sub.Dispose();
+
+            // 2: Notify that unsubscribe Dispose has returned
+            gate.SignalAndWait();
+
+            testResults(errorObservation);
+        }
     }
 }

+ 229 - 4
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/StartAsyncTest.cs

@@ -7,11 +7,15 @@ using System.Linq;
 using System.Reactive;
 using System.Reactive.Concurrency;
 using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using System.Security.Cryptography;
 using System.Threading;
 using System.Threading.Tasks;
 using Microsoft.Reactive.Testing;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
+using Tests.System.Reactive;
+
 using Assert = Xunit.Assert;
 
 namespace ReactiveTests.Tests
@@ -42,8 +46,8 @@ namespace ReactiveTests.Tests
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<Task<int>>), s));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<CancellationToken, Task<int>>), s));
 
-            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(() => _doneTask, default));
-            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(ct => _doneTask, default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(() => _doneTask, default(IScheduler)));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(ct => _doneTask, default(IScheduler)));
         }
 
         [TestMethod]
@@ -182,6 +186,94 @@ namespace ReactiveTests.Tests
             }
         }
 
+        [TestMethod]
+        public void Start_Func_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(createTask),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_FuncWithCancel_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(_ => createTask()),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_Func_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(createTask, TaskPoolScheduler.Default),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_FuncWithCancel_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(_ => createTask(), TaskPoolScheduler.Default),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_Func_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(createTask, new TaskObservationOptions(null, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_FuncWithCancel_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(_ => createTask(), new TaskObservationOptions(null, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_Func_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(createTask, new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_FuncWithCancel_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(_ => createTask(), new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
 #if DESKTOPCLR
         [TestMethod]
         public void StartAsync_Func_Scheduler1()
@@ -248,8 +340,8 @@ namespace ReactiveTests.Tests
 
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<Task>), s));
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(default(Func<CancellationToken, Task>), s));
-            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(() => (Task)_doneTask, default));
-            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(ct => (Task)_doneTask, default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(() => (Task)_doneTask, default(IScheduler)));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.StartAsync(ct => (Task)_doneTask, default(IScheduler)));
         }
 
         [TestMethod]
@@ -380,6 +472,94 @@ namespace ReactiveTests.Tests
             }
         }
 
+        [TestMethod]
+        public void Start_Action_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            Start_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(createTask),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_ActionWithCancel_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            Start_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(_ => createTask()),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_Action_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            Start_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(createTask, TaskPoolScheduler.Default),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_ActionWithCancel_WithScheduler_UnsubscribeThenError_ErrorReportedAsUnobserved()
+        {
+            Start_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(_ => createTask(), TaskPoolScheduler.Default),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_Action_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            Start_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(createTask, new TaskObservationOptions(null, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_ActionWithCancel_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            Start_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(_ => createTask(), new TaskObservationOptions(null, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_Action_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            Start_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(createTask, new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
+        [TestMethod]
+        public void Start_ActionWithCancel_WithScheduler_IgnorePostUnsubscribeErrors_UnsubscribeThenError_ErrorNotReportedAsUnobserved()
+        {
+            Start_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+                createTask => Observable.StartAsync(_ => createTask(), new TaskObservationOptions(TaskPoolScheduler.Default, ignoreExceptionsAfterUnsubscribe: true)),
+                errorObservation =>
+                {
+                    errorObservation.AssertExceptionNotReportedAsUnobserved();
+                });
+        }
+
 #if DESKTOPCLR
         [TestMethod]
         public void StartAsync_Action_Scheduler1()
@@ -428,5 +608,50 @@ namespace ReactiveTests.Tests
 
         #endregion
 
+        private void Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+            Func<Func<Task<int>>, IObservable<int>> createObservable,
+            Action<TaskErrorObservation> testResults)
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<int>(createObservable, testResults);
+        }
+
+        private void Start_Action_ErrorAfterUnsubscribeReportedAsUnobserved_Core(
+            Func<Func<Task>, IObservable<Unit>> createObservable,
+            Action<TaskErrorObservation> testResults)
+        {
+            Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<Unit>(createObservable, testResults);
+        }
+
+        private void Start_Func_ErrorAfterUnsubscribeReportedAsUnobserved_Core<T>(
+            Func<Func<Task<T>>, IObservable<T>> createObservable,
+            Action<TaskErrorObservation> testResults)
+        {
+            using Barrier gate = new(2);
+            using TaskErrorObservation errorObservation = new();
+
+            var sub = errorObservation.SuscribeWithoutKeepingSourceReachable<T>(
+                (setTask, exception) => createObservable(
+                    () => setTask(Task.Factory.StartNew<T>(
+                        () =>
+                        {
+                            // 1: Notify that task execution has begun
+                            gate.SignalAndWait();
+                            // 2: Wait until unsubscribe Dispose has returned
+                            gate.SignalAndWait();
+                            throw exception;
+                        })))
+                    .Subscribe());
+
+            // 1: wait until task execution has begun
+            gate.SignalAndWait();
+
+            sub.Dispose();
+            //sub = null;
+
+            // 2: Notify that unsubscribe Dispose has returned
+            gate.SignalAndWait();
+
+            testResults(errorObservation);
+        }
     }
 }

+ 28 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/StartTest.cs

@@ -6,6 +6,8 @@ using System;
 using System.Linq;
 using System.Reactive;
 using System.Reactive.Linq;
+using System.Threading;
+
 using Microsoft.Reactive.Testing;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
@@ -70,6 +72,32 @@ namespace ReactiveTests.Tests
             }));
         }
 
+        [TestMethod]
+        public void Start_ActionErrorAfterUnsubscribeDoesNotThrow()
+        {
+            var ex = new Exception();
+            using Barrier gate = new(2);
+
+            var sub = Observable.Start(
+                () =>
+                {
+                    // 1: action running
+                    gate.SignalAndWait();
+                    // 2: unsubscribe Dispose returned
+                    gate.SignalAndWait();
+                    throw ex;
+                })
+                .Subscribe();
+
+            // 1: action running
+            gate.SignalAndWait();
+
+            sub.Dispose();
+
+            // 2: unsubscribe Dispose returned
+            gate.SignalAndWait();
+        }
+
         [TestMethod]
         public void Start_Func()
         {

+ 2 - 2
Rx.NET/Source/tests/Tests.System.Reactive/Tests/TaskObservableExtensionsTest.cs

@@ -39,7 +39,7 @@ namespace ReactiveTests.Tests
             ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task<int>)null));
 
             ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task<int>)null, s));
-            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable(_doneTask, default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable(_doneTask, default(IScheduler)));
 
             var tcs = new TaskCompletionSource<int>();
             var task = tcs.Task;
@@ -399,7 +399,7 @@ namespace ReactiveTests.Tests
             ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable(null));
 
             ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable(null, s));
-            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task)_doneTask, default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => TaskObservableExtensions.ToObservable((Task)_doneTask, default(IScheduler)));
 
             var tcs = new TaskCompletionSource<int>();
             Task task = tcs.Task;

+ 13 - 15
Rx.NET/tools/HomoIcon/Program.cs

@@ -153,26 +153,24 @@ namespace HomoIconize
  */
 ");
             WriteLine(
-@"#pragma warning disable 1591
+@"#nullable enable
+#pragma warning disable 1591
 ");
 
             WriteLine(
-@"using System;
-using System.Reactive.Concurrency;
-using System.Collections.Generic;");
+@"using System.Collections.Generic;");
 
             if (exludeFromCodeCoverage)
                 WriteLine("using System.Diagnostics.CodeAnalysis;");
 
             WriteLine(
-@"using System.Reactive.Joins;
-using System.Linq;
+@"using System.Linq;
 using System.Linq.Expressions;
+using System.Reactive.Concurrency;
+using System.Reactive.Subjects;
 using System.Reflection;
 using System.Threading;
 using System.Threading.Tasks;
-using System.Reactive;
-using System.Reactive.Subjects;
 ");
             WriteLine(
 @"namespace System.Reactive.Linq
@@ -425,9 +423,9 @@ using System.Reactive.Subjects;
                     WriteLine("InfoOf(() => " + typeName + "." + name + g + "(" + string.Join(", ", ptps.Select(pt => "default(" + pt + ")")) + "))" + cma);
                     WriteLine("#else", true);
                     if (!m.IsGenericMethod)
-                        WriteLine("(MethodInfo)MethodInfo.GetCurrentMethod()" + cma);
+                        WriteLine("(MethodInfo)MethodInfo.GetCurrentMethod()!" + cma);
                     else
-                        WriteLine("((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(" + string.Join(", ", m.GetGenericArguments().Select(ga => "typeof(" + ga.Name + ")").ToArray()) + ")" + cma);
+                        WriteLine("((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(" + string.Join(", ", m.GetGenericArguments().Select(ga => "typeof(" + ga.Name + ")").ToArray()) + ")" + cma);
                     WriteLine("#endif", true);
                     for (int j = 0; j < args.Count; j++)
                         WriteLine(args[j] + (j < args.Count - 1 ? "," : ""));
@@ -614,9 +612,9 @@ using System.Reactive.Subjects;
                         WriteLine("var m = InfoOf(() => " + typeName + ".ToAsync" + genArgss + "(" + string.Join(", ", aprs.Select(pt => "default(" + pt + ")")) + "));");
                         WriteLine("#else", true);
                         if (genArgs.Length == 0)
-                            WriteLine("var m = (MethodInfo)MethodInfo.GetCurrentMethod();");
+                            WriteLine("var m = (MethodInfo)MethodInfo.GetCurrentMethod()!;");
                         else
-                            WriteLine("var m = ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(" + string.Join(", ", genArgs.Select(a => "typeof(" + a + ")").ToArray()) + ");");
+                            WriteLine("var m = ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(" + string.Join(", ", genArgs.Select(a => "typeof(" + a + ")").ToArray()) + ");");
                         WriteLine("#endif", true);
 
                         WriteLine("return (" + string.Join(", ", lamPars) + ") => provider.CreateQuery<" + ret + ">(");
@@ -724,7 +722,7 @@ using System.Reactive.Subjects;
                     WriteLine("[Obsolete(Constants_Linq.USE_TASK_FROMASYNCPATTERN)]");
                     WriteLine("#endif", true);
 
-                    WriteLine("public static " + retType + " FromAsyncPattern" + genArgss + "(this IQbservableProvider provider, " + begType + " begin, " + endType + "end)");
+                    WriteLine("public static " + retType + " FromAsyncPattern" + genArgss + "(this IQbservableProvider provider, " + begType + " begin, " + endType + " end)");
                     WriteLine("{");
 
                     Indent();
@@ -751,9 +749,9 @@ using System.Reactive.Subjects;
                     WriteLine("var m = InfoOf(() => " + typeName + ".FromAsyncPattern" + genArgss + "(" + string.Join(", ", aprs.Select(pt => "default(" + pt + ")")) + "));");
                     WriteLine("#else", true);
                     if (genArgs.Length == 0)
-                        WriteLine("var m = (MethodInfo)MethodInfo.GetCurrentMethod();");
+                        WriteLine("var m = (MethodInfo)MethodInfo.GetCurrentMethod()!;");
                     else
-                        WriteLine("var m = ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(" + string.Join(", ", genArgs.Select(a => "typeof(" + a + ")").ToArray()) + ");");
+                        WriteLine("var m = ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(" + string.Join(", ", genArgs.Select(a => "typeof(" + a + ")").ToArray()) + ");");
                     WriteLine("#endif", true);
 
                     WriteLine("return (" + string.Join(", ", lamPars) + ") => provider.CreateQuery<" + ret + ">(");