فهرست منبع

Implementing SkipUntil with time.

Bart De Smet 8 سال پیش
والد
کامیت
314e30a6f8

+ 73 - 13
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/SkipUntil.cs

@@ -32,22 +32,40 @@ namespace System.Reactive.Linq
             });
         }
 
-        public static IAsyncObservable<TSource> SkipUntil<TSource, TUntil>(this IAsyncObservable<TSource> source, DateTimeOffset endTime)
+        public static IAsyncObservable<TSource> SkipUntil<TSource>(this IAsyncObservable<TSource> source, DateTimeOffset endTime)
         {
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.SkipUntil(observer, endTime)));
+            // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want SkipUntil on the observer?
+
+            return Create<TSource>(async observer =>
+            {
+                var (sourceObserver, timer) = await AsyncObserver.SkipUntil(observer, endTime);
+
+                var subscription = await source.SubscribeAsync(sourceObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
 
-        public static IAsyncObservable<TSource> SkipUntil<TSource, TUntil>(this IAsyncObservable<TSource> source, DateTimeOffset endTime, IAsyncScheduler scheduler)
+        public static IAsyncObservable<TSource> SkipUntil<TSource>(this IAsyncObservable<TSource> source, DateTimeOffset endTime, IAsyncScheduler scheduler)
         {
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.SkipUntil(observer, endTime, scheduler)));
+            // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want SkipUntil on the observer?
+
+            return Create<TSource>(async observer =>
+            {
+                var (sourceObserver, timer) = await AsyncObserver.SkipUntil(observer, endTime);
+
+                var subscription = await source.SubscribeAsync(sourceObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
     }
 
@@ -109,22 +127,64 @@ namespace System.Reactive.Linq
                 );
         }
 
-        public static IAsyncObserver<TSource> SkipUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime)
-        {
-            if (observer == null)
-                throw new ArgumentNullException(nameof(observer));
-
-            throw new NotImplementedException();
-        }
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> SkipUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime) => SkipUntil(observer, endTime, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObserver<TSource> SkipUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime, IAsyncScheduler scheduler)
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> SkipUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            return CoreAsync();
+
+            async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
+            {
+                // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want SkipUntil on the observer?
+                // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
+
+                var gate = new AsyncLock();
+                var open = false;
+
+                return
+                    (
+                        Create<TSource>(
+                            async x =>
+                            {
+                                using (await gate.LockAsync().ConfigureAwait(false))
+                                {
+                                    if (open)
+                                    {
+                                        await observer.OnNextAsync(x).ConfigureAwait(false);
+                                    }
+                                }
+                            },
+                            async ex =>
+                            {
+                                using (await gate.LockAsync().ConfigureAwait(false))
+                                {
+                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                                }
+                            },
+                            async () =>
+                            {
+                                using (await gate.LockAsync().ConfigureAwait(false))
+                                {
+                                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                                }
+                            }
+                        ),
+                        await scheduler.ScheduleAsync(async ct =>
+                        {
+                            ct.ThrowIfCancellationRequested();
+
+                            using (await gate.LockAsync().RendezVous(scheduler))
+                            {
+                                open = true;
+                            }
+                        }, endTime)
+                    );
+            }
         }
     }
 }

+ 20 - 15
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/TakeUntil.cs

@@ -125,31 +125,36 @@ namespace System.Reactive.Linq
 
         public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> TakeUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime) => TakeUntil(observer, endTime, TaskPoolAsyncScheduler.Default);
 
-        public static async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> TakeUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime, IAsyncScheduler scheduler)
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> TakeUntil<TSource>(IAsyncObserver<TSource> observer, DateTimeOffset endTime, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer?
-            // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
+            return CoreAsync();
 
-            var gate = new AsyncLock();
+            async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
+            {
+                // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer?
+                // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
 
-            return
-                (
-                    Synchronize(observer, gate),
-                    await scheduler.ScheduleAsync(async ct =>
-                    {
-                        ct.ThrowIfCancellationRequested();
+                var gate = new AsyncLock();
 
-                        using (await gate.LockAsync().RendezVous(scheduler))
+                return
+                    (
+                        Synchronize(observer, gate),
+                        await scheduler.ScheduleAsync(async ct =>
                         {
-                            await observer.OnCompletedAsync().RendezVous(scheduler);
-                        }
-                    }, endTime)
-                );
+                            ct.ThrowIfCancellationRequested();
+
+                            using (await gate.LockAsync().RendezVous(scheduler))
+                            {
+                                await observer.OnCompletedAsync().RendezVous(scheduler);
+                            }
+                        }, endTime)
+                    );
+            }
         }
     }
 }