Procházet zdrojové kódy

First steps to reduce operator object layouts.

Bart De Smet před 8 roky
rodič
revize
2235883e39

+ 6 - 6
Rx.NET/Source/src/System.Reactive/Linq/Observable/All.cs

@@ -17,19 +17,19 @@ namespace System.Reactive.Linq.ObservableImpl
 
         protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
         {
-            var sink = new _(this, observer, cancel);
+            var sink = new _(_predicate, observer, cancel);
             setSink(sink);
             return _source.SubscribeSafe(sink);
         }
 
-        class _ : Sink<bool>, IObserver<TSource>
+        private sealed class _ : Sink<bool>, IObserver<TSource>
         {
-            private readonly All<TSource> _parent;
+            private readonly Func<TSource, bool> _predicate;
 
-            public _(All<TSource> parent, IObserver<bool> observer, IDisposable cancel)
+            public _(Func<TSource, bool> predicate, IObserver<bool> observer, IDisposable cancel)
                 : base(observer, cancel)
             {
-                _parent = parent;
+                _predicate = predicate;
             }
 
             public void OnNext(TSource value)
@@ -37,7 +37,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 var res = false;
                 try
                 {
-                    res = _parent._predicate(value);
+                    res = _predicate(value);
                 }
                 catch (Exception ex)
                 {

+ 75 - 69
Rx.NET/Source/src/System.Reactive/Linq/Observable/Any.cs

@@ -4,110 +4,116 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Any<TSource> : Producer<bool>
+    internal static class Any<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, bool> _predicate;
-
-        public Any(IObservable<TSource> source)
+        internal sealed class Count : Producer<bool>
         {
-            _source = source;
-        }
+            private readonly IObservable<TSource> _source;
 
-        public Any(IObservable<TSource> source, Func<TSource, bool> predicate)
-        {
-            _source = source;
-            _predicate = predicate;
-        }
-
-        protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_predicate != null)
+            public Count(IObservable<TSource> source)
             {
-                var sink = new AnyImpl(this, observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                _source = source;
             }
-            else
+
+            protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new _(observer, cancel);
                 setSink(sink);
                 return _source.SubscribeSafe(sink);
             }
-        }
 
