|
@@ -20,11 +20,13 @@ namespace System.Reactive.Threading.Tasks
|
|
|
{
|
|
|
private readonly Task _task;
|
|
|
private readonly IScheduler? _scheduler;
|
|
|
+ private readonly bool _ignoreExceptionsAfterUnsubscribe;
|
|
|
|
|
|
- public SlowTaskObservable(Task task, IScheduler? scheduler)
|
|
|
+ public SlowTaskObservable(Task task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
|
|
|
{
|
|
|
_task = task;
|
|
|
_scheduler = scheduler;
|
|
|
+ _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
|
|
|
}
|
|
|
|
|
|
public IDisposable Subscribe(IObserver<Unit> observer)
|
|
@@ -52,6 +54,11 @@ namespace System.Reactive.Threading.Tasks
|
|
|
options);
|
|
|
}
|
|
|
|
|
|
+ if (_ignoreExceptionsAfterUnsubscribe)
|
|
|
+ {
|
|
|
+ _task.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted);
|
|
|
+ }
|
|
|
+
|
|
|
return cts;
|
|
|
}
|
|
|
}
|
|
@@ -60,11 +67,13 @@ namespace System.Reactive.Threading.Tasks
|
|
|
{
|
|
|
private readonly Task<TResult> _task;
|
|
|
private readonly IScheduler? _scheduler;
|
|
|
+ private readonly bool _ignoreExceptionsAfterUnsubscribe;
|
|
|
|
|
|
- public SlowTaskObservable(Task<TResult> task, IScheduler? scheduler)
|
|
|
+ public SlowTaskObservable(Task<TResult> task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
|
|
|
{
|
|
|
_task = task;
|
|
|
_scheduler = scheduler;
|
|
|
+ _ignoreExceptionsAfterUnsubscribe = ignoreExceptionsAfterUnsubscribe;
|
|
|
}
|
|
|
|
|
|
public IDisposable Subscribe(IObserver<TResult> observer)
|
|
@@ -92,9 +101,15 @@ namespace System.Reactive.Threading.Tasks
|
|
|
options);
|
|
|
}
|
|
|
|
|
|
+ if (_ignoreExceptionsAfterUnsubscribe)
|
|
|
+ {
|
|
|
+ _task.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted);
|
|
|
+ }
|
|
|
+
|
|
|
return cts;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
/// <summary>
|
|
|
/// Returns an observable sequence that signals when the task completes.
|
|
|
/// </summary>
|
|
@@ -103,13 +118,30 @@ namespace System.Reactive.Threading.Tasks
|
|
|
/// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
|
|
|
/// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
|
|
|
public static IObservable<Unit> ToObservable(this Task task)
|
|
|
+ {
|
|
|
+ return ToObservable(task, ignoreExceptionsAfterUnsubscribe: false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Returns an observable sequence that signals when the task completes.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="task">Task to convert to an observable sequence.</param>
|
|
|
+ /// <param name="ignoreExceptionsAfterUnsubscribe">
|
|
|
+ /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
|
|
|
+ /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
|
|
|
+ /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
|
|
|
+ /// </param>
|
|
|
+ /// <returns>An observable sequence that produces a unit value when the task completes, or propagates the exception produced by the task.</returns>
|
|
|
+ /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
|
|
|
+ /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
|
|
|
+ public static IObservable<Unit> ToObservable(this Task task, bool ignoreExceptionsAfterUnsubscribe)
|
|
|
{
|
|
|
if (task == null)
|
|
|
{
|
|
|
throw new ArgumentNullException(nameof(task));
|
|
|
}
|
|
|
|
|
|
- return ToObservableImpl(task, scheduler: null);
|
|
|
+ return ToObservableImpl(task, scheduler: null, ignoreExceptionsAfterUnsubscribe);
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
@@ -121,6 +153,24 @@ namespace System.Reactive.Threading.Tasks
|
|
|
/// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
|
|
|
/// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
|
|
|
public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler)
|
|
|
+ {
|
|
|
+ return ToObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe: false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Returns an observable sequence that signals when the task completes.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="task">Task to convert to an observable sequence.</param>
|
|
|
+ /// <param name="scheduler">Scheduler on which to notify observers about completion, cancellation or failure.</param>
|
|
|
+ /// <param name="ignoreExceptionsAfterUnsubscribe">
|
|
|
+ /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
|
|
|
+ /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
|
|
|
+ /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
|
|
|
+ /// </param>
|
|
|
+ /// <returns>An observable sequence that produces a unit value when the task completes, or propagates the exception produced by the task.</returns>
|
|
|
+ /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
|
|
|
+ /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> instead.</remarks>
|
|
|
+ public static IObservable<Unit> ToObservable(this Task task, IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
|
|
|
{
|
|
|
if (task == null)
|
|
|
{
|
|
@@ -132,10 +182,10 @@ namespace System.Reactive.Threading.Tasks
|
|
|
throw new ArgumentNullException(nameof(scheduler));
|
|
|
}
|
|
|
|
|
|
- return ToObservableImpl(task, scheduler);
|
|
|
+ return ToObservableImpl(task, scheduler, ignoreExceptionsAfterUnsubscribe);
|
|
|
}
|
|
|
|
|
|
- private static IObservable<Unit> ToObservableImpl(Task task, IScheduler? scheduler)
|
|
|
+ private static IObservable<Unit> ToObservableImpl(Task task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
|
|
|
{
|
|
|
if (task.IsCompleted)
|
|
|
{
|
|
@@ -149,7 +199,7 @@ namespace System.Reactive.Threading.Tasks
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- return new SlowTaskObservable(task, scheduler);
|
|
|
+ return new SlowTaskObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe);
|
|
|
}
|
|
|
|
|
|
private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
|
|
@@ -198,13 +248,31 @@ namespace System.Reactive.Threading.Tasks
|
|
|
/// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
|
|
|
/// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
|
|
|
public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task)
|
|
|
+ {
|
|
|
+ return ToObservable(task, ignoreExceptionsAfterUnsubscribe: false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Returns an observable sequence that propagates the result of the task.
|
|
|
+ /// </summary>
|
|
|
+ /// <typeparam name="TResult">The type of the result produced by the task.</typeparam>
|
|
|
+ /// <param name="task">Task to convert to an observable sequence.</param>
|
|
|
+ /// <param name="ignoreExceptionsAfterUnsubscribe">
|
|
|
+ /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
|
|
|
+ /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
|
|
|
+ /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
|
|
|
+ /// </param>
|
|
|
+ /// <returns>An observable sequence that produces the task's result, or propagates the exception produced by the task.</returns>
|
|
|
+ /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
|
|
|
+ /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
|
|
|
+ public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, bool ignoreExceptionsAfterUnsubscribe)
|
|
|
{
|
|
|
if (task == null)
|
|
|
{
|
|
|
throw new ArgumentNullException(nameof(task));
|
|
|
}
|
|
|
|
|
|
- return ToObservableImpl(task, scheduler: null);
|
|
|
+ return ToObservableImpl(task, scheduler: null, ignoreExceptionsAfterUnsubscribe);
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
@@ -217,6 +285,25 @@ namespace System.Reactive.Threading.Tasks
|
|
|
/// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
|
|
|
/// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
|
|
|
public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler)
|
|
|
+ {
|
|
|
+ return ToObservable(task, scheduler, ignoreExceptionsAfterUnsubscribe: false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Returns an observable sequence that propagates the result of the task.
|
|
|
+ /// </summary>
|
|
|
+ /// <typeparam name="TResult">The type of the result produced by the task.</typeparam>
|
|
|
+ /// <param name="task">Task to convert to an observable sequence.</param>
|
|
|
+ /// <param name="scheduler">Scheduler on which to notify observers about completion, cancellation or failure.</param>
|
|
|
+ /// <param name="ignoreExceptionsAfterUnsubscribe">
|
|
|
+ /// If true, exceptions that occur after cancellation has been initiated by unsubscribing from the observable
|
|
|
+ /// this method returns will be handled and silently ignored. If false, they will go unobserved, meaning they
|
|
|
+ /// will eventually emerge through <see cref="TaskScheduler.UnobservedTaskException"/>.
|
|
|
+ /// </param>
|
|
|
+ /// <returns>An observable sequence that produces the task's result, or propagates the exception produced by the task.</returns>
|
|
|
+ /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c> or <paramref name="scheduler"/> is <c>null</c>.</exception>
|
|
|
+ /// <remarks>If the specified task object supports cancellation, consider using <see cref="Observable.FromAsync{TResult}(Func{CancellationToken, Task{TResult}})"/> instead.</remarks>
|
|
|
+ public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task, IScheduler scheduler, bool ignoreExceptionsAfterUnsubscribe)
|
|
|
{
|
|
|
if (task == null)
|
|
|
{
|
|
@@ -228,10 +315,10 @@ namespace System.Reactive.Threading.Tasks
|
|
|
throw new ArgumentNullException(nameof(scheduler));
|
|
|
}
|
|
|
|
|
|
- return ToObservableImpl(task, scheduler);
|
|
|
+ return ToObservableImpl(task, scheduler, ignoreExceptionsAfterUnsubscribe);
|
|
|
}
|
|
|
|
|
|
- private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler? scheduler)
|
|
|
+ private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler? scheduler, bool ignoreExceptionsAfterUnsubscribe)
|
|
|
{
|
|
|
if (task.IsCompleted)
|
|
|
{
|
|
@@ -245,7 +332,7 @@ namespace System.Reactive.Threading.Tasks
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- return new SlowTaskObservable<TResult>(task, scheduler);
|
|
|
+ return new SlowTaskObservable<TResult>(task, scheduler, ignoreExceptionsAfterUnsubscribe);
|
|
|
}
|
|
|
|
|
|
private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
|