Переглянути джерело

Implement Empty, Range, and Return on observers.

Bart De Smet 8 роки тому
батько
коміт
f06bd034d7

+ 22 - 3
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Empty.cs

@@ -3,24 +3,43 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
     partial class AsyncObservable
     {
-        public static IAsyncObservable<TSource> Empty<TSource>() => Empty<TSource>(ImmediateAsyncScheduler.Instance);
+        public static IAsyncObservable<TSource> Empty<TSource>()
+        {
+            return Create<TSource>(observer => AsyncObserver.Empty(observer));
+        }
 
         public static IAsyncObservable<TSource> Empty<TSource>(IAsyncScheduler scheduler)
         {
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<TSource>(observer => scheduler.ScheduleAsync(async ct =>
+            return Create<TSource>(observer => AsyncObserver.Empty(observer, scheduler));
+        }
+    }
+
+    partial class AsyncObserver
+    {
+        public static Task<IAsyncDisposable> Empty<TSource>(IAsyncObserver<TSource> observer) => Empty(observer, ImmediateAsyncScheduler.Instance);
+
+        public static Task<IAsyncDisposable> Empty<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
             {
                 ct.ThrowIfCancellationRequested();
 
                 await observer.OnCompletedAsync().RendezVous(scheduler);
-            }));
+            });
         }
     }
 }

+ 27 - 6
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Range.cs

@@ -3,24 +3,45 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
     partial class AsyncObservable
     {
-        public static IAsyncObservable<int> Range(int start, int count) => Range(start, count, TaskPoolAsyncScheduler.Default);
+        public static IAsyncObservable<int> Range(int start, int count)
+        {
+            if (count < 0 || ((long)start) + count - 1 > int.MaxValue)
+                throw new ArgumentOutOfRangeException(nameof(count));
+
+            return Create<int>(observer => AsyncObserver.Range(observer, start, count));
+        }
 
         public static IAsyncObservable<int> Range(int start, int count, IAsyncScheduler scheduler)
         {
-            var max = ((long)start) + count - 1;
-
-            if (count < 0 || max > int.MaxValue)
+            if (count < 0 || ((long)start) + count - 1 > int.MaxValue)
                 throw new ArgumentOutOfRangeException(nameof(count));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
 
+            return Create<int>(observer => AsyncObserver.Range(observer, start, count, scheduler));
+        }
+    }
+
+    partial class AsyncObserver
+    {
+        public static Task<IAsyncDisposable> Range(IAsyncObserver<int> observer, int start, int count) => Range(observer, start, count, TaskPoolAsyncScheduler.Default);
+
+        public static Task<IAsyncDisposable> Range(IAsyncObserver<int> observer, int start, int count, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (count < 0 || ((long)start) + count - 1 > int.MaxValue)
+                throw new ArgumentOutOfRangeException(nameof(count));
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<int>(observer => scheduler.ScheduleAsync(async ct =>
+            return scheduler.ScheduleAsync(async ct =>
             {
                 ct.ThrowIfCancellationRequested();
 
@@ -32,7 +53,7 @@ namespace System.Reactive.Linq
                 ct.ThrowIfCancellationRequested();
 
                 await observer.OnCompletedAsync().RendezVous(scheduler);
-            }));
+            });
         }
     }
 }

+ 19 - 3
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Return.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
@@ -10,7 +11,7 @@ namespace System.Reactive.Linq
     {
         public static IAsyncObservable<TSource> Return<TSource>(TSource value)
         {
-            return Return(value, ImmediateAsyncScheduler.Instance);
+            return Create<TSource>(observer => AsyncObserver.Return(observer, value));
         }
 
         public static IAsyncObservable<TSource> Return<TSource>(TSource value, IAsyncScheduler scheduler)
@@ -18,7 +19,22 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<TSource>(observer => scheduler.ScheduleAsync(async ct =>
+            return Create<TSource>(observer => AsyncObserver.Return(observer, value, scheduler));
+        }
+    }
+
+    partial class AsyncObserver
+    {
+        public static Task<IAsyncDisposable> Return<TSource>(IAsyncObserver<TSource> observer, TSource value) => Return(observer, value, ImmediateAsyncScheduler.Instance);
+
+        public static Task<IAsyncDisposable> Return<TSource>(IAsyncObserver<TSource> observer, TSource value, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
             {
                 ct.ThrowIfCancellationRequested();
 
@@ -27,7 +43,7 @@ namespace System.Reactive.Linq
                 ct.ThrowIfCancellationRequested();
 
                 await observer.OnCompletedAsync().RendezVous(scheduler);
-            }));
+            });
         }
     }
 }