-        class _ : Sink<bool>, IObserver<TSource>
-        {
-            public _(IObserver<bool> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<bool>, IObserver<TSource>
             {
-            }
+                public _(IObserver<bool> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                }
 
-            public void OnNext(TSource value)
-            {
-                base._observer.OnNext(true);
-                base._observer.OnCompleted();
-                base.Dispose();
-            }
+                public void OnNext(TSource value)
+                {
+                    base._observer.OnNext(true);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
 
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
 
-            public void OnCompleted()
-            {
-                base._observer.OnNext(false);
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(false);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
 
-        class AnyImpl : Sink<bool>, IObserver<TSource>
+        internal sealed class Predicate : Producer<bool>
         {
-            private readonly Any<TSource> _parent;
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
 
-            public AnyImpl(Any<TSource> parent, IObserver<bool> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
             {
-                _parent = parent;
+                _source = source;
+                _predicate = predicate;
             }
 
-            public void OnNext(TSource value)
+            protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var res = false;
-                try
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<bool>, IObserver<TSource>
+            {
+                private readonly Func<TSource, bool> _predicate;
+
+                public _(Func<TSource, bool> predicate, IObserver<bool> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    res = _parent._predicate(value);
+                    _predicate = predicate;
                 }
-                catch (Exception ex)
+
+                public void OnNext(TSource value)
+                {
+                    var res = false;
+                    try
+                    {
+                        res = _predicate(value);
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
+                        return;
+                    }
+
+                    if (res)
+                    {
+                        base._observer.OnNext(true);
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
+                }
+
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(ex);
+                    base._observer.OnError(error);
                     base.Dispose();
-                    return;
                 }
 
-                if (res)
+                public void OnCompleted()
                 {
-                    base._observer.OnNext(true);
+                    base._observer.OnNext(false);
                     base._observer.OnCompleted();
                     base.Dispose();
                 }
             }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
-
-            public void OnCompleted()
-            {
-                base._observer.OnNext(false);
-                base._observer.OnCompleted();
-                base.Dispose();
-            }
         }
     }
 }

+ 83 - 75
Rx.NET/Source/src/System.Reactive/Linq/Observable/Count.cs

@@ -4,118 +4,126 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Count<TSource> : Producer<int>
+    internal static class Count<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, bool> _predicate;
-
-        public Count(IObservable<TSource> source)
+        internal sealed class All : Producer<int>
         {
-            _source = source;
-        }
+            private readonly IObservable<TSource> _source;
 
-        public Count(IObservable<TSource> source, Func<TSource, bool> predicate)
-        {
-            _source = source;
-            _predicate = predicate;
-        }
-
-        protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_predicate == null)
+            public All(IObservable<TSource> source)
             {
-                var sink = new _(observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                _source = source;
             }
-            else
+
+            protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new CountImpl(this, observer, cancel);
+                var sink = new _(observer, cancel);
                 setSink(sink);
                 return _source.SubscribeSafe(sink);
             }
-        }
-
-        class _ : Sink<int>, IObserver<TSource>
-        {
-            private int _count;
 
-            public _(IObserver<int> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<int>, IObserver<TSource>
             {
-                _count = 0;
-            }
+                private int _count;
 
-            public void OnNext(TSource value)
-            {
-                try
+                public _(IObserver<int> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    checked
+                    _count = 0;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    try
                     {
-                        _count++;
+                        checked
+                        {
+                            _count++;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
                     }
                 }
-                catch (Exception ex)
+
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(ex);
+                    base._observer.OnError(error);
                     base.Dispose();
                 }
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
 
-            public void OnCompleted()
-            {
-                base._observer.OnNext(_count);
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(_count);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
 
-        class CountImpl : Sink<int>, IObserver<TSource>
+        internal sealed class Predicate : Producer<int>
         {
-            private readonly Count<TSource> _parent;
-            private int _count;
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
 
-            public CountImpl(Count<TSource> parent, IObserver<int> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
             {
-                _parent = parent;
-                _count = 0;
+                _source = source;
+                _predicate = predicate;
             }
 
-            public void OnNext(TSource value)
+            protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                try
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<int>, IObserver<TSource>
+            {
+                private readonly Func<TSource, bool> _predicate;
+                private int _count;
+
+                public _(Func<TSource, bool> predicate, IObserver<int> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _predicate = predicate;
+                    _count = 0;
+                }
+
+                public void OnNext(TSource value)
                 {
-                    checked
+                    try
                     {
-                        if (_parent._predicate(value))
-                            _count++;
+                        checked
+                        {
+                            if (_predicate(value))
+                            {
+                                _count++;
+                            }
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
                     }
                 }
-                catch (Exception ex)
+
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(ex);
+                    base._observer.OnError(error);
                     base.Dispose();
                 }
-            }
 
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
-
-            public void OnCompleted()
-            {
-                base._observer.OnNext(_count);
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(_count);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
     }

+ 69 - 81
Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstAsync.cs

@@ -4,126 +4,114 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class FirstAsync<TSource> : Producer<TSource>
+    internal static class FirstAsync<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, bool> _predicate;
-        private readonly bool _throwOnEmpty;
-
-        public FirstAsync(IObservable<TSource> source, Func<TSource, bool> predicate, bool throwOnEmpty)
+        internal sealed class Sequence : Producer<TSource>
         {
-            _source = source;
-            _predicate = predicate;
-            _throwOnEmpty = throwOnEmpty;
-        }
+            private readonly IObservable<TSource> _source;
 
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_predicate != null)
+            public Sequence(IObservable<TSource> source)
             {
-                var sink = new FirstAsyncImpl(this, observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                _source = source;
             }
-            else
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new _(this, observer, cancel);
+                var sink = new _(observer, cancel);
                 setSink(sink);
                 return _source.SubscribeSafe(sink);
             }
-        }
 
-        class _ : Sink<TSource>, IObserver<TSource>
-        {
-            private readonly FirstAsync<TSource> _parent;
-
-            public _(FirstAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
-            {
-                _parent = parent;
-            }
-
-            public void OnNext(TSource value)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                base._observer.OnNext(value);
-                base._observer.OnCompleted();
-                base.Dispose();
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
-
-            public void OnCompleted()
-            {
-                if (_parent._throwOnEmpty)
+                public _(IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                 }
-                else
+
+                public void OnNext(TSource value)
                 {
-                    base._observer.OnNext(default(TSource));
+                    base._observer.OnNext(value);
                     base._observer.OnCompleted();
+                    base.Dispose();
                 }
 
-                base.Dispose();
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
+                    base.Dispose();
+                }
             }
         }
 
-        class FirstAsyncImpl : Sink<TSource>, IObserver<TSource>
+        internal sealed class Predicate : Producer<TSource>
         {
-            private readonly FirstAsync<TSource> _parent;
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
+
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
+            {
+                _source = source;
+                _predicate = predicate;
+            }
 
-            public FirstAsyncImpl(FirstAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                _parent = parent;
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
             }
 
-            public void OnNext(TSource value)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                var b = false;
+                private readonly Func<TSource, bool> _predicate;
 
-                try
+                public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    b = _parent._predicate(value);
+                    _predicate = predicate;
                 }
-                catch (Exception ex)
+
+                public void OnNext(TSource value)
                 {
-                    base._observer.OnError(ex);
-                    base.Dispose();
-                    return;
+                    var b = false;
+
+                    try
+                    {
+                        b = _predicate(value);
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
+                        return;
+                    }
+
+                    if (b)
+                    {
+                        base._observer.OnNext(value);
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
                 }
 
-                if (b)
+                public void OnError(Exception error)
                 {
-                    base._observer.OnNext(value);
-                    base._observer.OnCompleted();
+                    base._observer.OnError(error);
                     base.Dispose();
                 }
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
 
-            public void OnCompleted()
-            {
-                if (_parent._throwOnEmpty)
+                public void OnCompleted()
                 {
                     base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_MATCHING_ELEMENTS));
+                    base.Dispose();
                 }
-                else
-                {
-                    base._observer.OnNext(default(TSource));
-                    base._observer.OnCompleted();
-                }
-
-                base.Dispose();
             }
         }
     }

+ 120 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstOrDefaultAsync.cs

@@ -0,0 +1,120 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    internal static class FirstOrDefaultAsync<TSource>
+    {
+        internal sealed class Sequence : Producer<TSource>
+        {
+            private readonly IObservable<TSource> _source;
+
+            public Sequence(IObservable<TSource> source)
+            {
+                _source = source;
+            }
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
+            {
+                public _(IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                }
+
+                public void OnNext(TSource value)
+                {
+                    base._observer.OnNext(value);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
+
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(default(TSource));
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
+            }
+        }
+
+        internal sealed class Predicate : Producer<TSource>
+        {
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
+
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
+            {
+                _source = source;
+                _predicate = predicate;
+            }
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
+            {
+                private readonly Func<TSource, bool> _predicate;
+
+                public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _predicate = predicate;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    var b = false;
+
+                    try
+                    {
+                        b = _predicate(value);
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
+                        return;
+                    }
+
+                    if (b)
+                    {
+                        base._observer.OnNext(value);
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
+                }
+
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(default(TSource));
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
+            }
+        }
+    }
+}

+ 95 - 89
Rx.NET/Source/src/System.Reactive/Linq/Observable/LastAsync.cs

@@ -4,134 +4,140 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class LastAsync<TSource> : Producer<TSource>
+    internal static class LastAsync<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, bool> _predicate;
-        private readonly bool _throwOnEmpty;
-
-        public LastAsync(IObservable<TSource> source, Func<TSource, bool> predicate, bool throwOnEmpty)
+        internal sealed class Sequence : Producer<TSource>
         {
-            _source = source;
-            _predicate = predicate;
-            _throwOnEmpty = throwOnEmpty;
-        }
+            private readonly IObservable<TSource> _source;
 
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_predicate != null)
+            public Sequence(IObservable<TSource> source)
             {
-                var sink = new LastAsyncImpl(this, observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                _source = source;
             }
-            else
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new _(this, observer, cancel);
+                var sink = new _(observer, cancel);
                 setSink(sink);
                 return _source.SubscribeSafe(sink);
             }
-        }
-
-        class _ : Sink<TSource>, IObserver<TSource>
-        {
-            private readonly LastAsync<TSource> _parent;
-            private TSource _value;
-            private bool _seenValue;
-
-            public _(LastAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
-            {
-                _parent = parent;
-
-                _value = default(TSource);
-                _seenValue = false;
-            }
 
-            public void OnNext(TSource value)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                _value = value;
-                _seenValue = true;
-            }
+                private TSource _value;
+                private bool _seenValue;
 
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
+                public _(IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _value = default(TSource);
+                    _seenValue = false;
+                }
 
-            public void OnCompleted()
-            {
-                if (!_seenValue && _parent._throwOnEmpty)
+                public void OnNext(TSource value)
                 {
-                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
+                    _value = value;
+                    _seenValue = true;
                 }
-                else
+
+                public void OnError(Exception error)
                 {
-                    base._observer.OnNext(_value);
-                    base._observer.OnCompleted();
+                    base._observer.OnError(error);
+                    base.Dispose();
                 }
 
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    if (!_seenValue)
+                    {
+                        base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
+                    }
+                    else
+                    {
+                        base._observer.OnNext(_value);
+                        base._observer.OnCompleted();
+                    }
+
+                    base.Dispose();
+                }
             }
         }
 
