浏览代码

Optimizing layouts of Skip and Take.

Bart De Smet 8 年之前
父节点
当前提交
21b8d55db4

+ 111 - 109
Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs

@@ -7,143 +7,145 @@ using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Skip<TSource> : Producer<TSource>
+    internal static class Skip<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly int _count;
-        private readonly TimeSpan _duration;
-        internal readonly IScheduler _scheduler;
-
-        public Skip(IObservable<TSource> source, int count)
-        {
-            _source = source;
-            _count = count;
-        }
-
-        public Skip(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
-        {
-            _source = source;
-            _duration = duration;
-            _scheduler = scheduler;
-        }
-
-        public IObservable<TSource> Combine(int count)
-        {
-            //
-            // Sum semantics:
-            //
-            //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
-            //   xs.Skip(2)            --x--x--o--o--o--o--|      xs.Skip(3)            --x--x--x--o--o--o--|
-            //   xs.Skip(2).Skip(3)    --------x--x--x--o--|      xs.Skip(3).Skip(2)    -----------x--x--o--|
-            //
-            return new Skip<TSource>(_source, _count + count);
-        }
-
-        public IObservable<TSource> Combine(TimeSpan duration)
-        {
-            //
-            // Maximum semantics:
-            //
-            //   t                     0--1--2--3--4--5--6--7->   t                     0--1--2--3--4--5--6--7->
-            //                                                    
-            //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
-            //   xs.Skip(2s)           xxxxxxx-o--o--o--o--|      xs.Skip(3s)           xxxxxxxxxx-o--o--o--|
-            //   xs.Skip(2s).Skip(3s)  xxxxxxxxxx-o--o--o--|      xs.Skip(3s).Skip(2s)  xxxxxxx----o--o--o--|
-            //
-            if (duration <= _duration)
-                return this;
-            else
-                return new Skip<TSource>(_source, duration, _scheduler);
-        }
-
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_scheduler == null)
-            {
-                var sink = new _(this, observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
-            }
-            else
-            {
-                var sink = new SkipImpl(this, observer, cancel);
-                setSink(sink);
-                return sink.Run();
-            }
-        }
-
-        class _ : Sink<TSource>, IObserver<TSource>
+        internal sealed class Count : Producer<TSource>
         {
-            private readonly Skip<TSource> _parent;
-            private int _remaining;
+            private readonly IObservable<TSource> _source;
+            private readonly int _count;
 
-            public _(Skip<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Count(IObservable<TSource> source, int count)
             {
-                _parent = parent;
-                _remaining = _parent._count;
+                _source = source;
+                _count = count;
             }
 
-            public void OnNext(TSource value)
+            public IObservable<TSource> Combine(int count)
             {
-                if (_remaining <= 0)
-                    base._observer.OnNext(value);
-                else
-                    _remaining--;
+                //
+                // Sum semantics:
+                //
+                //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
+                //   xs.Skip(2)            --x--x--o--o--o--o--|      xs.Skip(3)            --x--x--x--o--o--o--|
+                //   xs.Skip(2).Skip(3)    --------x--x--x--o--|      xs.Skip(3).Skip(2)    -----------x--x--o--|
+                //
+                return new Count(_source, _count + count);
             }
 
-            public void OnError(Exception error)
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                base._observer.OnError(error);
-                base.Dispose();
+                var sink = new _(_count, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
             }
 
-            public void OnCompleted()
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                base._observer.OnCompleted();
-                base.Dispose();
+                private int _remaining;
+
+                public _(int count, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _remaining = count;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    if (_remaining <= 0)
+                        base._observer.OnNext(value);
+                    else
+                        _remaining--;
+                }
+
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
 
-        class SkipImpl : Sink<TSource>, IObserver<TSource>
+        internal sealed class Time : Producer<TSource>
         {
-            private readonly Skip<TSource> _parent;
-            private volatile bool _open;
+            private readonly IObservable<TSource> _source;
+            private readonly TimeSpan _duration;
+            internal readonly IScheduler _scheduler;
 
-            public SkipImpl(Skip<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Time(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
             {
-                _parent = parent;
+                _source = source;
+                _duration = duration;
+                _scheduler = scheduler;
             }
 
-            public IDisposable Run()
+            public IObservable<TSource> Combine(TimeSpan duration)
             {
-                var t = _parent._scheduler.Schedule(_parent._duration, Tick);
-                var d = _parent._source.SubscribeSafe(this);
-                return StableCompositeDisposable.Create(t, d);
-            }
-
-            private void Tick()
-            {
-                _open = true;
-            }
-
-            public void OnNext(TSource value)
-            {
-                if (_open)
-                    base._observer.OnNext(value);
+                //
+                // Maximum semantics:
+                //
+                //   t                     0--1--2--3--4--5--6--7->   t                     0--1--2--3--4--5--6--7->
+                //                                                    
+                //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
+                //   xs.Skip(2s)           xxxxxxx-o--o--o--o--|      xs.Skip(3s)           xxxxxxxxxx-o--o--o--|
+                //   xs.Skip(2s).Skip(3s)  xxxxxxxxxx-o--o--o--|      xs.Skip(3s).Skip(2s)  xxxxxxx----o--o--o--|
+                //
+                if (duration <= _duration)
+                    return this;
+                else
+                    return new Time(_source, duration, _scheduler);
             }
 
-            public void OnError(Exception error)
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                base._observer.OnError(error);
-                base.Dispose();
+                var sink = new _(observer, cancel);
+                setSink(sink);
+                return sink.Run(this);
             }
 
-            public void OnCompleted()
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                base._observer.OnCompleted();
-                base.Dispose();
+                private volatile bool _open;
+
+                public _(IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                }
+
+                public IDisposable Run(Time parent)
+                {
+                    var t = parent._scheduler.Schedule(parent._duration, Tick);
+                    var d = parent._source.SubscribeSafe(this);
+                    return StableCompositeDisposable.Create(t, d);
+                }
+
+                private void Tick()
+                {
+                    _open = true;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    if (_open)
+                        base._observer.OnNext(value);
+                }
+
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
     }

+ 121 - 119
Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs

@@ -7,167 +7,169 @@ using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Take<TSource> : Producer<TSource>
+    internal static class Take<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly int _count;
-        private readonly TimeSpan _duration;
-        internal readonly IScheduler _scheduler;
-
-        public Take(IObservable<TSource> source, int count)
-        {
-            _source = source;
-            _count = count;
-        }
-
-        public Take(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
-        {
-            _source = source;
-            _duration = duration;
-            _scheduler = scheduler;
-        }
-
-        public IObservable<TSource> Combine(int count)
+        internal sealed class Count : Producer<TSource>
         {
-            //
-            // Minimum semantics:
-            //
-            //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
-            //   xs.Take(5)            --o--o--o--o--o|           xs.Take(3)            --o--o--o|
-            //   xs.Take(5).Take(3)    --o--o--o|                 xs.Take(3).Take(5)    --o--o--o|
-            //
-            if (_count <= count)
-                return this;
-            else
-                return new Take<TSource>(_source, count);
-        }
+            private readonly IObservable<TSource> _source;
+            private readonly int _count;
 
-        public IObservable<TSource> Combine(TimeSpan duration)
-        {
-            //
-            // Minimum semantics:
-            //
-            //   t                     0--1--2--3--4--5--6--7->   t                     0--1--2--3--4--5--6--7->
-            //                                                    
-            //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
-            //   xs.Take(5s)           --o--o--o--o--o|           xs.Take(3s)           --o--o--o|
-            //   xs.Take(5s).Take(3s)  --o--o--o|                 xs.Take(3s).Take(5s)  --o--o--o|
-            //
-            if (_duration <= duration)
-                return this;
-            else
-                return new Take<TSource>(_source, duration, _scheduler);
-        }
-
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_scheduler == null)
+            public Count(IObservable<TSource> source, int count)
             {
-                var sink = new _(this, observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                _source = source;
+                _count = count;
             }
-            else
+
+            public IObservable<TSource> Combine(int count)
             {
-                var sink = new TakeImpl(this, observer, cancel);
-                setSink(sink);
-                return sink.Run();
+                //
+                // Minimum semantics:
+                //
+                //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
+                //   xs.Take(5)            --o--o--o--o--o|           xs.Take(3)            --o--o--o|
+                //   xs.Take(5).Take(3)    --o--o--o|                 xs.Take(3).Take(5)    --o--o--o|
+                //
+                if (_count <= count)
+                    return this;
+                else
+                    return new Count(_source, count);
             }
-        }
-
-        class _ : Sink<TSource>, IObserver<TSource>
-        {
-            private readonly Take<TSource> _parent;
-            private int _remaining;
 
-            public _(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                _parent = parent;
-                _remaining = _parent._count;
+                var sink = new _(_count, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
             }
 
-            public void OnNext(TSource value)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                if (_remaining > 0)
+                private readonly Count _parent;
+                private int _remaining;
+
+                public _(int count, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    --_remaining;
-                    base._observer.OnNext(value);
+                    _remaining = count;
+                }
 
-                    if (_remaining == 0)
+                public void OnNext(TSource value)
+                {
+                    if (_remaining > 0)
                     {
-                        base._observer.OnCompleted();
-                        base.Dispose();
+                        --_remaining;
+                        base._observer.OnNext(value);
+
+                        if (_remaining == 0)
+                        {
+                            base._observer.OnCompleted();
+                            base.Dispose();
+                        }
                     }
                 }
-            }
 
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
 
-            public void OnCompleted()
-            {
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
 
-        class TakeImpl : Sink<TSource>, IObserver<TSource>
+        internal sealed class Time : Producer<TSource>
         {
-            private readonly Take<TSource> _parent;
+            private readonly IObservable<TSource> _source;
+            private readonly TimeSpan _duration;
+            internal readonly IScheduler _scheduler;
 
-            public TakeImpl(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Time(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
             {
-                _parent = parent;
+                _source = source;
+                _duration = duration;
+                _scheduler = scheduler;
             }
 
-            private object _gate;
-
-            public IDisposable Run()
+            public IObservable<TSource> Combine(TimeSpan duration)
             {
-                _gate = new object();
+                //
+                // Minimum semantics:
+                //
+                //   t                     0--1--2--3--4--5--6--7->   t                     0--1--2--3--4--5--6--7->
+                //                                                    
+                //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
+                //   xs.Take(5s)           --o--o--o--o--o|           xs.Take(3s)           --o--o--o|
+                //   xs.Take(5s).Take(3s)  --o--o--o|                 xs.Take(3s).Take(5s)  --o--o--o|
+                //
+                if (_duration <= duration)
+                    return this;
+                else
+                    return new Time(_source, duration, _scheduler);
+            }
 
-                var t = _parent._scheduler.Schedule(_parent._duration, Tick);
-                var d = _parent._source.SubscribeSafe(this);
-                return StableCompositeDisposable.Create(t, d);
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(observer, cancel);
+                setSink(sink);
+                return sink.Run(this);
             }
 
-            private void Tick()
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                lock (_gate)
+                public _(IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    base._observer.OnCompleted();
-                    base.Dispose();
                 }
-            }
 
-            public void OnNext(TSource value)
-            {
-                lock (_gate)
+                private object _gate;
+
+                public IDisposable Run(Time parent)
                 {
-                    base._observer.OnNext(value);
+                    _gate = new object();
+
+                    var t = parent._scheduler.Schedule(parent._duration, Tick);
+                    var d = parent._source.SubscribeSafe(this);
+                    return StableCompositeDisposable.Create(t, d);
                 }
-            }
 
-            public void OnError(Exception error)
-            {
-                lock (_gate)
+                private void Tick()
                 {
-                    base._observer.OnError(error);
-                    base.Dispose();
+                    lock (_gate)
+                    {
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
                 }
-            }
 
-            public void OnCompleted()
-            {
-                lock (_gate)
+                public void OnNext(TSource value)
                 {
-                    base._observer.OnCompleted();
-                    base.Dispose();
+                    lock (_gate)
+                    {
+                        base._observer.OnNext(value);
+                    }
+                }
+
+                public void OnError(Exception error)
+                {
+                    lock (_gate)
+                    {
+                        base._observer.OnError(error);
+                        base.Dispose();
+                    }
+                }
+
+                public void OnCompleted()
+                {
+                    lock (_gate)
+                    {
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
                 }
             }
         }

+ 6 - 6
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs

@@ -344,11 +344,11 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, int count)
         {
-            var skip = source as Skip<TSource>;
-            if (skip != null && skip._scheduler == null)
+            var skip = source as Skip<TSource>.Count;
+            if (skip != null)
                 return skip.Combine(count);
 
-            return new Skip<TSource>(source, count);
+            return new Skip<TSource>.Count(source, count);
         }
 
         #endregion
@@ -387,11 +387,11 @@ namespace System.Reactive.Linq
 
         private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, int count)
         {
-            var take = source as Take<TSource>;
-            if (take != null && take._scheduler == null)
+            var take = source as Take<TSource>.Count;
+            if (take != null)
                 return take.Combine(count);
 
-            return new Take<TSource>(source, count);
+            return new Take<TSource>.Count(source, count);
         }
 
         #endregion

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Time.cs

@@ -251,11 +251,11 @@ namespace System.Reactive.Linq
 
         private static IObservable<TSource> Skip_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
         {
-            var skip = source as Skip<TSource>;
+            var skip = source as Skip<TSource>.Time;
             if (skip != null && skip._scheduler == scheduler)
                 return skip.Combine(duration);
 
-            return new Skip<TSource>(source, duration, scheduler);
+            return new Skip<TSource>.Time(source, duration, scheduler);
         }
 
         #endregion
@@ -316,11 +316,11 @@ namespace System.Reactive.Linq
 
         private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
         {
-            var take = source as Take<TSource>;
+            var take = source as Take<TSource>.Time;
             if (take != null && take._scheduler == scheduler)
                 return take.Combine(duration);
 
-            return new Take<TSource>(source, duration, scheduler);
+            return new Take<TSource>.Time(source, duration, scheduler);
         }
 
         #endregion