Browse Source

Implementing TakeUntil with time.

Bart De Smet 8 years ago
parent
commit
fa885cdb8b

+ 6 - 0
AsyncRx.NET/Playground/Program.cs

@@ -19,6 +19,7 @@ namespace Playground
             await RangeAsync();
             await ReturnAsync();
             await SubjectAsync();
+            await TakeUntilAsync();
             await TimerAsync();
         }
 
@@ -48,6 +49,11 @@ namespace Playground
             await subject.OnCompletedAsync();
         }
 
+        static async Task TakeUntilAsync()
+        {
+            await AsyncObservable.Range(0, int.MaxValue).TakeUntil(DateTimeOffset.Now.AddSeconds(5)).SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
+        }
+
         static async Task TimerAsync()
         {
             await AsyncObservable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)).Take(5).Select(_ => DateTimeOffset.Now).SubscribeAsync(Print<DateTimeOffset>()); // TODO: Use ForEachAsync.

+ 42 - 13
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/TakeUntil.cs

@@ -32,22 +32,40 @@ namespace System.Reactive.Linq
             });
         }
 
-        public static IAsyncObservable<TSource> TakeUntil<TSource, TUntil>(this IAsyncObservable<TSource> source, DateTimeOffset endTime)
+        public static IAsyncObservable<TSource> TakeUntil<TSource>(this IAsyncObservable<TSource> source, DateTimeOffset endTime)
         {
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeUntil(observer, endTime)));
+            // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer?
+
+            return Create<TSource>(async observer =>
+            {
+                var (sourceObserver, timer) = await AsyncObserver.TakeUntil(observer, endTime);
+
+                var subscription = await source.SubscribeAsync(sourceObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
 
-        public static IAsyncObservable<TSource> TakeUntil<TSource, TUntil>(this IAsyncObservable<TSource> source, DateTimeOffset endTime, IAsyncScheduler scheduler)
+        public static IAsyncObservable<TSource> TakeUntil<TSource>(this IAsyncObservable<TSource> source, DateTimeOffset endTime, IAsyncScheduler scheduler)
         {
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeUntil(observer, endTime, scheduler)));
+            // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer?
+
+            return Create<TSource>(async observer =>
+            {
+                var (sourceObserver, timer) = await AsyncObserver.TakeUntil(observer, endTime, scheduler);
+
+                var subscription = await source.SubscribeAsync(sourceObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
     }
 
@@ -105,22 +123,33 @@ namespace System.Reactive.Linq
                 );
         }
 
-        public static IAsyncObserver<TSource> TakeUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime)
-        {
-            if (observer == null)
-                throw new ArgumentNullException(nameof(observer));
-
-            throw new NotImplementedException();
-        }
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> TakeUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime) => TakeUntil(observer, endTime, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObserver<TSource> TakeUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime, IAsyncScheduler scheduler)
+        public static async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> TakeUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer?
+            // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
+
+            var gate = new AsyncLock();
+
+            return
+                (
+                    Synchronize(observer, gate),
+                    await scheduler.ScheduleAsync(async ct =>
+                    {
+                        ct.ThrowIfCancellationRequested();
+
+                        using (await gate.LockAsync().RendezVous(scheduler))
+                        {
+                            await observer.OnCompletedAsync().RendezVous(scheduler);
+                        }
+                    }, endTime)
+                );
         }
     }
 }