Browse Source

Improve Buffer via dedicated exact/skip implementations (#665)

* 4.x: Improve Buffer via dedicated exact/skip/overlap implementations

* Use Queue.Clear directly

* Use null check for clarity in exact
David Karnok 7 years ago
parent
commit
657da13e39

+ 42 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/BufferCountBenchmark.cs

@@ -0,0 +1,42 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Reactive.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    [MemoryDiagnoser]
+    public class BufferCountBenchmark
+    {
+        IList<int> _store;
+
+        [Benchmark]
+        public void Exact()
+        {
+            Observable.Range(1, 1000)
+                .Buffer(1)
+                .Subscribe(v => Volatile.Write(ref _store, v));
+        }
+
+        [Benchmark]
+        public void Skip()
+        {
+            Observable.Range(1, 1000)
+                .Buffer(1, 2)
+                .Subscribe(v => Volatile.Write(ref _store, v));
+        }
+
+        [Benchmark]
+        public void Overlap()
+        {
+            Observable.Range(1, 1000)
+                .Buffer(2, 1)
+                .Subscribe(v => Volatile.Write(ref _store, v));
+        }
+    }
+}

+ 2 - 1
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -14,7 +14,8 @@ namespace Benchmarks.System.Reactive
             var switcher = new BenchmarkSwitcher(new[] {
                 typeof(ZipBenchmark),
                 typeof(CombineLatestBenchmark),
-                typeof(SwitchBenchmark)
+                typeof(SwitchBenchmark),
+                typeof(BufferCountBenchmark)
             });
 
             switcher.Run();

+ 171 - 20
Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs

