Jelajahi Sumber

Merge pull request #11 from Blewzman/AddToAsyncEnumerableOverloadForTasks

Add ToAsyncEnumerable overload for tasks.
Matthew Podwysocki 11 tahun lalu
induk
melakukan
92c3670621

+ 34 - 0
Ix.NET/Source/System.Interactive.Async/AsyncEnumerable.Conversions.cs

@@ -63,6 +63,40 @@ namespace System.Linq
             }
         }
 
+        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this Task<TSource> task)
+        {
+            if (task == null)
+                throw new ArgumentNullException("task");
+
+            return Create(() =>
+            {
+                var called = 0;
+
+                return Create(
+                    (ct, tcs) =>
+                    {
+                        if (Interlocked.CompareExchange(ref called, 1, 0) == 0)
+                        {
+                            task.ContinueWith(continuedTask =>
+                            {
+                                if (continuedTask.IsCanceled)
+                                    tcs.SetCanceled();
+                                else if (continuedTask.IsFaulted)
+                                    tcs.SetException(continuedTask.Exception.InnerException);
+                                else
+                                    tcs.SetResult(true);
+                            });
+                        }
+                        else
+                            tcs.SetResult(false);
+
+                        return tcs.Task;
+                    },
+                    () => task.Result,
+                    () => { });
+            });
+        }
+
 #if !NO_RXINTERFACES
         public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source)
         {

+ 39 - 0
Ix.NET/Source/Tests/AsyncTests.Conversions.cs

@@ -4,6 +4,7 @@ using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Text;
+using System.Threading.Tasks;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System.Threading;
 
@@ -95,6 +96,44 @@ namespace Tests
             AssertThrows<Exception>(() => e.MoveNext().Wait(), ex_ => ((AggregateException)ex_).InnerExceptions.Single() == ex);
         }
 
+        [TestMethod]
+        public async Task ToAsyncEnumerable_with_completed_task()
+        {
+            var task = Task.FromResult(36);
+
+            var xs = task.ToAsyncEnumerable();
+            var e = xs.GetEnumerator();
+
+            Assert.IsTrue(e.MoveNext().Result);
+            Assert.AreEqual(36, e.Current);
+            Assert.IsFalse(e.MoveNext().Result);
+        }
+
+        [TestMethod]
+        public async Task ToAsyncEnumerable_with_faulted_task()
+        {
+            var ex = new InvalidOperationException();
+            var tcs = new TaskCompletionSource<int>();
+            tcs.SetException(ex);
+
+            var xs = tcs.Task.ToAsyncEnumerable();
+            var e = xs.GetEnumerator();
+
+            AssertThrows<Exception>(() => e.MoveNext().Wait(), ex_ => ((AggregateException)ex_).InnerExceptions.Single() == ex);
+        }
+
+        [TestMethod]
+        public async Task ToAsyncEnumerable_with_canceled_task()
+        {
+            var tcs = new TaskCompletionSource<int>();
+            tcs.SetCanceled();
+
+            var xs = tcs.Task.ToAsyncEnumerable();
+            var e = xs.GetEnumerator();
+
+            AssertThrows<Exception>(() => e.MoveNext().Wait(), ex_ => ((AggregateException)ex_).InnerExceptions.Single() is TaskCanceledException);
+        }
+
         class MyObservable<T> : IObservable<T>
         {
             private Func<IObserver<T>, IDisposable> _subscribe;