Browse Source

Optimizing ElementAt layouts.

Bart De Smet 8 years ago
parent
commit
ad3ed2dc86

+ 6 - 19
Rx.NET/Source/src/System.Reactive/Linq/Observable/ElementAt.cs

@@ -8,32 +8,28 @@ namespace System.Reactive.Linq.ObservableImpl
     {
         private readonly IObservable<TSource> _source;
         private readonly int _index;
-        private readonly bool _throwOnEmpty;
 
-        public ElementAt(IObservable<TSource> source, int index, bool throwOnEmpty)
+        public ElementAt(IObservable<TSource> source, int index)
         {
             _source = source;
             _index = index;
-            _throwOnEmpty = throwOnEmpty;
         }
 
         protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
-            var sink = new _(this, observer, cancel);
+            var sink = new _(_index, observer, cancel);
             setSink(sink);
             return _source.SubscribeSafe(sink);
         }
 
-        class _ : Sink<TSource>, IObserver<TSource>
+        private sealed class _ : Sink<TSource>, IObserver<TSource>
         {
-            private readonly ElementAt<TSource> _parent;
             private int _i;
 
-            public _(ElementAt<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
+            public _(int index, IObserver<TSource> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
-                _parent = parent;
-                _i = _parent._index;
+                _i = index;
             }
 
             public void OnNext(TSource value)
@@ -56,16 +52,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void OnCompleted()
             {
-                if (_parent._throwOnEmpty)
-                {
-                    base._observer.OnError(new ArgumentOutOfRangeException("index"));
-                }
-                else
-                {
-                    base._observer.OnNext(default(TSource));
-                    base._observer.OnCompleted();
-                }
-                
+                base._observer.OnError(new ArgumentOutOfRangeException("index"));
                 base.Dispose();
             }
         }

+ 61 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/ElementAtOrDefault.cs

@@ -0,0 +1,61 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    internal sealed class ElementAtOrDefault<TSource> : Producer<TSource>
+    {
+        private readonly IObservable<TSource> _source;
+        private readonly int _index;
+
+        public ElementAtOrDefault(IObservable<TSource> source, int index)
+        {
+            _source = source;
+            _index = index;
+        }
+
+        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+        {
+            var sink = new _(_index, observer, cancel);
+            setSink(sink);
+            return _source.SubscribeSafe(sink);
+        }
+
+        private sealed class _ : Sink<TSource>, IObserver<TSource>
+        {
+            private int _i;
+
+            public _(int index, IObserver<TSource> observer, IDisposable cancel)
+                : base(observer, cancel)
+            {
+                _i = index;
+            }
+
+            public void OnNext(TSource value)
+            {
+                if (_i == 0)
+                {
+                    base._observer.OnNext(value);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
+
+                _i--;
+            }
+
+            public void OnError(Exception error)
+            {
+                base._observer.OnError(error);
+                base.Dispose();
+            }
+
+            public void OnCompleted()
+            {
+                base._observer.OnNext(default(TSource));
+                base._observer.OnCompleted();
+                base.Dispose();
+            }
+        }
+    }
+}

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Aggregates.cs

@@ -190,7 +190,7 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> ElementAt<TSource>(IObservable<TSource> source, int index)
         {
-            return new ElementAt<TSource>(source, index, true);
+            return new ElementAt<TSource>(source, index);
         }
 
         #endregion
@@ -199,7 +199,7 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> ElementAtOrDefault<TSource>(IObservable<TSource> source, int index)
         {
-            return new ElementAt<TSource>(source, index, false);
+            return new ElementAtOrDefault<TSource>(source, index);
         }
 
         #endregion