123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the MIT License.
- // See the LICENSE file in the project root for more information.
- using System.Reactive;
- using System.Reactive.Concurrency;
- using System.Reactive.Linq;
- using System.Reactive.Subjects;
- namespace System.Threading.Tasks
- {
- // TODO: Add ToTask.
- public static class TaskAsyncObservableExtensions
- {
- public static IAsyncObservable<Unit> ToAsyncObservable(this Task task)
- {
- if (task == null)
- throw new ArgumentNullException(nameof(task));
- return AsyncObservable.Create<Unit>(observer => task.AcceptAsync(observer));
- }
- public static IAsyncObservable<Unit> ToAsyncObservable(this Task task, IAsyncScheduler scheduler)
- {
- if (task == null)
- throw new ArgumentNullException(nameof(task));
- if (scheduler == null)
- throw new ArgumentNullException(nameof(scheduler));
- return AsyncObservable.Create<Unit>(observer => task.AcceptAsync(observer, scheduler));
- }
- public static IAsyncObservable<TResult> ToAsyncObservable<TResult>(this Task<TResult> task)
- {
- if (task == null)
- throw new ArgumentNullException(nameof(task));
- return AsyncObservable.Create<TResult>(observer => task.AcceptAsync(observer));
- }
- public static IAsyncObservable<TResult> ToAsyncObservable<TResult>(this Task<TResult> task, IAsyncScheduler scheduler)
- {
- if (task == null)
- throw new ArgumentNullException(nameof(task));
- if (scheduler == null)
- throw new ArgumentNullException(nameof(scheduler));
- return AsyncObservable.Create<TResult>(observer => task.AcceptAsync(observer, scheduler));
- }
- public static ValueTask<IAsyncDisposable> AcceptAsync(this Task task, IAsyncObserver<Unit> observer) => AcceptAsync(task, observer, ImmediateAsyncScheduler.Instance);
- public static ValueTask<IAsyncDisposable> AcceptAsync(this Task task, IAsyncObserver<Unit> observer, IAsyncScheduler scheduler)
- {
- if (task == null)
- throw new ArgumentNullException(nameof(task));
- if (observer == null)
- throw new ArgumentNullException(nameof(observer));
- if (scheduler == null)
- throw new ArgumentNullException(nameof(scheduler));
- ValueTask<IAsyncDisposable> CompleteAsync()
- {
- return scheduler.ScheduleAsync(async ct =>
- {
- if (ct.IsCancellationRequested)
- {
- return;
- }
- switch (task.Status)
- {
- case TaskStatus.RanToCompletion:
- await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, ct);
- await observer.OnCompletedAsync().RendezVous(scheduler, ct);
- break;
- case TaskStatus.Faulted:
- await observer.OnErrorAsync(task.Exception.InnerException).RendezVous(scheduler, ct);
- break;
- case TaskStatus.Canceled:
- await observer.OnErrorAsync(new TaskCanceledException(task)).RendezVous(scheduler, ct);
- break;
- }
- });
- }
- ValueTask<IAsyncDisposable> CoreAsync()
- {
- if (task.IsCompleted)
- {
- return CompleteAsync();
- }
- else
- {
- var tco = TaskContinuationOptions.None;
- if (scheduler == ImmediateAsyncScheduler.Instance)
- {
- tco = TaskContinuationOptions.ExecuteSynchronously;
- }
- var subject = new SequentialAsyncAsyncSubject<Unit>();
- task.ContinueWith(t => CompleteAsync(), tco);
- return subject.SubscribeAsync(observer);
- }
- }
- return CoreAsync();
- }
- public static ValueTask<IAsyncDisposable> AcceptAsync<TResult>(this Task<TResult> task, IAsyncObserver<TResult> observer) => AcceptAsync(task, observer, ImmediateAsyncScheduler.Instance);
- public static ValueTask<IAsyncDisposable> AcceptAsync<TResult>(this Task<TResult> task, IAsyncObserver<TResult> observer, IAsyncScheduler scheduler)
- {
- if (task == null)
- throw new ArgumentNullException(nameof(task));
- if (observer == null)
- throw new ArgumentNullException(nameof(observer));
- if (scheduler == null)
- throw new ArgumentNullException(nameof(scheduler));
- ValueTask<IAsyncDisposable> CompleteAsync()
- {
- return scheduler.ScheduleAsync(async ct =>
- {
- if (ct.IsCancellationRequested)
- {
- return;
- }
- switch (task.Status)
- {
- case TaskStatus.RanToCompletion:
- await observer.OnNextAsync(task.Result).RendezVous(scheduler, ct);
- await observer.OnCompletedAsync().RendezVous(scheduler, ct);
- break;
- case TaskStatus.Faulted:
- await observer.OnErrorAsync(task.Exception.InnerException).RendezVous(scheduler, ct);
- break;
- case TaskStatus.Canceled:
- await observer.OnErrorAsync(new TaskCanceledException(task)).RendezVous(scheduler, ct);
- break;
- }
- });
- }
- ValueTask<IAsyncDisposable> CoreAsync()
- {
- if (task.IsCompleted)
- {
- return CompleteAsync();
- }
- else
- {
- var tco = TaskContinuationOptions.None;
- if (scheduler == ImmediateAsyncScheduler.Instance)
- {
- tco = TaskContinuationOptions.ExecuteSynchronously;
- }
- var subject = new SequentialAsyncAsyncSubject<TResult>();
- task.ContinueWith(t => CompleteAsync(), tco);
- return subject.SubscribeAsync(observer);
- }
- }
- return CoreAsync();
- }
- }
- }
|