-        class LastAsyncImpl : Sink<TSource>, IObserver<TSource>
+        internal sealed class Predicate : Producer<TSource>
         {
-            private readonly LastAsync<TSource> _parent;
-            private TSource _value;
-            private bool _seenValue;
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
 
-            public LastAsyncImpl(LastAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
             {
-                _parent = parent;
+                _source = source;
+                _predicate = predicate;
+            }
 
-                _value = default(TSource);
-                _seenValue = false;
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
             }
 
-            public void OnNext(TSource value)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                var b = false;
+                private readonly Func<TSource, bool> _predicate;
+                private TSource _value;
+                private bool _seenValue;
 
-                try
-                {
-                    b = _parent._predicate(value);
-                }
-                catch (Exception ex)
+                public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    base._observer.OnError(ex);
-                    base.Dispose();
-                    return;
+                    _predicate = predicate;
+
+                    _value = default(TSource);
+                    _seenValue = false;
                 }
 
-                if (b)
+                public void OnNext(TSource value)
                 {
-                    _value = value;
-                    _seenValue = true;
+                    var b = false;
+
+                    try
+                    {
+                        b = _predicate(value);
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
+                        return;
+                    }
+
+                    if (b)
+                    {
+                        _value = value;
+                        _seenValue = true;
+                    }
                 }
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
 
-            public void OnCompleted()
-            {
-                if (!_seenValue && _parent._throwOnEmpty)
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_MATCHING_ELEMENTS));
+                    base._observer.OnError(error);
+                    base.Dispose();
                 }
