Преглед изворни кода

Implement TakeLast using scheduler.

Bart De Smet пре 8 година
родитељ
комит
8d70dff78b

+ 85 - 46
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/TakeLast.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -22,7 +23,14 @@ namespace System.Reactive.Linq
                 return Empty<TSource>();
             }
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeLast(observer, count)));
+            return Create<TSource>(async observer =>
+            {
+                var (sink, drain) = AsyncObserver.TakeLast(observer, count);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, drain);
+            });
         }
 
         public static IAsyncObservable<TSource> TakeLast<TSource>(this IAsyncObservable<TSource> source, int count, IAsyncScheduler scheduler)
@@ -39,7 +47,14 @@ namespace System.Reactive.Linq
                 return Empty<TSource>();
             }
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeLast(observer, count, scheduler)));
+            return Create<TSource>(async observer =>
+            {
+                var (sink, drain) = AsyncObserver.TakeLast(observer, count, scheduler);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, drain);
+            });
         }
 
         public static IAsyncObservable<TSource> TakeLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration)
@@ -54,7 +69,14 @@ namespace System.Reactive.Linq
                 return Empty<TSource>();
             }
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeLast(observer, duration)));
+            return Create<TSource>(async observer =>
+            {
+                var (sink, drain) = AsyncObserver.TakeLast(observer, duration);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, drain);
+            });
         }
 
         public static IAsyncObservable<TSource> TakeLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IClock clock)
@@ -71,7 +93,14 @@ namespace System.Reactive.Linq
                 return Empty<TSource>();
             }
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeLast(observer, duration, clock)));
+            return Create<TSource>(async observer =>
+            {
+                var (sink, drain) = AsyncObserver.TakeLast(observer, duration, clock);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, drain);
+            });
         }
 
         public static IAsyncObservable<TSource> TakeLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IClock clock, IAsyncScheduler scheduler)
@@ -90,7 +119,14 @@ namespace System.Reactive.Linq
                 return Empty<TSource>();
             }
 
-            return Create<TSource>(observer => source.SubscribeAsync(AsyncObserver.TakeLast(observer, duration, clock, scheduler)));
+            return Create<TSource>(async observer =>
+            {
+                var (sink, drain) = AsyncObserver.TakeLast(observer, duration, clock, scheduler);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, drain);
+            });
         }
 
         public static IAsyncObservable<TSource> TakeLast<TSource>(this IAsyncObservable<TSource> source, TimeSpan duration, IAsyncScheduler scheduler) => TakeLast(source, duration, scheduler, scheduler);
@@ -98,43 +134,9 @@ namespace System.Reactive.Linq
 
     partial class AsyncObserver
     {
-        public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, int count)
-        {
-            if (observer == null)
-                throw new ArgumentNullException(nameof(observer));
-            if (count <= 0)
-                throw new ArgumentOutOfRangeException(nameof(count));
-
-            var queue = new Queue<TSource>();
-
-            return Create<TSource>(
-                x =>
-                {
-                    queue.Enqueue(x);
-
-                    if (queue.Count > count)
-                    {
-                        queue.Dequeue();
-                    }
-
-                    return Task.CompletedTask;
-                },
-                observer.OnErrorAsync,
-                async () =>
-                {
-                    var n = queue.Count;
-
-                    while (queue.Count > 0)
-                    {
-                        await observer.OnNextAsync(queue.Dequeue()).ConfigureAwait(false);
-                    }
-
-                    await observer.OnCompletedAsync().ConfigureAwait(false);
-                }
-            );
-        }
+        public static (IAsyncObserver<TSource>, IAsyncDisposable) TakeLast<TSource>(IAsyncObserver<TSource> observer, int count) => TakeLast(observer, count, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, int count, IAsyncScheduler scheduler)
+        public static (IAsyncObserver<TSource>, IAsyncDisposable) TakeLast<TSource>(IAsyncObserver<TSource> observer, int count, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
@@ -143,16 +145,53 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            var sad = new SingleAssignmentAsyncDisposable();
+
+            var queue = new Queue<TSource>();
+
+            return
+                (
+                    Create<TSource>(
+                        x =>
+                        {
+                            queue.Enqueue(x);
+
+                            if (queue.Count > count)
+                            {
+                                queue.Dequeue();
+                            }
+
+                            return Task.CompletedTask;
+                        },
+                        observer.OnErrorAsync,
+                        async () =>
+                        {
+                            var drain = await scheduler.ScheduleAsync(async ct =>
+                            {
+                                while (!ct.IsCancellationRequested && queue.Count > 0)
+                                {
+                                    await observer.OnNextAsync(queue.Dequeue()).RendezVous(scheduler);
+                                }
+
+                                ct.ThrowIfCancellationRequested();
+
+                                await observer.OnCompletedAsync().RendezVous(scheduler);
+                            }).ConfigureAwait(false);
+
+                            await sad.AssignAsync(drain).ConfigureAwait(false);
+                        }
+                    ),
+                    sad
+                );
         }
 
-        public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration) => TakeLast(observer, duration, TaskPoolAsyncScheduler.Default, TaskPoolAsyncScheduler.Default);
+        public static (IAsyncObserver<TSource>, IAsyncDisposable) TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration) => TakeLast(observer, duration, TaskPoolAsyncScheduler.Default, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IAsyncScheduler scheduler) => TakeLast(observer, duration, scheduler, scheduler);
+        public static (IAsyncObserver<TSource>, IAsyncDisposable) TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IAsyncScheduler scheduler) => TakeLast(observer, duration, scheduler, scheduler);
 
-        public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IClock clock) => TakeLast(observer, duration, clock, TaskPoolAsyncScheduler.Default);
+        public static (IAsyncObserver<TSource>, IAsyncDisposable) TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IClock clock) => TakeLast(observer, duration, clock, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObserver<TSource> TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IClock clock, IAsyncScheduler scheduler)
+        public static (IAsyncObserver<TSource>, IAsyncDisposable) TakeLast<TSource>(IAsyncObserver<TSource> observer, TimeSpan duration, IClock clock, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));