Browse Source

Optimizing layouts of SkipWhile

Bart De Smet 8 years ago
parent
commit
d969ef4c82
1 changed files with 92 additions and 86 deletions
  1. 92 86
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipWhile.cs

+ 92 - 86
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipWhile.cs

@@ -4,133 +4,139 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class SkipWhile<TSource> : Producer<TSource>
+    internal static class SkipWhile<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, bool> _predicate;
-        private readonly Func<TSource, int, bool> _predicateI;
-
-        public SkipWhile(IObservable<TSource> source, Func<TSource, bool> predicate)
-        {
-            _source = source;
-            _predicate = predicate;
-        }
-
-        public SkipWhile(IObservable<TSource> source, Func<TSource, int, bool> predicate)
+        internal sealed class Predicate : Producer<TSource>
         {
-            _source = source;
-            _predicateI = predicate;
-        }
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
 
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_predicate != null)
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
             {
-                var sink = new _(this, observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                _source = source;
+                _predicate = predicate;
             }
-            else
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new SkipWhileImpl(this, observer, cancel);
+                var sink = new _(_predicate, observer, cancel);
                 setSink(sink);
                 return _source.SubscribeSafe(sink);
             }
-        }
 
-        class _ : Sink<TSource>, IObserver<TSource>
-        {
-            private readonly SkipWhile<TSource> _parent;
-            private bool _running;
-
-            public _(SkipWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                _parent = parent;
-                _running = false;
-            }
+                private readonly Func<TSource, bool> _predicate;
+                private bool _running;
 
-            public void OnNext(TSource value)
-            {
-                if (!_running)
+                public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _predicate = predicate;
+                    _running = false;
+                }
+
+                public void OnNext(TSource value)
                 {
-                    try
+                    if (!_running)
                     {
-                        _running = !_parent._predicate(value);
+                        try
+                        {
+                            _running = !_predicate(value);
+                        }
+                        catch (Exception exception)
+                        {
+                            base._observer.OnError(exception);
+                            base.Dispose();
+                            return;
+                        }
                     }
-                    catch (Exception exception)
+
+                    if (_running)
                     {
-                        base._observer.OnError(exception);
-                        base.Dispose();
-                        return;
+                        base._observer.OnNext(value);
                     }
                 }
 
-                if (_running)
+                public void OnError(Exception error)
                 {
-                    base._observer.OnNext(value);
+                    base._observer.OnError(error);
+                    base.Dispose();
                 }
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
 
-            public void OnCompleted()
-            {
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
 
-        class SkipWhileImpl : Sink<TSource>, IObserver<TSource>
+        internal sealed class PredicateIndexed : Producer<TSource>
         {
-            private readonly SkipWhile<TSource> _parent;
-            private bool _running;
-            private int _index;
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, int, bool> _predicate;
 
-            public SkipWhileImpl(SkipWhile<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public PredicateIndexed(IObservable<TSource> source, Func<TSource, int, bool> predicate)
             {
-                _parent = parent;
-                _running = false;
-                _index = 0;
+                _source = source;
+                _predicate = predicate;
             }
 
-            public void OnNext(TSource value)
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                if (!_running)
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
+            {
+                private readonly Func<TSource, int, bool> _predicate;
+                private bool _running;
+                private int _index;
+
+                public _(Func<TSource, int, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    try
+                    _predicate = predicate;
+                    _running = false;
+                    _index = 0;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    if (!_running)
                     {
-                        _running = !_parent._predicateI(value, checked(_index++));
+                        try
+                        {
+                            _running = !_predicate(value, checked(_index++));
+                        }
+                        catch (Exception exception)
+                        {
+                            base._observer.OnError(exception);
+                            base.Dispose();
+                            return;
+                        }
                     }
-                    catch (Exception exception)
+
+                    if (_running)
                     {
-                        base._observer.OnError(exception);
-                        base.Dispose();
-                        return;
+                        base._observer.OnNext(value);
                     }
                 }
 
-                if (_running)
+                public void OnError(Exception error)
                 {
-                    base._observer.OnNext(value);
+                    base._observer.OnError(error);
+                    base.Dispose();
                 }
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
 
-            public void OnCompleted()
-            {
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
     }