Browse Source

Optimize Aggregate layout.

Bart De Smet 8 years ago
parent
commit
ca2276d0b1

+ 114 - 54
Rx.NET/Source/src/System.Reactive/Linq/Observable/Aggregate.cs

@@ -4,50 +4,56 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Aggregate<TSource, TAccumulate, TResult> : Producer<TResult>
+    internal sealed class Aggregate<TSource> : Producer<TSource>
     {
         private readonly IObservable<TSource> _source;
-        private readonly TAccumulate _seed;
-        private readonly Func<TAccumulate, TSource, TAccumulate> _accumulator;
-        private readonly Func<TAccumulate, TResult> _resultSelector;
+        private readonly Func<TSource, TSource, TSource> _accumulator;
 
-        public Aggregate(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
+        public Aggregate(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
         {
             _source = source;
-            _seed = seed;
             _accumulator = accumulator;
-            _resultSelector = resultSelector;
         }
 
-        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
+        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
-            var sink = new _(this, observer, cancel);
+            var sink = new _(_accumulator, observer, cancel);
             setSink(sink);
             return _source.SubscribeSafe(sink);
         }
 
-        class _ : Sink<TResult>, IObserver<TSource>
+        private sealed class _ : Sink<TSource>, IObserver<TSource>
         {
-            private readonly Aggregate<TSource, TAccumulate, TResult> _parent;
-            private TAccumulate _accumulation;
+            private readonly Func<TSource, TSource, TSource> _accumulator;
+            private TSource _accumulation;
+            private bool _hasAccumulation;
 
-            public _(Aggregate<TSource, TAccumulate, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
+            public _(Func<TSource, TSource, TSource> accumulator, IObserver<TSource> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
-                _parent = parent;
-                _accumulation = _parent._seed;
+                _accumulator = accumulator;
+                _accumulation = default(TSource);
+                _hasAccumulation = false;
             }
 
             public void OnNext(TSource value)
             {
-                try
+                if (!_hasAccumulation)
                 {
-                    _accumulation = _parent._accumulator(_accumulation, value);
+                    _accumulation = value;
+                    _hasAccumulation = true;
                 }
-                catch (Exception exception)
+                else
                 {
-                    base._observer.OnError(exception);
-                    base.Dispose();
+                    try
+                    {
+                        _accumulation = _accumulator(_accumulation, value);
+                    }
+                    catch (Exception exception)
+                    {
+                        base._observer.OnError(exception);
+                        base.Dispose();
+                    }
                 }
             }
 
@@ -59,75 +65,125 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void OnCompleted()
             {
-                var result = default(TResult);
+                if (!_hasAccumulation)
+                {
+                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
+                    base.Dispose();
+                }
+                else
+                {
+                    base._observer.OnNext(_accumulation);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
+            }
+        }
+    }
+
+    internal sealed class Aggregate<TSource, TAccumulate> : Producer<TAccumulate>
+    {
+        private readonly IObservable<TSource> _source;
+        private readonly TAccumulate _seed;
+        private readonly Func<TAccumulate, TSource, TAccumulate> _accumulator;
+
+        public Aggregate(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
+        {
+            _source = source;
+            _seed = seed;
+            _accumulator = accumulator;
+        }
+
+        protected override IDisposable Run(IObserver<TAccumulate> observer, IDisposable cancel, Action<IDisposable> setSink)
+        {
+            var sink = new _(_seed, _accumulator, observer, cancel);
+            setSink(sink);
+            return _source.SubscribeSafe(sink);
+        }
+
+        private sealed class _ : Sink<TAccumulate>, IObserver<TSource>
+        {
+            private readonly Func<TAccumulate, TSource, TAccumulate> _accumulator;
+            private TAccumulate _accumulation;
+
+            public _(TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, IObserver<TAccumulate> observer, IDisposable cancel)
+                : base(observer, cancel)
+            {
+                _accumulator = accumulator;
+                _accumulation = seed;
+            }
+
+            public void OnNext(TSource value)
+            {
                 try
                 {
-                    result = _parent._resultSelector(_accumulation);
+                    _accumulation = _accumulator(_accumulation, value);
                 }
                 catch (Exception exception)
                 {
                     base._observer.OnError(exception);
                     base.Dispose();
-                    return;
                 }
+            }
 
-                base._observer.OnNext(result);
+            public void OnError(Exception error)
+            {
+                base._observer.OnError(error);
+                base.Dispose();
+            }
+
+            public void OnCompleted()
+            {
+                base._observer.OnNext(_accumulation);
                 base._observer.OnCompleted();
                 base.Dispose();
             }
         }
     }
 
-    internal sealed class Aggregate<TSource> : Producer<TSource>
+    internal sealed class Aggregate<TSource, TAccumulate, TResult> : Producer<TResult>
     {
         private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, TSource, TSource> _accumulator;
+        private readonly TAccumulate _seed;
+        private readonly Func<TAccumulate, TSource, TAccumulate> _accumulator;
+        private readonly Func<TAccumulate, TResult> _resultSelector;
 
-        public Aggregate(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
+        public Aggregate(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)
         {
             _source = source;
+            _seed = seed;
             _accumulator = accumulator;
+            _resultSelector = resultSelector;
         }
 
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
             var sink = new _(this, observer, cancel);
             setSink(sink);
             return _source.SubscribeSafe(sink);
         }
 
-        class _ : Sink<TSource>, IObserver<TSource>
+        private sealed class _ : Sink<TResult>, IObserver<TSource>
         {
-            private readonly Aggregate<TSource> _parent;
-            private TSource _accumulation;
-            private bool _hasAccumulation;
+            private readonly Aggregate<TSource, TAccumulate, TResult> _parent;
+            private TAccumulate _accumulation;
 
-            public _(Aggregate<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+            public _(Aggregate<TSource, TAccumulate, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
                 _parent = parent;
-                _accumulation = default(TSource);
-                _hasAccumulation = false;
+                _accumulation = _parent._seed;
             }
 
             public void OnNext(TSource value)
             {
-                if (!_hasAccumulation)
+                try
                 {
-                    _accumulation = value;
-                    _hasAccumulation = true;
+                    _accumulation = _parent._accumulator(_accumulation, value);
                 }
-                else
+                catch (Exception exception)
                 {
-                    try
-                    {
-                        _accumulation = _parent._accumulator(_accumulation, value);
-                    }
-                    catch (Exception exception)
-                    {
-                        base._observer.OnError(exception);
-                        base.Dispose();
-                    }
+                    base._observer.OnError(exception);
+                    base.Dispose();
                 }
             }
 
@@ -139,17 +195,21 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void OnCompleted()
             {
-                if (!_hasAccumulation)
+                var result = default(TResult);
+                try
                 {
-                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
-                    base.Dispose();
+                    result = _parent._resultSelector(_accumulation);
                 }
-                else
+                catch (Exception exception)
                 {
-                    base._observer.OnNext(_accumulation);
-                    base._observer.OnCompleted();
+                    base._observer.OnError(exception);
                     base.Dispose();
+                    return;
                 }
+
+                base._observer.OnNext(result);
+                base._observer.OnCompleted();
+                base.Dispose();
             }
         }
     }

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Aggregates.cs

@@ -16,7 +16,7 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TAccumulate> Aggregate<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
         {
-            return new Aggregate<TSource, TAccumulate, TAccumulate>(source, seed, accumulator, Stubs<TAccumulate>.I);
+            return new Aggregate<TSource, TAccumulate>(source, seed, accumulator);
         }
 
         public virtual IObservable<TResult> Aggregate<TSource, TAccumulate, TResult>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector)