Răsfoiți Sursa

Implementing time-based constructors for ReplayAsyncSubject.

Bart De Smet 8 ani în urmă
părinte
comite
779e787f62

+ 62 - 14
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/Subjects/ReplayAsyncSubject.cs

@@ -156,7 +156,7 @@ namespace System.Reactive.Subjects
             if (window < TimeSpan.Zero)
                 throw new ArgumentOutOfRangeException(nameof(window));
 
-            throw new NotImplementedException();
+            _impl = new ReplayTime(concurrent, ImmediateAsyncScheduler.Instance, int.MaxValue, window, CreateImmediateObserver);
         }
 
         public ReplayAsyncSubject(bool concurrent, TimeSpan window, IAsyncScheduler scheduler)
@@ -166,7 +166,7 @@ namespace System.Reactive.Subjects
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            _impl = new ReplayTime(concurrent, scheduler, int.MaxValue, window, o => CreateScheduledObserver(o, scheduler));
         }
 
         public ReplayAsyncSubject(bool concurrent, int bufferSize, TimeSpan window)
@@ -176,7 +176,7 @@ namespace System.Reactive.Subjects
             if (bufferSize < 0)
                 throw new ArgumentOutOfRangeException(nameof(bufferSize));
 
-            throw new NotImplementedException();
+            _impl = new ReplayTime(concurrent, ImmediateAsyncScheduler.Instance, bufferSize, window, CreateImmediateObserver);
         }
 
         public ReplayAsyncSubject(bool concurrent, int bufferSize, TimeSpan window, IAsyncScheduler scheduler)
@@ -188,7 +188,7 @@ namespace System.Reactive.Subjects
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            _impl = new ReplayTime(concurrent, scheduler, bufferSize, window, o => CreateScheduledObserver(o, scheduler));
         }
 
         private static IScheduledAsyncObserver<T> CreateImmediateObserver(IAsyncObserver<T> observer) => new FastImmediateAsyncObserver<T>(observer);
@@ -225,7 +225,7 @@ namespace System.Reactive.Subjects
                     if (!_done)
                     {
                         _done = true;
-                        await TrimAsync().ConfigureAwait(false);
+                        Trim();
 
                         observers = _observers.ToArray();
 
@@ -259,7 +259,7 @@ namespace System.Reactive.Subjects
                     {
                         _done = true;
                         _error = error;
-                        await TrimAsync().ConfigureAwait(false);
+                        Trim();
 
                         observers = _observers.ToArray();
 
@@ -292,7 +292,7 @@ namespace System.Reactive.Subjects
                     if (!_done)
                     {
                         await NextAsync(value);
-                        await TrimAsync().ConfigureAwait(false);
+                        Trim();
 
                         observers = _observers.ToArray();
 
@@ -341,7 +341,7 @@ namespace System.Reactive.Subjects
 
                 using (await _lock.LockAsync().ConfigureAwait(false))
                 {
-                    await TrimAsync().ConfigureAwait(false);
+                    Trim();
 
                     count = await ReplayAsync(scheduled).ConfigureAwait(false);
 
@@ -375,7 +375,7 @@ namespace System.Reactive.Subjects
 
             protected abstract Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer);
 
-            protected abstract Task TrimAsync();
+            protected abstract void Trim();
 
             private async Task UnsubscribeAsync(IScheduledAsyncObserver<T> observer)
             {
@@ -442,7 +442,7 @@ namespace System.Reactive.Subjects
                 return 0;
             }
 
-            protected override Task TrimAsync() => Task.CompletedTask;
+            protected override void Trim() { }
         }
 
         private abstract class ReplayManyBase : ReplayBufferBase
@@ -484,14 +484,12 @@ namespace System.Reactive.Subjects
                 _bufferSize = bufferSize;
             }
 
-            protected override Task TrimAsync()
+            protected override void Trim()
             {
                 while (_values.Count > _bufferSize)
                 {
                     _values.Dequeue();
                 }
-
-                return Task.CompletedTask;
             }
         }
 
@@ -502,7 +500,57 @@ namespace System.Reactive.Subjects
             {
             }
 
-            protected override Task TrimAsync() => Task.CompletedTask;
+            protected override void Trim() { }
+        }
+
+        private sealed class ReplayTime : ReplayBufferBase
+        {
+            private readonly IAsyncScheduler _scheduler;
+            private readonly int _bufferSize;
+            private readonly TimeSpan _window;
+            private readonly Queue<Timestamped<T>> _values = new Queue<Timestamped<T>>();
+
+            public ReplayTime(bool concurrent, IAsyncScheduler scheduler, int bufferSize, TimeSpan window, Func<IAsyncObserver<T>, IScheduledAsyncObserver<T>> createObserver)
+                : base(concurrent, createObserver)
+            {
+                _scheduler = scheduler;
+                _bufferSize = bufferSize;
+                _window = window;
+            }
+
+            protected override Task NextAsync(T value)
+            {
+                _values.Enqueue(new Timestamped<T>(value, _scheduler.Now));
+
+                return Task.CompletedTask;
+            }
+
+            protected override async Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
+            {
+                var count = _values.Count;
+
+                foreach (var value in _values)
+                {
+                    await observer.OnNextAsync(value.Value).ConfigureAwait(false);
+                }
+
+                return count;
+            }
+
+            protected override void Trim()
+            {
+                while (_values.Count > _bufferSize)
+                {
+                    _values.Dequeue();
+                }
+
+                var threshold = _scheduler.Now - _window;
+
+                while (_values.Count > 0 && _values.Peek().Timestamp < threshold)
+                {
+                    _values.Dequeue();
+                }
+            }
         }
     }
 }