Przeglądaj źródła

Initial implementation of periodic timers.

Bart De Smet 8 lat temu
rodzic
commit
908685c3e1

+ 8 - 0
AsyncRx.NET/Playground/Program.cs

@@ -10,6 +10,8 @@ namespace Playground
         static void Main()
         {
             MainAsync().GetAwaiter().GetResult();
+
+            Console.ReadLine();
         }
 
         static async Task MainAsync()
@@ -17,6 +19,7 @@ namespace Playground
             await RangeAsync();
             await ReturnAsync();
             await SubjectAsync();
+            await TimerAsync();
         }
 
         static async Task RangeAsync()
@@ -45,6 +48,11 @@ namespace Playground
             await subject.OnCompletedAsync();
         }
 
+        static async Task TimerAsync()
+        {
+            await AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)).Take(5).Select(_ => DateTimeOffset.Now).SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.
+        }
+
         static IAsyncObserver<T> Print<T>()
         {
             return AsyncObserver.Create<T>(

+ 28 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Concurrency/AsyncScheduler.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Runtime.CompilerServices;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Concurrency
@@ -20,5 +21,32 @@ namespace System.Reactive.Concurrency
         {
             return task.ConfigureAwait(true);
         }
+
+        public static async Task Delay(this IAsyncScheduler scheduler, TimeSpan dueTime, CancellationToken token = default(CancellationToken))
+        {
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            var tcs = new TaskCompletionSource<bool>();
+
+            var task = await scheduler.ScheduleAsync(ct =>
+            {
+                if (ct.IsCancellationRequested)
+                {
+                    tcs.SetCanceled();
+                }
+                else
+                {
+                    tcs.SetResult(true);
+                }
+
+                return Task.CompletedTask;
+            }, dueTime);
+
+            using (token.Register(() => task.DisposeAsync()))
+            {
+                await tcs.Task;
+            }
+        }
     }
 }

+ 30 - 2
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Timer.cs

@@ -122,7 +122,21 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            var tick = 0L;
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                ct.ThrowIfCancellationRequested();
+
+                do
+                {
+                    await observer.OnNextAsync(tick++).RendezVous(scheduler);
+
+                    await scheduler.Delay(period, ct).RendezVous(scheduler);
+                } while (!ct.IsCancellationRequested);
+
+                ct.ThrowIfCancellationRequested();
+            }, dueTime);
         }
 
         public static Task<IAsyncDisposable> Timer(IAsyncObserver<long> observer, DateTimeOffset dueTime, TimeSpan period) => Timer(observer, dueTime, period, TaskPoolAsyncScheduler.Default);
@@ -134,7 +148,21 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            var tick = 0L;
+
+            return scheduler.ScheduleAsync(async ct =>
+            {
+                ct.ThrowIfCancellationRequested();
+
+                do
+                {
+                    await observer.OnNextAsync(tick++).RendezVous(scheduler);
+
+                    await scheduler.Delay(period, ct).RendezVous(scheduler);
+                } while (!ct.IsCancellationRequested);
+
+                ct.ThrowIfCancellationRequested();
+            }, dueTime);
         }
     }
 }