|
|
@@ -59,36 +59,27 @@ namespace System.Reactive.Threading.Tasks
|
|
|
|
|
|
private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
|
|
|
{
|
|
|
- var res = default(IObservable<Unit>);
|
|
|
-
|
|
|
if (task.IsCompleted)
|
|
|
{
|
|
|
scheduler = scheduler ?? ImmediateScheduler.Instance;
|
|
|
|
|
|
switch (task.Status)
|
|
|
{
|
|
|
- case TaskStatus.RanToCompletion:
|
|
|
- res = new Return<Unit>(Unit.Default, scheduler);
|
|
|
- break;
|
|
|
case TaskStatus.Faulted:
|
|
|
- res = new Throw<Unit>(task.Exception.InnerException, scheduler);
|
|
|
- break;
|
|
|
+ return new Throw<Unit>(task.Exception.InnerException, scheduler);
|
|
|
case TaskStatus.Canceled:
|
|
|
- res = new Throw<Unit>(new TaskCanceledException(task), scheduler);
|
|
|
- break;
|
|
|
+ return new Throw<Unit>(new TaskCanceledException(task), scheduler);
|
|
|
}
|
|
|
+
|
|
|
+ return new Return<Unit>(Unit.Default, scheduler);
|
|
|
}
|
|
|
- else
|
|
|
- {
|
|
|
- var subject = new AsyncSubject<Unit>();
|
|
|
- var options = GetTaskContinuationOptions(scheduler);
|
|
|
|
|
|
- task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<Unit>)subjectObject), subject, options);
|
|
|
+ var subject = new AsyncSubject<Unit>();
|
|
|
+ var options = GetTaskContinuationOptions(scheduler);
|
|
|
|
|
|
- return subject.ToObservableResult(scheduler);
|
|
|
- }
|
|
|
+ task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<Unit>)subjectObject), subject, options);
|
|
|
|
|
|
- return res;
|
|
|
+ return subject.ToObservableResult(scheduler);
|
|
|
}
|
|
|
|
|
|
private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
|
|
|
@@ -172,36 +163,27 @@ namespace System.Reactive.Threading.Tasks
|
|
|
|
|
|
private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
|
|
|
{
|
|
|
- var res = default(IObservable<TResult>);
|
|
|
-
|
|
|
if (task.IsCompleted)
|
|
|
{
|
|
|
scheduler = scheduler ?? ImmediateScheduler.Instance;
|
|
|
|
|
|
switch (task.Status)
|
|
|
{
|
|
|
- case TaskStatus.RanToCompletion:
|
|
|
- res = new Return<TResult>(task.Result, scheduler);
|
|
|
- break;
|
|
|
case TaskStatus.Faulted:
|
|
|
- res = new Throw<TResult>(task.Exception.InnerException, scheduler);
|
|
|
- break;
|
|
|
+ return new Throw<TResult>(task.Exception.InnerException, scheduler);
|
|
|
case TaskStatus.Canceled:
|
|
|
- res = new Throw<TResult>(new TaskCanceledException(task), scheduler);
|
|
|
- break;
|
|
|
+ return new Throw<TResult>(new TaskCanceledException(task), scheduler);
|
|
|
}
|
|
|
+
|
|
|
+ return new Return<TResult>(task.Result, scheduler);
|
|
|
}
|
|
|
- else
|
|
|
- {
|
|
|
- var subject = new AsyncSubject<TResult>();
|
|
|
- var options = GetTaskContinuationOptions(scheduler);
|
|
|
|
|
|
- task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<TResult>)subjectObject), subject, options);
|
|
|
+ var subject = new AsyncSubject<TResult>();
|
|
|
+ var options = GetTaskContinuationOptions(scheduler);
|
|
|
|
|
|
- return subject.ToObservableResult(scheduler);
|
|
|
- }
|
|
|
+ task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<TResult>)subjectObject), subject, options);
|
|
|
|
|
|
- return res;
|
|
|
+ return subject.ToObservableResult(scheduler);
|
|
|
}
|
|
|
|
|
|
private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
|