-                else
+
+                public void OnCompleted()
                 {
-                    base._observer.OnNext(_value);
-                    base._observer.OnCompleted();
-                }
+                    if (!_seenValue)
+                    {
+                        base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_MATCHING_ELEMENTS));
+                    }
+                    else
+                    {
+                        base._observer.OnNext(_value);
+                        base._observer.OnCompleted();
+                    }
 
-                base.Dispose();
+                    base.Dispose();
+                }
             }
         }
     }

+ 128 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/LastOrDefaultAsync.cs

@@ -0,0 +1,128 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    internal static class LastOrDefaultAsync<TSource>
+    {
+        internal sealed class Sequence : Producer<TSource>
+        {
+            private readonly IObservable<TSource> _source;
+
+            public Sequence(IObservable<TSource> source)
+            {
+                _source = source;
+            }
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
+            {
+                private TSource _value;
+                private bool _seenValue;
+
+                public _(IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _value = default(TSource);
+                    _seenValue = false;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    _value = value;
+                    _seenValue = true;
+                }
+
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(_value);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
+            }
+        }
+
+        internal sealed class Predicate : Producer<TSource>
+        {
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
+
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
+            {
+                _source = source;
+                _predicate = predicate;
+            }
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
+            {
+                private readonly Func<TSource, bool> _predicate;
+                private TSource _value;
+                private bool _seenValue;
+
+                public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _predicate = predicate;
+
+                    _value = default(TSource);
+                    _seenValue = false;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    var b = false;
+
+                    try
+                    {
+                        b = _predicate(value);
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
+                        return;
+                    }
+
+                    if (b)
+                    {
+                        _value = value;
+                        _seenValue = true;
+                    }
+                }
+
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(_value);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
+            }
+        }
+    }
+}

+ 83 - 75
Rx.NET/Source/src/System.Reactive/Linq/Observable/LongCount.cs

@@ -4,118 +4,126 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class LongCount<TSource> : Producer<long>
+    internal static class LongCount<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, bool> _predicate;
-
-        public LongCount(IObservable<TSource> source)
+        internal sealed class All : Producer<long>
         {
-            _source = source;
-        }
+            private readonly IObservable<TSource> _source;
 
-        public LongCount(IObservable<TSource> source, Func<TSource, bool> predicate)
-        {
-            _source = source;
-            _predicate = predicate;
-        }
-
-        protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_predicate == null)
+            public All(IObservable<TSource> source)
             {
-                var sink = new _(observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                _source = source;
             }
-            else
+
+            protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new LongCountImpl(this, observer, cancel);
+                var sink = new _(observer, cancel);
                 setSink(sink);
                 return _source.SubscribeSafe(sink);
             }
