Просмотр исходного кода

Adding Task[<T>] to IAsyncObservable<T> conversions.

Bart De Smet 8 лет назад
Родитель
Сommit
85da8ca9c2

+ 174 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Threading/Tasks/TaskAsyncObservableExtensions.cs

@@ -0,0 +1,174 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using System.Reactive.Subjects;
+
+namespace System.Threading.Tasks
+{
+    public static class TaskAsyncObservableExtensions
+    {
+        public static IAsyncObservable<Unit> ToAsyncObservable(this Task task)
+        {
+            if (task == null)
+                throw new ArgumentNullException(nameof(task));
+
+            return AsyncObservable.Create<Unit>(observer => task.AcceptAsync(observer));
+        }
+
+        public static IAsyncObservable<Unit> ToAsyncObservable(this Task task, IAsyncScheduler scheduler)
+        {
+            if (task == null)
+                throw new ArgumentNullException(nameof(task));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return AsyncObservable.Create<Unit>(observer => task.AcceptAsync(observer, scheduler));
+        }
+
+        public static IAsyncObservable<TResult> ToAsyncObservable<TResult>(this Task<TResult> task)
+        {
+            if (task == null)
+                throw new ArgumentNullException(nameof(task));
+
+            return AsyncObservable.Create<TResult>(observer => task.AcceptAsync(observer));
+        }
+
+        public static IAsyncObservable<TResult> ToAsyncObservable<TResult>(this Task<TResult> task, IAsyncScheduler scheduler)
+        {
+            if (task == null)
+                throw new ArgumentNullException(nameof(task));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return AsyncObservable.Create<TResult>(observer => task.AcceptAsync(observer, scheduler));
+        }
+
+        public static Task<IAsyncDisposable> AcceptAsync(this Task task, IAsyncObserver<Unit> observer) => AcceptAsync(task, observer, ImmediateAsyncScheduler.Instance);
+
+        public static Task<IAsyncDisposable> AcceptAsync(this Task task, IAsyncObserver<Unit> observer, IAsyncScheduler scheduler)
+        {
+            if (task == null)
+                throw new ArgumentNullException(nameof(task));
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            Task<IAsyncDisposable> CompleteAsync()
+            {
+                return scheduler.ScheduleAsync(async ct =>
+                {
+                    if (ct.IsCancellationRequested)
+                    {
+                        return;
+                    }
+
+                    switch (task.Status)
+                    {
+                        case TaskStatus.RanToCompletion:
+                            await observer.OnNextAsync(Unit.Default).RendezVous(scheduler);
+                            await observer.OnCompletedAsync().RendezVous(scheduler);
+                            break;
+                        case TaskStatus.Faulted:
+                            await observer.OnErrorAsync(task.Exception.InnerException).RendezVous(scheduler);
+                            break;
+                        case TaskStatus.Canceled:
+                            await observer.OnErrorAsync(new TaskCanceledException(task)).RendezVous(scheduler);
+                            break;
+                    }
+                });
+            }
+
+            Task<IAsyncDisposable> CoreAsync()
+            {
+                if (task.IsCompleted)
+                {
+                    return CompleteAsync();
+                }
+                else
+                {
+                    var tco = TaskContinuationOptions.None;
+
+                    if (scheduler == ImmediateAsyncScheduler.Instance)
+                    {
+                        tco = TaskContinuationOptions.ExecuteSynchronously;
+                    }
+
+                    var subject = new SequentialAsyncAsyncSubject<Unit>();
+
+                    task.ContinueWith(t => CompleteAsync(), tco);
+
+                    return subject.SubscribeAsync(observer);
+                }
+            }
+
+            return CoreAsync();
+        }
+
+        public static Task<IAsyncDisposable> AcceptAsync<TResult>(this Task<TResult> task, IAsyncObserver<TResult> observer) => AcceptAsync(task, observer, ImmediateAsyncScheduler.Instance);
+
+        public static Task<IAsyncDisposable> AcceptAsync<TResult>(this Task<TResult> task, IAsyncObserver<TResult> observer, IAsyncScheduler scheduler)
+        {
+            if (task == null)
+                throw new ArgumentNullException(nameof(task));
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            Task<IAsyncDisposable> CompleteAsync()
+            {
+                return scheduler.ScheduleAsync(async ct =>
+                {
+                    if (ct.IsCancellationRequested)
+                    {
+                        return;
+                    }
+
+                    switch (task.Status)
+                    {
+                        case TaskStatus.RanToCompletion:
+                            await observer.OnNextAsync(task.Result).RendezVous(scheduler);
+                            await observer.OnCompletedAsync().RendezVous(scheduler);
+                            break;
+                        case TaskStatus.Faulted:
+                            await observer.OnErrorAsync(task.Exception.InnerException).RendezVous(scheduler);
+                            break;
+                        case TaskStatus.Canceled:
+                            await observer.OnErrorAsync(new TaskCanceledException(task)).RendezVous(scheduler);
+                            break;
+                    }
+                });
+            }
+
+            Task<IAsyncDisposable> CoreAsync()
+            {
+                if (task.IsCompleted)
+                {
+                    return CompleteAsync();
+                }
+                else
+                {
+                    var tco = TaskContinuationOptions.None;
+
+                    if (scheduler == ImmediateAsyncScheduler.Instance)
+                    {
+                        tco = TaskContinuationOptions.ExecuteSynchronously;
+                    }
+
+                    var subject = new SequentialAsyncAsyncSubject<TResult>();
+
+                    task.ContinueWith(t => CompleteAsync(), tco);
+
+                    return subject.SubscribeAsync(observer);
+                }
+            }
+
+            return CoreAsync();
+        }
+    }
+}