Browse Source

Adding Interval and Timer on observers.

Bart De Smet 8 years ago
parent
commit
1c0122cbb2

+ 33 - 1
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Interval.cs

@@ -3,12 +3,19 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
     partial class AsyncObservable
     {
-        public static IAsyncObservable<long> Interval(TimeSpan period) => Interval(period, TaskPoolAsyncScheduler.Default);
+        public static IAsyncObservable<long> Interval(TimeSpan period)
+        {
+            if (period < TimeSpan.Zero)
+                throw new ArgumentOutOfRangeException(nameof(period));
+
+            return Create<long>(observer => AsyncObserver.Interval(observer, period));
+        }
 
         public static IAsyncObservable<long> Interval(TimeSpan period, IAsyncScheduler scheduler)
         {
@@ -17,6 +24,31 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
+            return Create<long>(observer => AsyncObserver.Interval(observer, period, scheduler));
+        }
+    }
+
+    partial class AsyncObserver
+    {
+        public static Task<IAsyncDisposable> Interval(IAsyncObserver<long> observer, TimeSpan period)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (period < TimeSpan.Zero)
+                throw new ArgumentOutOfRangeException(nameof(period));
+
+            throw new NotImplementedException();
+        }
+
+        public static Task<IAsyncDisposable> Interval(IAsyncObserver<long> observer, TimeSpan period, IAsyncScheduler scheduler)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+            if (period < TimeSpan.Zero)
+                throw new ArgumentOutOfRangeException(nameof(period));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
             throw new NotImplementedException();
         }
     }

+ 86 - 11
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Timer.cs

@@ -3,44 +3,119 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
     partial class AsyncObservable
     {
-        public static IAsyncObservable<long> Timer(TimeSpan dueTime) => Timer(dueTime, TaskPoolAsyncScheduler.Default);
+        public static IAsyncObservable<long> Timer(TimeSpan dueTime)
+        {
+            return Create<long>(observer => AsyncObserver.Timer(observer, dueTime));
+        }
 
         public static IAsyncObservable<long> Timer(TimeSpan dueTime, IAsyncScheduler scheduler)
         {
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<long>(observer => scheduler.ScheduleAsync(async ct =>
+            return Create<long>(observer => AsyncObserver.Timer(observer, dueTime, scheduler));
+        }
+
+        public static IAsyncObservable<long> Timer(DateTimeOffset dueTime)
+        {
+            return Create<long>(observer => AsyncObserver.Timer(observer, dueTime));
+        }
+
+        public static IAsyncObservable<long> Timer(DateTimeOffset dueTime, IAsyncScheduler scheduler)
+        {
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return Create<long>(observer => AsyncObserver.Timer(observer, dueTime, scheduler));
+        }
+
+        public static IAsyncObservable<long> Timer(TimeSpan dueTime, TimeSpan period)
+        {
+            if (period < TimeSpan.Zero)
+                throw new ArgumentOutOfRangeException(nameof(period));
+
+            return Create<long>(observer => AsyncObserver.Timer(observer, dueTime, period));
+        }
+
+        public static IAsyncObservable<long> Timer(TimeSpan dueTime, TimeSpan period, IAsyncScheduler scheduler)
+        {
+            if (period < TimeSpan.Zero)
+                throw new ArgumentOutOfRangeException(nameof(period));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return Create<long>(observer => AsyncObserver.Timer(observer, dueTime, period, scheduler));
+        }
+
+        public static IAsyncObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period)
+        {
+            if (period < TimeSpan.Zero)
+                throw new ArgumentOutOfRangeException(nameof(period));
+
+            return Create<long>(observer => AsyncObserver.Timer(observer, dueTime, period));
+        }
+
+        public static IAsyncObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period, IAsyncScheduler scheduler)
+        {
+            if (period < TimeSpan.Zero)
+                throw new ArgumentOutOfRangeException(nameof(period));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return Create<long>(observer => AsyncObserver.Timer(observer, dueTime, period, scheduler));
+        }
+    }
+
+    partial class AsyncObserver
+    {
+        public static Task<IAsyncDisposable> Timer(IAsyncObserver<long> observer, TimeSpan dueTime) => Timer(observer, dueTime, TaskPoolAsyncScheduler.Default);
+
+        public static Task<IAsyncDisposable> Timer(IAsyncObserver<long> observer, TimeSpan dueTime, IAsyncScheduler scheduler)
+        {
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return scheduler.ScheduleAsync(async ct =>
             {
                 ct.ThrowIfCancellationRequested();
 
                 await observer.OnNextAsync(0L).RendezVous(scheduler);
-            }, dueTime));
+
+                ct.ThrowIfCancellationRequested();
+
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            }, dueTime);
+
         }
 
-        public static IAsyncObservable<long> Timer(DateTimeOffset dueTime) => Timer(dueTime, TaskPoolAsyncScheduler.Default);
+        public static Task<IAsyncDisposable> Timer(IAsyncObserver<long> observer, DateTimeOffset dueTime) => Timer(observer, dueTime, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObservable<long> Timer(DateTimeOffset dueTime, IAsyncScheduler scheduler)
+        public static Task<IAsyncDisposable> Timer(IAsyncObserver<long> observer, DateTimeOffset dueTime, IAsyncScheduler scheduler)
         {
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<long>(observer => scheduler.ScheduleAsync(async ct =>
+            return scheduler.ScheduleAsync(async ct =>
             {
                 ct.ThrowIfCancellationRequested();
 
                 await observer.OnNextAsync(0L).RendezVous(scheduler);
-            }, dueTime));
+
+                ct.ThrowIfCancellationRequested();
+
+                await observer.OnCompletedAsync().RendezVous(scheduler);
+            }, dueTime);
         }
 
-        public static IAsyncObservable<long> Timer(TimeSpan dueTime, TimeSpan period) => Timer(dueTime, period, TaskPoolAsyncScheduler.Default);
+        public static Task<IAsyncDisposable> Timer(IAsyncObserver<long> observer, TimeSpan dueTime, TimeSpan period) => Timer(observer, dueTime, period, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObservable<long> Timer(TimeSpan dueTime, TimeSpan period, IAsyncScheduler scheduler)
+        public static Task<IAsyncDisposable> Timer(IAsyncObserver<long> observer, TimeSpan dueTime, TimeSpan period, IAsyncScheduler scheduler)
         {
             if (period < TimeSpan.Zero)
                 throw new ArgumentOutOfRangeException(nameof(period));
@@ -50,9 +125,9 @@ namespace System.Reactive.Linq
             throw new NotImplementedException();
         }
 
-        public static IAsyncObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period) => Timer(dueTime, period, TaskPoolAsyncScheduler.Default);
+        public static Task<IAsyncDisposable> Timer(IAsyncObserver<long> observer, DateTimeOffset dueTime, TimeSpan period) => Timer(observer, dueTime, period, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period, IAsyncScheduler scheduler)
+        public static Task<IAsyncDisposable> Timer(IAsyncObserver<long> observer, DateTimeOffset dueTime, TimeSpan period, IAsyncScheduler scheduler)
         {
             if (period < TimeSpan.Zero)
                 throw new ArgumentOutOfRangeException(nameof(period));