-        }
-
-        class _ : Sink<long>, IObserver<TSource>
-        {
-            private long _count;
 
-            public _(IObserver<long> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<long>, IObserver<TSource>
             {
-                _count = 0L;
-            }
+                private long _count;
 
-            public void OnNext(TSource value)
-            {
-                try
+                public _(IObserver<long> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    checked
+                    _count = 0L;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    try
                     {
-                        _count++;
+                        checked
+                        {
+                            _count++;
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
                     }
                 }
-                catch (Exception ex)
+
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(ex);
+                    base._observer.OnError(error);
                     base.Dispose();
                 }
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
 
-            public void OnCompleted()
-            {
-                base._observer.OnNext(_count);
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(_count);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
 
-        class LongCountImpl : Sink<long>, IObserver<TSource>
+        internal sealed class Predicate : Producer<long>
         {
-            private readonly LongCount<TSource> _parent;
-            private long _count;
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
 
-            public LongCountImpl(LongCount<TSource> parent, IObserver<long> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
             {
-                _parent = parent;
-                _count = 0L;
+                _source = source;
+                _predicate = predicate;
             }
 
-            public void OnNext(TSource value)
+            protected override IDisposable Run(IObserver<long> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                try
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<long>, IObserver<TSource>
+            {
+                private readonly Func<TSource, bool> _predicate;
+                private long _count;
+
+                public _(Func<TSource, bool> predicate, IObserver<long> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _predicate = predicate;
+                    _count = 0L;
+                }
+
+                public void OnNext(TSource value)
                 {
-                    checked
+                    try
                     {
-                        if (_parent._predicate(value))
-                            _count++;
+                        checked
+                        {
+                            if (_predicate(value))
+                            {
+                                _count++;
+                            }
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
                     }
                 }
-                catch (Exception ex)
+
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(ex);
+                    base._observer.OnError(error);
                     base.Dispose();
                 }
-            }
 
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
-
-            public void OnCompleted()
-            {
-                base._observer.OnNext(_count);
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(_count);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
     }

+ 85 - 79
Rx.NET/Source/src/System.Reactive/Linq/Observable/Select.cs

@@ -4,119 +4,125 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Select<TSource, TResult> : Producer<TResult>
+    internal static class Select<TSource, TResult>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, TResult> _selector;
-        private readonly Func<TSource, int, TResult> _selectorI;
-
-        public Select(IObservable<TSource> source, Func<TSource, TResult> selector)
-        {
-            _source = source;
-            _selector = selector;
-        }
-
-        public Select(IObservable<TSource> source, Func<TSource, int, TResult> selector)
+        internal sealed class Selector : Producer<TResult>
         {
-            _source = source;
-            _selectorI = selector;
-        }
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, TResult> _selector;
 
-        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_selector != null)
+            public Selector(IObservable<TSource> source, Func<TSource, TResult> selector)
             {
-                var sink = new _(this, observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                _source = source;
+                _selector = selector;
             }
-            else
+
+            protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new SelectImpl(this, observer, cancel);
+                var sink = new _(_selector, observer, cancel);
                 setSink(sink);
                 return _source.SubscribeSafe(sink);
             }
-        }
-
-        class _ : Sink<TResult>, IObserver<TSource>
-        {
-            private readonly Select<TSource, TResult> _parent;
 
-            public _(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<TResult>, IObserver<TSource>
             {
-                _parent = parent;
-            }
+                private readonly Func<TSource, TResult> _selector;
 
-            public void OnNext(TSource value)
-            {
-                var result = default(TResult);
-                try
+                public _(Func<TSource, TResult> selector, IObserver<TResult> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    result = _parent._selector(value);
+                    _selector = selector;
                 }
-                catch (Exception exception)
+
+                public void OnNext(TSource value)
                 {
-                    base._observer.OnError(exception);
+                    var result = default(TResult);
+                    try
+                    {
+                        result = _selector(value);
+                    }
+                    catch (Exception exception)
+                    {
+                        base._observer.OnError(exception);
+                        base.Dispose();
+                        return;
+                    }
+
+                    base._observer.OnNext(result);
+                }
+
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
                     base.Dispose();
-                    return;
                 }
 
-                base._observer.OnNext(result);
+                public void OnCompleted()
+                {
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
+        }
 
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
+        internal sealed class SelectorIndexed : Producer<TResult>
+        {
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, int, TResult> _selector;
 
-            public void OnCompleted()
+            public SelectorIndexed(IObservable<TSource> source, Func<TSource, int, TResult> selector)
             {
-                base._observer.OnCompleted();
-                base.Dispose();
+                _source = source;
+                _selector = selector;
             }
-        }
-
-        class SelectImpl : Sink<TResult>, IObserver<TSource>
-        {
-            private readonly Select<TSource, TResult> _parent;
-            private int _index;
 
-            public SelectImpl(Select<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
-                : base(observer, cancel)
+            protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                _parent = parent;
-                _index = 0;
+                var sink = new _(_selector, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
             }
 
-            public void OnNext(TSource value)
+            private sealed class _ : Sink<TResult>, IObserver<TSource>
             {
-                var result = default(TResult);
-                try
+                private readonly Func<TSource, int, TResult> _selector;
+                private int _index;
+
+                public _(Func<TSource, int, TResult> selector, IObserver<TResult> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    result = _parent._selectorI(value, checked(_index++));
+                    _selector = selector;
+                    _index = 0;
                 }
-                catch (Exception exception)
+
+                public void OnNext(TSource value)
                 {
-                    base._observer.OnError(exception);
-                    base.Dispose();
-                    return;
+                    var result = default(TResult);
+                    try
+                    {
+                        result = _selector(value, checked(_index++));
+                    }
+                    catch (Exception exception)
+                    {
+                        base._observer.OnError(exception);
+                        base.Dispose();
+                        return;
+                    }
+
+                    base._observer.OnNext(result);
                 }
 
-                base._observer.OnNext(result);
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
 
-            public void OnCompleted()
-            {
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
     }

+ 101 - 95
Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleAsync.cs

@@ -4,148 +4,154 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class SingleAsync<TSource> : Producer<TSource>
+    internal static class SingleAsync<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, bool> _predicate;
-        private readonly bool _throwOnEmpty;
-
-        public SingleAsync(IObservable<TSource> source, Func<TSource, bool> predicate, bool throwOnEmpty)
+        internal sealed class Sequence : Producer<TSource>
         {
-            _source = source;
-            _predicate = predicate;
-            _throwOnEmpty = throwOnEmpty;
-        }
+            private readonly IObservable<TSource> _source;
 
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_predicate != null)
+            public Sequence(IObservable<TSource> source)
             {
-                var sink = new SingleAsyncImpl(this, observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                _source = source;
             }
-            else
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new _(this, observer, cancel);
+                var sink = new _(observer, cancel);
                 setSink(sink);
                 return _source.SubscribeSafe(sink);
             }
-        }
-
-        class _ : Sink<TSource>, IObserver<TSource>
-        {
-            private readonly SingleAsync<TSource> _parent;
-            private TSource _value;
-            private bool _seenValue;
 
-            public _(SingleAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                _parent = parent;
+                private TSource _value;
+                private bool _seenValue;
 
-                _value = default(TSource);
-                _seenValue = false;
-            }
-
-            public void OnNext(TSource value)
-            {
-                if (_seenValue)
+                public _(IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    base._observer.OnError(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT));
-                    base.Dispose();
-                    return;
+                    _value = default(TSource);
+                    _seenValue = false;
                 }
 
-                _value = value;
-                _seenValue = true;
-            }
+                public void OnNext(TSource value)
+                {
+                    if (_seenValue)
+                    {
+                        base._observer.OnError(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT));
+                        base.Dispose();
+                        return;
+                    }
 
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
+                    _value = value;
+                    _seenValue = true;
+                }
 
-            public void OnCompleted()
-            {
-                if (!_seenValue && _parent._throwOnEmpty)
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
+                    base._observer.OnError(error);
+                    base.Dispose();
                 }
-                else
+
+                public void OnCompleted()
                 {
-                    base._observer.OnNext(_value);
-                    base._observer.OnCompleted();
-                }
+                    if (!_seenValue)
+                    {
+                        base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
+                    }
+                    else
+                    {
+                        base._observer.OnNext(_value);
+                        base._observer.OnCompleted();
+                    }
 
-                base.Dispose();
+                    base.Dispose();
+                }
             }
         }
 
-        class SingleAsyncImpl : Sink<TSource>, IObserver<TSource>
+        internal sealed class Predicate : Producer<TSource>
         {
-            private readonly SingleAsync<TSource> _parent;
-            private TSource _value;
-            private bool _seenValue;
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
 
-            public SingleAsyncImpl(SingleAsync<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
             {
-                _parent = parent;
+                _source = source;
+                _predicate = predicate;
+            }
 
-                _value = default(TSource);
-                _seenValue = false;
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
             }
 
-            public void OnNext(TSource value)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                var b = false;
+                private readonly Func<TSource, bool> _predicate;
+                private TSource _value;
+                private bool _seenValue;
 
-                try
-                {
-                    b = _parent._predicate(value);
-                }
-                catch (Exception ex)
+                public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    base._observer.OnError(ex);
-                    base.Dispose();
-                    return;
+                    _predicate = predicate;
+
+                    _value = default(TSource);
+                    _seenValue = false;
                 }
 
-                if (b)
+                public void OnNext(TSource value)
                 {
-                    if (_seenValue)
+                    var b = false;
+
+                    try
                     {
-                        base._observer.OnError(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_MATCHING_ELEMENT));
+                        b = _predicate(value);
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
                         base.Dispose();
                         return;
                     }
 
-                    _value = value;
-                    _seenValue = true;
+                    if (b)
+                    {
+                        if (_seenValue)
+                        {
+                            base._observer.OnError(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_MATCHING_ELEMENT));
+                            base.Dispose();
+                            return;
+                        }
+
+                        _value = value;
+                        _seenValue = true;
+                    }
                 }
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
 
-            public void OnCompleted()
-            {
-                if (!_seenValue && _parent._throwOnEmpty)
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_MATCHING_ELEMENTS));
+                    base._observer.OnError(error);
+                    base.Dispose();
                 }
