Browse Source

Implementing Skip and Take with time.

Bart De Smet 8 years ago
parent
commit
7c770084c0

+ 74 - 5
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Skip.cs

@@ -3,6 +3,9 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
@@ -35,7 +38,16 @@ namespace System.Reactive.Linq
                 return source;
             }
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Skip(observer, duration)));
+            // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want Skip on the observer?
+
+            return Create<TSource>(async observer =>
+            {
+                var (sourceObserver, timer) = await AsyncObserver.Skip(observer, duration);
+
+                var subscription = await source.SubscribeAsync(sourceObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
 
         public static IAsyncObservable<TSource> Skip<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IAsyncScheduler scheduler)
@@ -52,7 +64,16 @@ namespace System.Reactive.Linq
                 return source;
             }
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Skip(observer, duration, scheduler)));
+            // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want Skip on the observer?
+
+            return Create<TSource>(async observer =>
+            {
+                var (sourceObserver, timer) = await AsyncObserver.Skip(observer, duration);
+
+                var subscription = await source.SubscribeAsync(sourceObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
     }
 
@@ -82,9 +103,9 @@ namespace System.Reactive.Linq
             );
         }
 
-        public static IAsyncObserver<TSource> Skip<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration) => Skip(observer, duration, TaskPoolAsyncScheduler.Default);
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Skip<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration) => Skip(observer, duration, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObserver<TSource> Skip<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IAsyncScheduler scheduler)
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Skip<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
@@ -93,7 +114,55 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            return CoreAsync();
+
+            async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
+            {
+                // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want Skip on the observer?
+                // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
+
+                var gate = new AsyncLock();
+                var open = false;
+
+                return
+                    (
+                        Create<TSource>(
+                            async x =>
+                            {
+                                using (await gate.LockAsync().ConfigureAwait(false))
+                                {
+                                    if (open)
+                                    {
+                                        await observer.OnNextAsync(x).ConfigureAwait(false);
+                                    }
+                                }
+                            },
+                            async ex =>
+                            {
+                                using (await gate.LockAsync().ConfigureAwait(false))
+                                {
+                                    await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                                }
+                            },
+                            async () =>
+                            {
+                                using (await gate.LockAsync().ConfigureAwait(false))
+                                {
+                                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                                }
+                            }
+                        ),
+                        await scheduler.ScheduleAsync(async ct =>
+                        {
+                            ct.ThrowIfCancellationRequested();
+
+                            using (await gate.LockAsync().RendezVous(scheduler))
+                            {
+                                open = true;
+                            }
+                        }, duration)
+                    );
+            }
         }
     }
 }

+ 48 - 5
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Take.cs

@@ -3,6 +3,9 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
@@ -35,7 +38,16 @@ namespace System.Reactive.Linq
                 return Empty<TSource>();
             }
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Take(observer, duration)));
+            // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want Take on the observer?
+
+            return Create<TSource>(async observer =>
+            {
+                var (sourceObserver, timer) = await AsyncObserver.Take(observer, duration);
+
+                var subscription = await source.SubscribeAsync(sourceObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
 
         public static IAsyncObservable<TSource> Take<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IAsyncScheduler scheduler)
@@ -52,7 +64,16 @@ namespace System.Reactive.Linq
                 return Empty<TSource>();
             }
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.Take(observer, duration, scheduler)));
+            // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want Take on the observer?
+
+            return Create<TSource>(async observer =>
+            {
+                var (sourceObserver, timer) = await AsyncObserver.Take(observer, duration);
+
+                var subscription = await source.SubscribeAsync(sourceObserver).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
     }
 
@@ -82,9 +103,9 @@ namespace System.Reactive.Linq
             );
         }
 
-        public static IAsyncObserver<TSource> Take<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration) => Take(observer, duration, TaskPoolAsyncScheduler.Default);
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Take<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration) => Take(observer, duration, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObserver<TSource> Take<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IAsyncScheduler scheduler)
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Take<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
@@ -93,7 +114,29 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            return CoreAsync();
+
+            async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
+            {
+                // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer?
+                // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
+
+                var gate = new AsyncLock();
+
+                return
+                    (
+                        Synchronize(observer, gate),
+                        await scheduler.ScheduleAsync(async ct =>
+                        {
+                            ct.ThrowIfCancellationRequested();
+
+                            using (await gate.LockAsync().RendezVous(scheduler))
+                            {
+                                await observer.OnCompletedAsync().RendezVous(scheduler);
+                            }
+                        }, duration)
+                    );
+            }
         }
     }
 }