Pārlūkot izejas kodu

Implement hopping time-based Buffer.

Bart De Smet 8 gadi atpakaļ
vecāks
revīzija
0836017062

+ 6 - 0
AsyncRx.NET/Playground/Program.cs

@@ -16,6 +16,7 @@ namespace Playground
 
         static async Task MainAsync()
         {
+            await BufferAsync();
             await MergeAsync();
             await RangeAsync();
             await ReturnAsync();
@@ -24,6 +25,11 @@ namespace Playground
             await TimerAsync();
         }
 
+        static async Task BufferAsync()
+        {
+            await AsyncObservable.Interval(TimeSpan.FromMilliseconds(300)).Buffer(TimeSpan.FromSeconds(1)).Select(xs => string.Join(", ", xs)).SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
+        }
+
         static async Task MergeAsync()
         {
             var subject = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();

+ 114 - 13
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Buffer.cs

@@ -4,6 +4,9 @@
 
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
@@ -38,7 +41,14 @@ namespace System.Reactive.Linq
             if (timeSpan < TimeSpan.Zero)
                 throw new ArgumentNullException(nameof(timeSpan));
 
-            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan)));
+            return Create<IList<TSource>>(async observer =>
+            {
+                var (sink, timer) = await AsyncObserver.Buffer(observer, timeSpan).ConfigureAwait(false);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
 
         public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, IAsyncScheduler scheduler)
@@ -50,7 +60,14 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, scheduler)));
+            return Create<IList<TSource>>(async observer =>
+            {
+                var (sink, timer) = await AsyncObserver.Buffer(observer, timeSpan, scheduler).ConfigureAwait(false);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
 
         public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
@@ -62,7 +79,14 @@ namespace System.Reactive.Linq
             if (timeShift < TimeSpan.Zero)
                 throw new ArgumentNullException(nameof(timeShift));
 
-            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, timeShift)));
+            return Create<IList<TSource>>(async observer =>
+            {
+                var (sink, timer) = await AsyncObserver.Buffer(observer, timeSpan, timeShift).ConfigureAwait(false);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
 
         public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
@@ -76,7 +100,14 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, timeShift, scheduler)));
+            return Create<IList<TSource>>(async observer =>
+            {
+                var (sink, timer) = await AsyncObserver.Buffer(observer, timeSpan, timeShift, scheduler).ConfigureAwait(false);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
 
         public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count)
@@ -88,7 +119,14 @@ namespace System.Reactive.Linq
             if (count <= 0)
                 throw new ArgumentNullException(nameof(count));
 
-            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, count)));
+            return Create<IList<TSource>>(async observer =>
+            {
+                var (sink, timer) = await AsyncObserver.Buffer(observer, timeSpan, count).ConfigureAwait(false);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
 
         public static IAsyncObservable<IList<TSource>> Buffer<TSource>(this IAsyncObservable<TSource> source, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
@@ -102,7 +140,14 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<IList<TSource>>(observer => source.SubscribeAsync(AsyncObserver.Buffer(observer, timeSpan, count, scheduler)));
+            return Create<IList<TSource>>(async observer =>
+            {
+                var (sink, timer) = await AsyncObserver.Buffer(observer, timeSpan, count, scheduler).ConfigureAwait(false);
+
+                var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);
+
+                return StableCompositeAsyncDisposable.Create(subscription, timer);
+            });
         }
     }
 
@@ -187,10 +232,10 @@ namespace System.Reactive.Linq
             );
         }
 
-        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan) => Buffer(observer, timeSpan, TaskPoolAsyncScheduler.Default);
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan) => Buffer(observer, timeSpan, TaskPoolAsyncScheduler.Default);
 
 
-        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, IAsyncScheduler scheduler)
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
@@ -199,12 +244,68 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            return CoreAsync();
+
+            async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
+            {
+                var gate = new AsyncLock();
+
+                var buffer = new List<TSource>();
+
+                var sink = Create<TSource>(
+                    async x =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            buffer.Add(x);
+                        }
+                    },
+                    async ex =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                        }
+                    },
+                    async () =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            if (buffer.Count > 0)
+                            {
+                                await observer.OnNextAsync(buffer).ConfigureAwait(false);
+                            }
+
+                            await observer.OnCompletedAsync().ConfigureAwait(false);
+                        }
+                    }
+                );
+
+                var timer = await scheduler.ScheduleAsync(async ct =>
+                {
+                    while (!ct.IsCancellationRequested)
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            if (buffer.Count > 0)
+                            {
+                                await observer.OnNextAsync(buffer).ConfigureAwait(false);
+
+                                buffer = new List<TSource>();
+                            }
+                        }
+
+                        await scheduler.Delay(timeSpan, ct).RendezVous(scheduler);
+                    }
+                }, timeSpan);
+
+                return (sink, timer);
+            };
         }
 
-        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, TimeSpan timeShift) => Buffer(observer, timeSpan, timeShift, TaskPoolAsyncScheduler.Default);
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, TimeSpan timeShift) => Buffer(observer, timeSpan, timeShift, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, TimeSpan timeShift, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
@@ -218,9 +319,9 @@ namespace System.Reactive.Linq
             throw new NotImplementedException();
         }
 
-        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, int count) => Buffer(observer, timeSpan, count, TaskPoolAsyncScheduler.Default);
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, int count) => Buffer(observer, timeSpan, count, TaskPoolAsyncScheduler.Default);
 
-        public static IAsyncObserver<TSource> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
+        public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, int count, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));