-                else
+
+                public void OnCompleted()
                 {
-                    base._observer.OnNext(_value);
-                    base._observer.OnCompleted();
-                }
+                    if (!_seenValue)
+                    {
+                        base._observer.OnError(new InvalidOperationException(Strings_Linq.NO_MATCHING_ELEMENTS));
+                    }
+                    else
+                    {
+                        base._observer.OnNext(_value);
+                        base._observer.OnCompleted();
+                    }
 
-                base.Dispose();
+                    base.Dispose();
+                }
             }
         }
     }

+ 142 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleOrDefaultAsync.cs

@@ -0,0 +1,142 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    internal static class SingleOrDefaultAsync<TSource>
+    {
+        internal sealed class Sequence : Producer<TSource>
+        {
+            private readonly IObservable<TSource> _source;
+
+            public Sequence(IObservable<TSource> source)
+            {
+                _source = source;
+            }
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
+            {
+                private TSource _value;
+                private bool _seenValue;
+
+                public _(IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _value = default(TSource);
+                    _seenValue = false;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    if (_seenValue)
+                    {
+                        base._observer.OnError(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT));
+                        base.Dispose();
+                        return;
+                    }
+
+                    _value = value;
+                    _seenValue = true;
+                }
+
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(_value);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
+            }
+        }
+
+        internal sealed class Predicate : Producer<TSource>
+        {
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
+
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
+            {
+                _source = source;
+                _predicate = predicate;
+            }
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
+            }
+
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
+            {
+                private readonly Func<TSource, bool> _predicate;
+                private TSource _value;
+                private bool _seenValue;
+
+                public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _predicate = predicate;
+
+                    _value = default(TSource);
+                    _seenValue = false;
+                }
+
+                public void OnNext(TSource value)
+                {
+                    var b = false;
+
+                    try
+                    {
+                        b = _predicate(value);
+                    }
+                    catch (Exception ex)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
+                        return;
+                    }
+
+                    if (b)
+                    {
+                        if (_seenValue)
+                        {
+                            base._observer.OnError(new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_MATCHING_ELEMENT));
+                            base.Dispose();
+                            return;
+                        }
+
+                        _value = value;
+                        _seenValue = true;
+                    }
+                }
+
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    base._observer.OnNext(_value);
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
+            }
+        }
+    }
+}

