Sfoglia il codice sorgente

Using scheduler in Return.

Bart De Smet 8 anni fa
parent
commit
dca0b2912b

+ 24 - 8
AsyncRx.NET/Playground/Program.cs

@@ -13,12 +13,35 @@ namespace Playground
         }
 
         static async Task MainAsync()
+        {
+            await ReturnAsync();
+            await SubjectAsync();
+        }
+
+        static async Task ReturnAsync()
+        {
+            await AsyncObservable.Return(42).SubscribeAsync(Print<int>());
+        }
+
+        static async Task SubjectAsync()
         {
             var subject = new SequentialSimpleAsyncSubject<int>();
 
             var res = subject.Where(x => x % 2 == 0).Select(x => x + 1);
 
-            await res.SubscribeAsync(
+            await res.SubscribeAsync(Print<int>());
+
+            for (var i = 0; i < 10; i++)
+            {
+                await subject.OnNextAsync(i);
+            }
+
+            await subject.OnCompletedAsync();
+        }
+
+        static IAsyncObserver<T> Print<T>()
+        {
+            return AsyncObserver.Create<T>(
                 x =>
                 {
                     Console.WriteLine(x);
@@ -35,13 +58,6 @@ namespace Playground
                     return Task.CompletedTask;
                 }
             );
-
-            for (var i = 0; i < 10; i++)
-            {
-                await subject.OnNextAsync(i);
-            }
-
-            await subject.OnCompletedAsync();
         }
     }
 }

+ 1 - 1
AsyncRx.NET/System.Reactive.Async/System/Reactive/Concurrency/TaskPoolAsyncScheduler.cs

@@ -38,7 +38,7 @@ namespace System.Reactive.Concurrency
 
             task.Unwrap().ContinueWith(t =>
             {
-                if (t.Exception != null)
+                if (!t.IsCanceled && t.Exception != null)
                 {
                     // TODO: Call event handler.
                 }

+ 9 - 8
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Return.cs

@@ -3,7 +3,6 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
-using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq
 {
@@ -11,12 +10,7 @@ namespace System.Reactive.Linq
     {
         public static IAsyncObservable<TSource> Return<TSource>(TSource value)
         {
-            return Create<TSource>(async observer =>
-            {
-                await observer.OnNextAsync(value).ConfigureAwait(false);
-                await observer.OnCompletedAsync().ConfigureAwait(false);
-                return AsyncDisposable.Nop;
-            });
+            return Return(value, ImmediateAsyncScheduler.Instance);
         }
 
         public static IAsyncObservable<TSource> Return<TSource>(TSource value, IAsyncScheduler scheduler)
@@ -24,7 +18,14 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            return Create<TSource>(observer => scheduler.ScheduleAsync(async ct =>
+            {
+                ct.ThrowIfCancellationRequested();
+                await observer.OnNextAsync(value).ConfigureAwait(false);
+
+                ct.ThrowIfCancellationRequested();
+                await observer.OnCompletedAsync().ConfigureAwait(false);
+            }));
         }
     }
 }