@@ -10,45 +10,196 @@ namespace System.Reactive.Linq.ObservableImpl
 {
     internal static class Buffer<TSource>
     {
-        internal sealed class Count : Producer<IList<TSource>, Count._>
+        internal sealed class CountExact : Producer<IList<TSource>, CountExact.ExactSink>
+        {
+            readonly IObservable<TSource> _source;
+
+            readonly int _count;
+
+            public CountExact(IObservable<TSource> source, int count)
+            {
+                _source = source;
+                _count = count;
+            }
+
+            protected override ExactSink CreateSink(IObserver<IList<TSource>> observer) => new ExactSink(observer, _count);
+
+            protected override void Run(ExactSink sink) => sink.Run(_source);
+
+            internal sealed class ExactSink : Sink<TSource, IList<TSource>>
+            {
+                readonly int _count;
+
+                int _index;
+
+                IList<TSource> _buffer;
+
+                internal ExactSink(IObserver<IList<TSource>> observer, int count) : base(observer)
+                {
+                    _count = count;
+                }
+
+                public override void OnNext(TSource value)
+                {
+                    var buffer = _buffer;
+                    if (buffer == null)
+                    {
+                        buffer = new List<TSource>();
+                        _buffer = buffer;
+                    }
+
+                    buffer.Add(value);
+
+                    var idx = _index + 1;
+                    if (idx == _count)
+                    {
+                        _buffer = null;
+                        _index = 0;
+                        ForwardOnNext(buffer);
+                    }
+                    else
+                    {
+                        _index = idx;
+                    }
+                }
+
+                public override void OnError(Exception error)
+                {
+                    _buffer = null;
+                    ForwardOnError(error);
+                }
+
+                public override void OnCompleted()
+                {
+                    var buffer = _buffer;
+                    _buffer = null;
+
+                    if (buffer != null)
+                    {
+                        ForwardOnNext(buffer);
+                    }
+                    ForwardOnCompleted();
+                }   
+            }
+        }
+
+        internal sealed class CountSkip : Producer<IList<TSource>, CountSkip.SkipSink>
+        {
+            readonly IObservable<TSource> _source;
+
+            readonly int _count;
+
+            readonly int _skip;
+
+            public CountSkip(IObservable<TSource> source, int count, int skip)
+            {
+                _source = source;
+                _count = count;
+                _skip = skip;
+            }
+
+            protected override SkipSink CreateSink(IObserver<IList<TSource>> observer) => new SkipSink(observer, _count, _skip);
+
+            protected override void Run(SkipSink sink) => sink.Run(_source);
+
+            internal sealed class SkipSink : Sink<TSource, IList<TSource>>
+            {
+                readonly int _count;
+
+                readonly int _skip;
+
+                int _index;
+
+                IList<TSource> _buffer;
+
+                internal SkipSink(IObserver<IList<TSource>> observer, int count, int skip) : base(observer)
+                {
+                    _count = count;
+                    _skip = skip;
+                }
+
+                public override void OnNext(TSource value)
+                {
+                    var idx = _index;
+                    var buffer = _buffer;
+                    if (idx == 0)
+                    {
+                        buffer = new List<TSource>();
+                        _buffer = buffer;
+                    }
+
+                    buffer?.Add(value);
+
+                    if (++idx == _count)
+                    {
+                        _buffer = null;
+                        ForwardOnNext(buffer);
+                    }
+
+                    if (idx == _skip)
+                    {
+                        _index = 0;
+                    }
+                    else
+                    {
+                        _index = idx;
+                    }
+                }
+
+                public override void OnError(Exception error)
+                {
+                    _buffer = null;
+                    ForwardOnError(error);
+                }
+
+                public override void OnCompleted()
+                {
+                    var buffer = _buffer;
+                    _buffer = null;
+
+                    if (buffer != null)
+                    {
+                        ForwardOnNext(buffer);
+                    }
+                    ForwardOnCompleted();
+                }
+            }
+        }
+
+        internal sealed class CountOverlap : Producer<IList<TSource>, CountOverlap.OverlapSink>
         {
             private readonly IObservable<TSource> _source;
             private readonly int _count;
             private readonly int _skip;
 
-            public Count(IObservable<TSource> source, int count, int skip)
+            public CountOverlap(IObservable<TSource> source, int count, int skip)
             {
                 _source = source;
                 _count = count;
                 _skip = skip;
             }
 
-            protected override _ CreateSink(IObserver<IList<TSource>> observer) => new _(this, observer);
+            protected override OverlapSink CreateSink(IObserver<IList<TSource>> observer) => new OverlapSink(observer, _count, _skip);
 
-            protected override void Run(_ sink) => sink.Run(_source);
+            protected override void Run(OverlapSink sink) => sink.Run(_source);
 
-            internal sealed class _ : Sink<TSource, IList<TSource>> 
+            internal sealed class OverlapSink : Sink<TSource, IList<TSource>> 
             {
-                private readonly Queue<IList<TSource>> _queue = new Queue<IList<TSource>>();
+                private readonly Queue<IList<TSource>> _queue;
 
                 private readonly int _count;
                 private readonly int _skip;
 
-                public _(Count parent, IObserver<IList<TSource>> observer)
-                    : base(observer)
-                {
-                    _count = parent._count;
-                    _skip = parent._skip;
-                }
-
-                private int _n;
+                int _index;
+                int _n;
 
-                public override void Run(IObservable<TSource> source)
+                public OverlapSink(IObserver<IList<TSource>> observer, int count, int skip)
+                    : base(observer)
                 {
-                    _n = 0;
-
+                    _queue = new Queue<IList<TSource>>();
+                    _count = count;
+                    _skip = skip;
                     CreateWindow();
-                    base.Run(source);
                 }
 
                 private void CreateWindow()
@@ -77,8 +228,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnError(Exception error)
                 {
-                    while (_queue.Count > 0)
-                        _queue.Dequeue().Clear();
+                    // just drop the ILists on the GC floor, no reason to clear them
+                    _queue.Clear();
 
                     ForwardOnError(error);
                 }

+ 11 - 7
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs

@@ -50,17 +50,21 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count)
         {
-            return Buffer_<TSource>(source, count, count);
+            return new Buffer<TSource>.CountExact(source, count);
         }
 
         public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip)
         {
-            return Buffer_<TSource>(source, count, skip);
-        }
-
-        private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
-        {
-            return new Buffer<TSource>.Count(source, count, skip);
+            if (count > skip)
+            {
+                return new Buffer<TSource>.CountOverlap(source, count, skip);
+            }
+            else if (count < skip)
+            {
+                return new Buffer<TSource>.CountSkip(source, count, skip);
+            }
+            // count == skip
+            return new Buffer<TSource>.CountExact(source, count);
         }
 
         #endregion