+ 95 - 88
Rx.NET/Source/src/System.Reactive/Linq/Observable/Where.cs

@@ -4,129 +4,136 @@
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Where<TSource> : Producer<TSource>
+    internal static class Where<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly Func<TSource, bool> _predicate;
-        private readonly Func<TSource, int, bool> _predicateI;
-
-        public Where(IObservable<TSource> source, Func<TSource, bool> predicate)
-        {
-            _source = source;
-            _predicate = predicate;
-        }
-
-        public Where(IObservable<TSource> source, Func<TSource, int, bool> predicate)
+        internal sealed class Predicate : Producer<TSource>
         {
-            _source = source;
-            _predicateI = predicate;
-        }
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, bool> _predicate;
 
-        public IObservable<TSource> Combine(Func<TSource, bool> predicate)
-        {
-            if (_predicate != null)
-                return new Where<TSource>(_source, x => _predicate(x) && predicate(x));
-            else
-                return new Where<TSource>(this, predicate);
-        }
+            public Predicate(IObservable<TSource> source, Func<TSource, bool> predicate)
+            {
+                _source = source;
+                _predicate = predicate;
+            }
 
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_predicate != null)
+            public IObservable<TSource> Combine(Func<TSource, bool> predicate)
             {
-                var sink = new _(this, observer, cancel);
-                setSink(sink);
-                return _source.SubscribeSafe(sink);
+                return new Predicate(_source, x => _predicate(x) && predicate(x));
             }
-            else
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new WhereImpl(this, observer, cancel);
+                var sink = new _(_predicate, observer, cancel);
                 setSink(sink);
                 return _source.SubscribeSafe(sink);
             }
-        }
-
-        class _ : Sink<TSource>, IObserver<TSource>
-        {
-            private readonly Where<TSource> _parent;
 
-            public _(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                _parent = parent;
-            }
+                private readonly Func<TSource, bool> _predicate;
 
-            public void OnNext(TSource value)
-            {
-                var shouldRun = default(bool);
-                try
+                public _(Func<TSource, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    shouldRun = _parent._predicate(value);
+                    _predicate = predicate;
                 }
-                catch (Exception exception)
+
+                public void OnNext(TSource value)
+                {
+                    var shouldRun = default(bool);
+                    try
+                    {
+                        shouldRun = _predicate(value);
+                    }
+                    catch (Exception exception)
+                    {
+                        base._observer.OnError(exception);
+                        base.Dispose();
+                        return;
+                    }
+
+                    if (shouldRun)
+                    {
+                        base._observer.OnNext(value);
+                    }
+                }
+
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(exception);
+                    base._observer.OnError(error);
                     base.Dispose();
-                    return;
                 }
 
-                if (shouldRun)
-                    base._observer.OnNext(value);
+                public void OnCompleted()
+                {
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
+        }
 
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
+        internal sealed class PredicateIndexed : Producer<TSource>
+        {
+            private readonly IObservable<TSource> _source;
+            private readonly Func<TSource, int, bool> _predicate;
 
-            public void OnCompleted()
+            public PredicateIndexed(IObservable<TSource> source, Func<TSource, int, bool> predicate)
             {
-                base._observer.OnCompleted();
-                base.Dispose();
+                _source = source;
+                _predicate = predicate;
             }
-        }
 
-        class WhereImpl : Sink<TSource>, IObserver<TSource>
-        {
-            private readonly Where<TSource> _parent;
-            private int _index;
-
-            public WhereImpl(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                _parent = parent;
-                _index = 0;
+                var sink = new _(_predicate, observer, cancel);
+                setSink(sink);
+                return _source.SubscribeSafe(sink);
             }
 
-            public void OnNext(TSource value)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                var shouldRun = default(bool);
-                try
+                private readonly Func<TSource, int, bool> _predicate;
+                private int _index;
+
+                public _(Func<TSource, int, bool> predicate, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _predicate = predicate;
+                    _index = 0;
+                }
+
+                public void OnNext(TSource value)
                 {
-                    shouldRun = _parent._predicateI(value, checked(_index++));
+                    var shouldRun = default(bool);
+                    try
+                    {
+                        shouldRun = _predicate(value, checked(_index++));
+                    }
+                    catch (Exception exception)
+                    {
+                        base._observer.OnError(exception);
+                        base.Dispose();
+                        return;
+                    }
+
+                    if (shouldRun)
+                    {
+                        base._observer.OnNext(value);
+                    }
                 }
-                catch (Exception exception)
+
+                public void OnError(Exception error)
                 {
-                    base._observer.OnError(exception);
+                    base._observer.OnError(error);
                     base.Dispose();
-                    return;
                 }
 
-                if (shouldRun)
-                    base._observer.OnNext(value);
-            }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
-
-            public void OnCompleted()
-            {
-                base._observer.OnCompleted();
-                base.Dispose();
+                public void OnCompleted()
+                {
+                    base._observer.OnCompleted();
+                    base.Dispose();
+                }
             }
         }
     }

