Ver código fonte

Implementing Buffer with sliding time.

Bart De Smet 8 anos atrás
pai
commit
57a1cfcb3b

+ 22 - 3
AsyncRx.NET/Playground/Program.cs

@@ -1,4 +1,6 @@
 using System;
+using System.Linq;
+using System.Reactive.Concurrency;
 using System.Reactive.Linq;
 using System.Reactive.Subjects;
 using System.Threading.Tasks;
@@ -16,7 +18,8 @@ namespace Playground
 
         static async Task MainAsync()
         {
-            await BufferAsync();
+            await BufferTimeHoppingAsync();
+            await BufferTimeSlidingAsync();
             await MergeAsync();
             await RangeAsync();
             await ReturnAsync();
@@ -25,9 +28,25 @@ namespace Playground
             await TimerAsync();
         }
 
-        static async Task BufferAsync()
+        static async Task BufferTimeHoppingAsync()
         {
-            await AsyncObservable.Interval(TimeSpan.FromMilliseconds(300)).Buffer(TimeSpan.FromSeconds(1)).Select(xs => string.Join(", ", xs)).SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
+            await
+                AsyncObservable
+                    .Interval(TimeSpan.FromMilliseconds(300))
+                    .Buffer(TimeSpan.FromSeconds(1))
+                    .Select(xs => string.Join(", ", xs))
+                    .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
+        }
+
+        static async Task BufferTimeSlidingAsync()
+        {
+            await
+                AsyncObservable
+                    .Interval(TimeSpan.FromMilliseconds(100))
+                    .Timestamp(TaskPoolAsyncScheduler.Default)
+                    .Buffer(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(300))
+                    .Select(xs => $"[{xs.First().Timestamp}, {xs.Last().Timestamp}] = {(xs.Last().Timestamp - xs.First().Timestamp).TotalMilliseconds}")
+                    .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
         }
 
         static async Task MergeAsync()

+ 126 - 2
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Buffer.cs

@@ -316,7 +316,122 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            return CoreAsync();
+
+            async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
+            {
+                var gate = new AsyncLock();
+
+                var queue = new Queue<List<TSource>>();
+
+                queue.Enqueue(new List<TSource>());
+
+                var sink = Create<TSource>(
+                    async x =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            foreach (var buffer in queue)
+                            {
+                                buffer.Add(x);
+                            }
+                        }
+                    },
+                    async ex =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                        }
+                    },
+                    async () =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            while (queue.Count > 0)
+                            {
+                                var buffer = queue.Dequeue();
+
+                                if (buffer.Count > 0)
+                                {
+                                    await observer.OnNextAsync(buffer).ConfigureAwait(false);
+                                }
+                            }
+
+                            await observer.OnCompletedAsync().ConfigureAwait(false);
+                        }
+                    }
+                );
+
+                var nextOpen = timeShift;
+                var nextClose = timeSpan;
+                var totalTime = TimeSpan.Zero;
+
+                var isOpen = false;
+                var isClose = false;
+
+                TimeSpan GetNextDue()
+                {
+                    if (nextOpen == nextClose)
+                    {
+                        isOpen = isClose = true;
+                    }
+                    else if (nextClose < nextOpen)
+                    {
+                        isClose = true;
+                        isOpen = false;
+                    }
+                    else
+                    {
+                        isOpen = true;
+                        isClose = false;
+                    }
+
+                    var newTotalTime = isClose ? nextClose : nextOpen;
+                    var due = newTotalTime - totalTime;
+                    totalTime = newTotalTime;
+
+                    if (isOpen)
+                    {
+                        nextOpen += timeShift;
+                    }
+
+                    if (isClose)
+                    {
+                        nextClose += timeShift;
+                    }
+
+                    return due;
+                }
+
+                var timer = await scheduler.ScheduleAsync(async ct =>
+                {
+                    while (!ct.IsCancellationRequested)
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            if (isClose)
+                            {
+                                var buffer = queue.Dequeue();
+
+                                if (buffer.Count > 0)
+                                {
+                                    await observer.OnNextAsync(buffer).RendezVous(scheduler);
+                                }
+                            }
+
+                            if (isOpen)
+                            {
+                                queue.Enqueue(new List<TSource>());
+                            }
+                        }
+
+                        await scheduler.Delay(GetNextDue(), ct).RendezVous(scheduler);
+                    }
+                }, GetNextDue());
+
+                return (sink, timer);
+            };
         }
 
         public static Task<(IAsyncObserver<TSource>, IAsyncDisposable)> Buffer<TSource>(IAsyncObserver<IList<TSource>> observer, TimeSpan timeSpan, int count) => Buffer(observer, timeSpan, count, TaskPoolAsyncScheduler.Default);
@@ -332,7 +447,16 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            return CoreAsync();
+
+            async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
+            {
+                var gate = new AsyncLock();
+
+                var queue = new Queue<List<TSource>>();
+
+                throw new NotImplementedException();
+            };
         }
     }
 }