Browse Source

Adding Range.

Bart De Smet 8 years ago
parent
commit
c9732235ee

+ 6 - 0
AsyncRx.NET/Playground/Program.cs

@@ -14,10 +14,16 @@ namespace Playground
 
         static async Task MainAsync()
         {
+            await RangeAsync();
             await ReturnAsync();
             await SubjectAsync();
         }
 
+        static async Task RangeAsync()
+        {
+            await AsyncObservable.Range(0, 10).SubscribeAsync(Print<int>());
+        }
+
         static async Task ReturnAsync()
         {
             await AsyncObservable.Return(42).SubscribeAsync(Print<int>());

+ 14 - 10
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Range.cs

@@ -8,15 +8,7 @@ namespace System.Reactive.Linq
 {
     partial class AsyncObservable
     {
-        public static IAsyncObservable<int> Range(int start, int count)
-        {
-            var max = ((long)start) + count - 1;
-
-            if (count < 0 || max > int.MaxValue)
-                throw new ArgumentOutOfRangeException(nameof(count));
-
-            throw new NotImplementedException();
-        }
+        public static IAsyncObservable<int> Range(int start, int count) => Range(start, count, TaskPoolAsyncScheduler.Default);
 
         public static IAsyncObservable<int> Range(int start, int count, IAsyncScheduler scheduler)
         {
@@ -28,7 +20,19 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            return Create<int>(observer => scheduler.ScheduleAsync(async ct =>
+            {
+                ct.ThrowIfCancellationRequested();
+
+                for (int i = start, end = start + count - 1; i <= end && !ct.IsCancellationRequested; i++)
+                {
+                    await observer.OnNextAsync(i);
+                }
+
+                ct.ThrowIfCancellationRequested();
+
+                await observer.OnCompletedAsync();
+            }));
         }
     }
 }