+ 18 - 18
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Aggregates.cs

@@ -94,12 +94,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<bool> Any<TSource>(IObservable<TSource> source)
         {
-            return new Any<TSource>(source);
+            return new Any<TSource>.Count(source);
         }
 
         public virtual IObservable<bool> Any<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            return new Any<TSource>(source, predicate);
+            return new Any<TSource>.Predicate(source, predicate);
         }
 
         #endregion
@@ -176,12 +176,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<int> Count<TSource>(IObservable<TSource> source)
         {
-            return new Count<TSource>(source);
+            return new Count<TSource>.All(source);
         }
 
         public virtual IObservable<int> Count<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            return new Count<TSource>(source, predicate);
+            return new Count<TSource>.Predicate(source, predicate);
         }
 
         #endregion
@@ -208,12 +208,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> FirstAsync<TSource>(IObservable<TSource> source)
         {
-            return new FirstAsync<TSource>(source, null, true);
+            return new FirstAsync<TSource>.Sequence(source);
         }
 
         public virtual IObservable<TSource> FirstAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            return new FirstAsync<TSource>(source, predicate, true);
+            return new FirstAsync<TSource>.Predicate(source, predicate);
         }
 
         #endregion
@@ -222,12 +222,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> FirstOrDefaultAsync<TSource>(IObservable<TSource> source)
         {
-            return new FirstAsync<TSource>(source, null, false);
+            return new FirstOrDefaultAsync<TSource>.Sequence(source);
         }
 
         public virtual IObservable<TSource> FirstOrDefaultAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            return new FirstAsync<TSource>(source, predicate, false);
+            return new FirstOrDefaultAsync<TSource>.Predicate(source, predicate);
         }
 
         #endregion
@@ -245,12 +245,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> LastAsync<TSource>(IObservable<TSource> source)
         {
-            return new LastAsync<TSource>(source, null, true);
+            return new LastAsync<TSource>.Sequence(source);
         }
 
         public virtual IObservable<TSource> LastAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            return new LastAsync<TSource>(source, predicate, true);
+            return new LastAsync<TSource>.Predicate(source, predicate);
         }
 
         #endregion
@@ -259,12 +259,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> LastOrDefaultAsync<TSource>(IObservable<TSource> source)
         {
-            return new LastAsync<TSource>(source, null, false);
+            return new LastOrDefaultAsync<TSource>.Sequence(source);
         }
 
         public virtual IObservable<TSource> LastOrDefaultAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            return new LastAsync<TSource>(source, predicate, false);
+            return new LastOrDefaultAsync<TSource>.Predicate(source, predicate);
         }
 
         #endregion
@@ -273,12 +273,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<long> LongCount<TSource>(IObservable<TSource> source)
         {
-            return new LongCount<TSource>(source);
+            return new LongCount<TSource>.All(source);
         }
 
         public virtual IObservable<long> LongCount<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            return new LongCount<TSource>(source, predicate);
+            return new LongCount<TSource>.Predicate(source, predicate);
         }
 
         #endregion
@@ -591,12 +591,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> SingleAsync<TSource>(IObservable<TSource> source)
         {
-            return new SingleAsync<TSource>(source, null, true);
+            return new SingleAsync<TSource>.Sequence(source);
         }
 
         public virtual IObservable<TSource> SingleAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            return new SingleAsync<TSource>(source, predicate, true);
+            return new SingleAsync<TSource>.Predicate(source, predicate);
         }
 
         #endregion
@@ -605,12 +605,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> SingleOrDefaultAsync<TSource>(IObservable<TSource> source)
         {
-            return new SingleAsync<TSource>(source, null, false);
+            return new SingleOrDefaultAsync<TSource>.Sequence(source);
         }
 
         public virtual IObservable<TSource> SingleOrDefaultAsync<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            return new SingleAsync<TSource>(source, predicate, false);
+            return new SingleOrDefaultAsync<TSource>.Predicate(source, predicate);
         }
 
         #endregion

+ 7 - 5
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs

@@ -199,12 +199,14 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
         {
-            return new Select<TSource, TResult>(source, selector);
+            // CONSIDER: Add fusion for Select/Select pairs.
+
+            return new Select<TSource, TResult>.Selector(source, selector);
         }
 
         public virtual IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, TResult> selector)
         {
-            return new Select<TSource, TResult>(source, selector);
+            return new Select<TSource, TResult>.SelectorIndexed(source, selector);
         }
 
         #endregion
@@ -412,16 +414,16 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
         {
-            var where = source as Where<TSource>;
+            var where = source as Where<TSource>.Predicate;
             if (where != null)
                 return where.Combine(predicate);
 
-            return new Where<TSource>(source, predicate);
+            return new Where<TSource>.Predicate(source, predicate);
         }
 
         public virtual IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate)
         {
-            return new Where<TSource>(source, predicate);
+            return new Where<TSource>.PredicateIndexed(source, predicate);
         }
 
         #endregion