Browse Source

Using scheduler in Throw.

Bart De Smet 8 years ago
parent
commit
d9a948f1da

+ 23 - 7
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Throw.cs

@@ -3,7 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
-using System.Reactive.Disposables;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
@@ -14,11 +14,7 @@ namespace System.Reactive.Linq
             if (error == null)
                 throw new ArgumentNullException(nameof(error));
 
-            return Create<TSource>(async observer =>
-            {
-                await observer.OnErrorAsync(error).ConfigureAwait(false);
-                return AsyncDisposable.Nop;
-            });
+            return Create<TSource>(observer => AsyncObserver.Throw(observer, error));
         }
 
         public static IAsyncObservable<TSource> Throw<TSource>(Exception error, IAsyncScheduler scheduler)
@@ -28,7 +24,27 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            return Create<TSource>(observer => AsyncObserver.Throw(observer, error, scheduler));
+        }
+    }
+
+    partial class AsyncObserver
+    {
+        public static Task<IAsyncDisposable> Throw<TSource>(IAsyncObserver<TSource> observer, Exception error) => Throw(observer, error, ImmediateAsyncScheduler.Instance);
+
+        public static Task<IAsyncDisposable> Throw<TSource>(IAsyncObserver<TSource> observer, Exception error, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                ct.ThrowIfCancellationRequested();
+
+                await observer.OnErrorAsync(error).RendezVous(scheduler);
+            });
         }
     }
 }